This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 376632713c Fix insert OOM && Optimize new standalone auto create
schema (#6204)
376632713c is described below
commit 376632713c2e1e4a76b65f62fdf3ee0d07615717
Author: Haonan <[email protected]>
AuthorDate: Thu Jun 9 21:51:16 2022 +0800
Fix insert OOM && Optimize new standalone auto create schema (#6204)
---
.../iotdb/db/engine/storagegroup/DataRegion.java | 3 +-
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 4 +-
.../mpp/plan/analyze/StandaloneSchemaFetcher.java | 135 +++++++++------------
.../planner/plan/node/write/InsertTabletNode.java | 7 ++
.../db/mpp/plan/StandaloneCoordinatorTest.java | 15 ++-
.../plan/scheduler/StandaloneSchedulerTest.java | 58 +++++++--
6 files changed, 124 insertions(+), 98 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 480311094a..0f571931f0 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.compaction.CompactionRecoverManager;
import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
@@ -1048,7 +1049,7 @@ public class DataRegion {
int before = loc;
// before time partition
long beforeTimePartition =
- StorageEngine.getTimePartition(insertTabletNode.getTimes()[before]);
+
StorageEngineV2.getTimePartition(insertTabletNode.getTimes()[before]);
// init map
long lastFlushTime =
lastFlushTimeManager.ensureFlushedTimePartitionAndInit(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index d004e328bd..d9ba6a3716 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -130,7 +130,9 @@ public class Coordinator {
DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT),
partitionFetcher,
schemaFetcher);
- queryExecutionMap.put(queryId, execution);
+ if (execution.isQuery()) {
+ queryExecutionMap.put(queryId, execution);
+ }
execution.start();
return execution.getStatus();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index efff9ea7e3..599d608a80 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.plan.analyze;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.partition.SchemaPartition;
@@ -31,13 +30,8 @@ import
org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
-import org.apache.iotdb.db.mpp.plan.Coordinator;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
-import
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceStatement;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -56,10 +50,8 @@ import static
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncodin
public class StandaloneSchemaFetcher implements ISchemaFetcher {
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private final Coordinator coordinator = Coordinator.getInstance();
private final LocalConfigNode localConfigNode =
LocalConfigNode.getInstance();
private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
- private final IPartitionFetcher partitionFetcher =
StandalonePartitionFetcher.getInstance();
private StandaloneSchemaFetcher() {}
@@ -98,6 +90,26 @@ public class StandaloneSchemaFetcher implements
ISchemaFetcher {
return fetchSchema(patternTree);
}
+ private SchemaTree fetchSchemaForWrite(PathPatternTree patternTree) {
+ patternTree.constructTree();
+ Set<String> storageGroupSet = new HashSet<>();
+ SchemaTree schemaTree = new SchemaTree();
+ List<PartialPath> partialPathList = patternTree.splitToPathList();
+ try {
+ for (PartialPath path : partialPathList) {
+ String storageGroup =
localConfigNode.getBelongedStorageGroup(path).getFullPath();
+ storageGroupSet.add(storageGroup);
+ SchemaRegionId schemaRegionId =
localConfigNode.getBelongedSchemaRegionId(path);
+ ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion(schemaRegionId);
+
schemaTree.appendMeasurementPaths(schemaRegion.getMeasurementPaths(path,
false));
+ }
+ } catch (MetadataException e) {
+ throw new RuntimeException(e);
+ }
+ schemaTree.setStorageGroups(new ArrayList<>(storageGroupSet));
+ return schemaTree;
+ }
+
@Override
public SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes,
boolean aligned) {
@@ -112,14 +124,12 @@ public class StandaloneSchemaFetcher implements
ISchemaFetcher {
SchemaTree fetchedSchemaTree;
if (!config.isAutoCreateSchemaEnabled()) {
- fetchedSchemaTree =
- fetchSchema(patternTree,
partitionFetcher.getSchemaPartition(patternTree));
+ fetchedSchemaTree = fetchSchemaForWrite(patternTree);
schemaTree.mergeSchemaTree(fetchedSchemaTree);
return schemaTree;
}
- fetchedSchemaTree =
- fetchSchema(patternTree,
partitionFetcher.getOrCreateSchemaPartition(patternTree));
+ fetchedSchemaTree = fetchSchemaForWrite(patternTree);
schemaTree.mergeSchemaTree(fetchedSchemaTree);
SchemaTree missingSchemaTree =
@@ -150,14 +160,12 @@ public class StandaloneSchemaFetcher implements
ISchemaFetcher {
SchemaTree fetchedSchemaTree;
if (!config.isAutoCreateSchemaEnabled()) {
- fetchedSchemaTree =
- fetchSchema(patternTree,
partitionFetcher.getSchemaPartition(patternTree));
+ fetchedSchemaTree = fetchSchemaForWrite(patternTree);
schemaTree.mergeSchemaTree(fetchedSchemaTree);
return schemaTree;
}
- fetchedSchemaTree =
- fetchSchema(patternTree,
partitionFetcher.getOrCreateSchemaPartition(patternTree));
+ fetchedSchemaTree = fetchSchemaForWrite(patternTree);
schemaTree.mergeSchemaTree(fetchedSchemaTree);
SchemaTree missingSchemaTree;
@@ -253,68 +261,41 @@ public class StandaloneSchemaFetcher implements
ISchemaFetcher {
List<String> measurements,
List<TSDataType> tsDataTypes,
boolean isAligned) {
-
- if (isAligned) {
- CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement =
- new CreateAlignedTimeSeriesStatement();
- createAlignedTimeSeriesStatement.setDevicePath(devicePath);
- createAlignedTimeSeriesStatement.setMeasurements(measurements);
- createAlignedTimeSeriesStatement.setDataTypes(tsDataTypes);
- List<TSEncoding> encodings = new ArrayList<>();
- List<CompressionType> compressors = new ArrayList<>();
- for (TSDataType dataType : tsDataTypes) {
- encodings.add(getDefaultEncoding(dataType));
-
compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
- }
- createAlignedTimeSeriesStatement.setEncodings(encodings);
- createAlignedTimeSeriesStatement.setCompressors(compressors);
- createAlignedTimeSeriesStatement.setAliasList(null);
-
- executeCreateStatement(createAlignedTimeSeriesStatement);
- } else {
-
- executeCreateTimeseriesByDeviceStatement(
- new CreateTimeSeriesByDeviceStatement(devicePath, measurements,
tsDataTypes));
- }
- }
-
- private void executeCreateStatement(Statement statement) {
- long queryId = SessionManager.getInstance().requestQueryId(false);
- ExecutionResult executionResult =
- coordinator.execute(statement, queryId, null, "", partitionFetcher,
this);
- // TODO: throw exception
- try {
- int statusCode = executionResult.status.getCode();
- if (statusCode != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && statusCode !=
TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
- throw new RuntimeException(
- "cannot auto create schema, status is: " + executionResult.status);
- }
- } finally {
- coordinator.getQueryExecution(queryId).stopAndCleanup();
- }
- }
-
- private void executeCreateTimeseriesByDeviceStatement(
- CreateTimeSeriesByDeviceStatement statement) {
- long queryId = SessionManager.getInstance().requestQueryId(false);
- ExecutionResult executionResult =
- coordinator.execute(statement, queryId, null, "", partitionFetcher,
this);
- // TODO: throw exception
try {
- int statusCode = executionResult.status.getCode();
- if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return;
- }
-
- for (TSStatus subStatus : executionResult.status.subStatus) {
- if (subStatus.code !=
TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
- throw new RuntimeException(
- "cannot auto create schema, status is: " +
executionResult.status);
+ if (isAligned) {
+ CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan = new
CreateAlignedTimeSeriesPlan();
+ createAlignedTimeSeriesPlan.setPrefixPath(devicePath);
+ createAlignedTimeSeriesPlan.setMeasurements(measurements);
+ createAlignedTimeSeriesPlan.setDataTypes(tsDataTypes);
+ List<TSEncoding> encodings = new ArrayList<>();
+ List<CompressionType> compressors = new ArrayList<>();
+ for (TSDataType dataType : tsDataTypes) {
+ encodings.add(getDefaultEncoding(dataType));
+
compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
+ }
+ createAlignedTimeSeriesPlan.setEncodings(encodings);
+ createAlignedTimeSeriesPlan.setCompressors(compressors);
+ SchemaRegionId schemaRegionId =
+
localConfigNode.getBelongedSchemaRegionIdWithAutoCreate(devicePath);
+ ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion(schemaRegionId);
+ schemaRegion.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+ } else {
+ for (int i = 0; i < measurements.size(); i++) {
+ CreateTimeSeriesPlan createTimeSeriesPlan = new
CreateTimeSeriesPlan();
+ createTimeSeriesPlan.setPath(
+ new PartialPath(devicePath.getFullPath(), measurements.get(i)));
+ createTimeSeriesPlan.setDataType(tsDataTypes.get(i));
+
createTimeSeriesPlan.setEncoding(getDefaultEncoding(tsDataTypes.get(i)));
+ createTimeSeriesPlan.setCompressor(
+ TSFileDescriptor.getInstance().getConfig().getCompressor());
+ SchemaRegionId schemaRegionId =
+
localConfigNode.getBelongedSchemaRegionIdWithAutoCreate(devicePath);
+ ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion(schemaRegionId);
+ schemaRegion.createTimeseries(createTimeSeriesPlan, -1);
}
}
- } finally {
- coordinator.getQueryExecution(queryId).stopAndCleanup();
+ } catch (MetadataException e) {
+ throw new RuntimeException("cannot auto create schema ", e);
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index db9baaf74c..794520470e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -235,6 +235,13 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
for (Map.Entry<TRegionReplicaSet, List<Integer>> entry :
splitMap.entrySet()) {
// generate a new times and values
locs = entry.getValue();
+ // Avoid using system arraycopy when there is no need to split
+ if (splitMap.size() == 1) {
+ setRange(locs);
+ setDataRegionReplicaSet(entry.getKey());
+ result.add(this);
+ return result;
+ }
int count = 0;
for (int i = 0; i < locs.size(); i += 2) {
int start = locs.get(i);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
index a3dfc999e6..8ebf7f8b72 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
@@ -77,6 +77,7 @@ public class StandaloneCoordinatorTest {
@After
public void tearDown() throws Exception {
configNode.clear();
+ WALManager.getInstance().clear();
WALManager.getInstance().stop();
StorageEngineV2.getInstance().stop();
FlushManager.getInstance().stop();
@@ -113,7 +114,7 @@ public class StandaloneCoordinatorTest {
put("tag2", "v2");
}
});
- executeStatement(createTimeSeriesStatement);
+ executeStatement(createTimeSeriesStatement, false);
}
@Test
@@ -121,7 +122,7 @@ public class StandaloneCoordinatorTest {
String insertSql = "insert into root.sg.d1(time,s1,s2) values
(100,222,333)";
Statement insertStmt = StatementGenerator.createStatement(insertSql,
ZoneId.systemDefault());
- executeStatement(insertStmt);
+ executeStatement(insertStmt, false);
}
@Test
@@ -129,18 +130,20 @@ public class StandaloneCoordinatorTest {
String createUserSql = "create user username 'password'";
Statement createStmt =
StatementGenerator.createStatement(createUserSql,
ZoneId.systemDefault());
- executeStatement(createStmt);
+ executeStatement(createStmt, false);
}
- private void executeStatement(Statement statement) {
- long queryId = SessionManager.getInstance().requestQueryId(false);
+ private void executeStatement(Statement statement, boolean isDataQuery) {
+ long queryId = SessionManager.getInstance().requestQueryId(isDataQuery);
ExecutionResult executionResult =
coordinator.execute(statement, queryId, null, "", partitionFetcher,
schemaFetcher);
try {
int statusCode = executionResult.status.getCode();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
statusCode);
} finally {
- coordinator.getQueryExecution(queryId).stopAndCleanup();
+ if (isDataQuery) {
+ coordinator.getQueryExecution(queryId).stopAndCleanup();
+ }
}
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index ee16c73244..ce58dd7e41 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -155,9 +155,15 @@ public class StandaloneSchedulerTest {
executor,
null,
null);
- standaloneScheduler.start();
-
- Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ try {
+ standaloneScheduler.start();
+ Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ standaloneScheduler.stop();
+ }
}
@Test
@@ -248,9 +254,15 @@ public class StandaloneSchedulerTest {
executor,
null,
null);
- standaloneScheduler.start();
-
- Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ try {
+ standaloneScheduler.start();
+ Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ standaloneScheduler.stop();
+ }
}
@Test
@@ -351,9 +363,15 @@ public class StandaloneSchedulerTest {
executor,
null,
null);
- standaloneScheduler.start();
-
- Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ try {
+ standaloneScheduler.start();
+ Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ standaloneScheduler.stop();
+ }
}
@Test
@@ -393,8 +411,15 @@ public class StandaloneSchedulerTest {
executor,
null,
null);
- standaloneScheduler.start();
- Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ try {
+ standaloneScheduler.start();
+ Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ standaloneScheduler.stop();
+ }
}
@Test
@@ -463,8 +488,15 @@ public class StandaloneSchedulerTest {
executor,
null,
null);
- standaloneScheduler.start();
- Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ try {
+ standaloneScheduler.start();
+ Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ standaloneScheduler.stop();
+ }
}
private TRegionReplicaSet genRegionReplicaSet(TConsensusGroupType type) {