This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 376632713c Fix insert OOM && Optimize new standalone auto create 
schema (#6204)
376632713c is described below

commit 376632713c2e1e4a76b65f62fdf3ee0d07615717
Author: Haonan <[email protected]>
AuthorDate: Thu Jun 9 21:51:16 2022 +0800

    Fix insert OOM && Optimize new standalone auto create schema (#6204)
---
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   3 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |   4 +-
 .../mpp/plan/analyze/StandaloneSchemaFetcher.java  | 135 +++++++++------------
 .../planner/plan/node/write/InsertTabletNode.java  |   7 ++
 .../db/mpp/plan/StandaloneCoordinatorTest.java     |  15 ++-
 .../plan/scheduler/StandaloneSchedulerTest.java    |  58 +++++++--
 6 files changed, 124 insertions(+), 98 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 480311094a..0f571931f0 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.compaction.CompactionRecoverManager;
 import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
 import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
@@ -1048,7 +1049,7 @@ public class DataRegion {
       int before = loc;
       // before time partition
       long beforeTimePartition =
-          StorageEngine.getTimePartition(insertTabletNode.getTimes()[before]);
+          
StorageEngineV2.getTimePartition(insertTabletNode.getTimes()[before]);
       // init map
       long lastFlushTime =
           lastFlushTimeManager.ensureFlushedTimePartitionAndInit(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index d004e328bd..d9ba6a3716 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -130,7 +130,9 @@ public class Coordinator {
                   DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT),
               partitionFetcher,
               schemaFetcher);
-      queryExecutionMap.put(queryId, execution);
+      if (execution.isQuery()) {
+        queryExecutionMap.put(queryId, execution);
+      }
       execution.start();
 
       return execution.getStatus();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index efff9ea7e3..599d608a80 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.plan.analyze;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.partition.SchemaPartition;
@@ -31,13 +30,8 @@ import 
org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
-import org.apache.iotdb.db.mpp.plan.Coordinator;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import 
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
-import 
org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceStatement;
-import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -56,10 +50,8 @@ import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncodin
 public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private final Coordinator coordinator = Coordinator.getInstance();
   private final LocalConfigNode localConfigNode = 
LocalConfigNode.getInstance();
   private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
-  private final IPartitionFetcher partitionFetcher = 
StandalonePartitionFetcher.getInstance();
 
   private StandaloneSchemaFetcher() {}
 
@@ -98,6 +90,26 @@ public class StandaloneSchemaFetcher implements 
ISchemaFetcher {
     return fetchSchema(patternTree);
   }
 
+  private SchemaTree fetchSchemaForWrite(PathPatternTree patternTree) {
+    patternTree.constructTree();
+    Set<String> storageGroupSet = new HashSet<>();
+    SchemaTree schemaTree = new SchemaTree();
+    List<PartialPath> partialPathList = patternTree.splitToPathList();
+    try {
+      for (PartialPath path : partialPathList) {
+        String storageGroup = 
localConfigNode.getBelongedStorageGroup(path).getFullPath();
+        storageGroupSet.add(storageGroup);
+        SchemaRegionId schemaRegionId = 
localConfigNode.getBelongedSchemaRegionId(path);
+        ISchemaRegion schemaRegion = 
schemaEngine.getSchemaRegion(schemaRegionId);
+        
schemaTree.appendMeasurementPaths(schemaRegion.getMeasurementPaths(path, 
false));
+      }
+    } catch (MetadataException e) {
+      throw new RuntimeException(e);
+    }
+    schemaTree.setStorageGroups(new ArrayList<>(storageGroupSet));
+    return schemaTree;
+  }
+
   @Override
   public SchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, 
boolean aligned) {
@@ -112,14 +124,12 @@ public class StandaloneSchemaFetcher implements 
ISchemaFetcher {
     SchemaTree fetchedSchemaTree;
 
     if (!config.isAutoCreateSchemaEnabled()) {
-      fetchedSchemaTree =
-          fetchSchema(patternTree, 
partitionFetcher.getSchemaPartition(patternTree));
+      fetchedSchemaTree = fetchSchemaForWrite(patternTree);
       schemaTree.mergeSchemaTree(fetchedSchemaTree);
       return schemaTree;
     }
 
-    fetchedSchemaTree =
-        fetchSchema(patternTree, 
partitionFetcher.getOrCreateSchemaPartition(patternTree));
+    fetchedSchemaTree = fetchSchemaForWrite(patternTree);
     schemaTree.mergeSchemaTree(fetchedSchemaTree);
 
     SchemaTree missingSchemaTree =
@@ -150,14 +160,12 @@ public class StandaloneSchemaFetcher implements 
ISchemaFetcher {
     SchemaTree fetchedSchemaTree;
 
     if (!config.isAutoCreateSchemaEnabled()) {
-      fetchedSchemaTree =
-          fetchSchema(patternTree, 
partitionFetcher.getSchemaPartition(patternTree));
+      fetchedSchemaTree = fetchSchemaForWrite(patternTree);
       schemaTree.mergeSchemaTree(fetchedSchemaTree);
       return schemaTree;
     }
 
-    fetchedSchemaTree =
-        fetchSchema(patternTree, 
partitionFetcher.getOrCreateSchemaPartition(patternTree));
+    fetchedSchemaTree = fetchSchemaForWrite(patternTree);
     schemaTree.mergeSchemaTree(fetchedSchemaTree);
 
     SchemaTree missingSchemaTree;
@@ -253,68 +261,41 @@ public class StandaloneSchemaFetcher implements 
ISchemaFetcher {
       List<String> measurements,
       List<TSDataType> tsDataTypes,
       boolean isAligned) {
-
-    if (isAligned) {
-      CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement =
-          new CreateAlignedTimeSeriesStatement();
-      createAlignedTimeSeriesStatement.setDevicePath(devicePath);
-      createAlignedTimeSeriesStatement.setMeasurements(measurements);
-      createAlignedTimeSeriesStatement.setDataTypes(tsDataTypes);
-      List<TSEncoding> encodings = new ArrayList<>();
-      List<CompressionType> compressors = new ArrayList<>();
-      for (TSDataType dataType : tsDataTypes) {
-        encodings.add(getDefaultEncoding(dataType));
-        
compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
-      }
-      createAlignedTimeSeriesStatement.setEncodings(encodings);
-      createAlignedTimeSeriesStatement.setCompressors(compressors);
-      createAlignedTimeSeriesStatement.setAliasList(null);
-
-      executeCreateStatement(createAlignedTimeSeriesStatement);
-    } else {
-
-      executeCreateTimeseriesByDeviceStatement(
-          new CreateTimeSeriesByDeviceStatement(devicePath, measurements, 
tsDataTypes));
-    }
-  }
-
-  private void executeCreateStatement(Statement statement) {
-    long queryId = SessionManager.getInstance().requestQueryId(false);
-    ExecutionResult executionResult =
-        coordinator.execute(statement, queryId, null, "", partitionFetcher, 
this);
-    // TODO: throw exception
-    try {
-      int statusCode = executionResult.status.getCode();
-      if (statusCode != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && statusCode != 
TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
-        throw new RuntimeException(
-            "cannot auto create schema, status is: " + executionResult.status);
-      }
-    } finally {
-      coordinator.getQueryExecution(queryId).stopAndCleanup();
-    }
-  }
-
-  private void executeCreateTimeseriesByDeviceStatement(
-      CreateTimeSeriesByDeviceStatement statement) {
-    long queryId = SessionManager.getInstance().requestQueryId(false);
-    ExecutionResult executionResult =
-        coordinator.execute(statement, queryId, null, "", partitionFetcher, 
this);
-    // TODO: throw exception
     try {
-      int statusCode = executionResult.status.getCode();
-      if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        return;
-      }
-
-      for (TSStatus subStatus : executionResult.status.subStatus) {
-        if (subStatus.code != 
TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
-          throw new RuntimeException(
-              "cannot auto create schema, status is: " + 
executionResult.status);
+      if (isAligned) {
+        CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan = new 
CreateAlignedTimeSeriesPlan();
+        createAlignedTimeSeriesPlan.setPrefixPath(devicePath);
+        createAlignedTimeSeriesPlan.setMeasurements(measurements);
+        createAlignedTimeSeriesPlan.setDataTypes(tsDataTypes);
+        List<TSEncoding> encodings = new ArrayList<>();
+        List<CompressionType> compressors = new ArrayList<>();
+        for (TSDataType dataType : tsDataTypes) {
+          encodings.add(getDefaultEncoding(dataType));
+          
compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor());
+        }
+        createAlignedTimeSeriesPlan.setEncodings(encodings);
+        createAlignedTimeSeriesPlan.setCompressors(compressors);
+        SchemaRegionId schemaRegionId =
+            
localConfigNode.getBelongedSchemaRegionIdWithAutoCreate(devicePath);
+        ISchemaRegion schemaRegion = 
schemaEngine.getSchemaRegion(schemaRegionId);
+        schemaRegion.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+      } else {
+        for (int i = 0; i < measurements.size(); i++) {
+          CreateTimeSeriesPlan createTimeSeriesPlan = new 
CreateTimeSeriesPlan();
+          createTimeSeriesPlan.setPath(
+              new PartialPath(devicePath.getFullPath(), measurements.get(i)));
+          createTimeSeriesPlan.setDataType(tsDataTypes.get(i));
+          
createTimeSeriesPlan.setEncoding(getDefaultEncoding(tsDataTypes.get(i)));
+          createTimeSeriesPlan.setCompressor(
+              TSFileDescriptor.getInstance().getConfig().getCompressor());
+          SchemaRegionId schemaRegionId =
+              
localConfigNode.getBelongedSchemaRegionIdWithAutoCreate(devicePath);
+          ISchemaRegion schemaRegion = 
schemaEngine.getSchemaRegion(schemaRegionId);
+          schemaRegion.createTimeseries(createTimeSeriesPlan, -1);
         }
       }
-    } finally {
-      coordinator.getQueryExecution(queryId).stopAndCleanup();
+    } catch (MetadataException e) {
+      throw new RuntimeException("cannot auto create schema ", e);
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index db9baaf74c..794520470e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -235,6 +235,13 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : 
splitMap.entrySet()) {
       // generate a new times and values
       locs = entry.getValue();
+      // Avoid using system arraycopy when there is no need to split
+      if (splitMap.size() == 1) {
+        setRange(locs);
+        setDataRegionReplicaSet(entry.getKey());
+        result.add(this);
+        return result;
+      }
       int count = 0;
       for (int i = 0; i < locs.size(); i += 2) {
         int start = locs.get(i);
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
index a3dfc999e6..8ebf7f8b72 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
@@ -77,6 +77,7 @@ public class StandaloneCoordinatorTest {
   @After
   public void tearDown() throws Exception {
     configNode.clear();
+    WALManager.getInstance().clear();
     WALManager.getInstance().stop();
     StorageEngineV2.getInstance().stop();
     FlushManager.getInstance().stop();
@@ -113,7 +114,7 @@ public class StandaloneCoordinatorTest {
             put("tag2", "v2");
           }
         });
-    executeStatement(createTimeSeriesStatement);
+    executeStatement(createTimeSeriesStatement, false);
   }
 
   @Test
@@ -121,7 +122,7 @@ public class StandaloneCoordinatorTest {
 
     String insertSql = "insert into root.sg.d1(time,s1,s2) values 
(100,222,333)";
     Statement insertStmt = StatementGenerator.createStatement(insertSql, 
ZoneId.systemDefault());
-    executeStatement(insertStmt);
+    executeStatement(insertStmt, false);
   }
 
   @Test
@@ -129,18 +130,20 @@ public class StandaloneCoordinatorTest {
     String createUserSql = "create user username 'password'";
     Statement createStmt =
         StatementGenerator.createStatement(createUserSql, 
ZoneId.systemDefault());
-    executeStatement(createStmt);
+    executeStatement(createStmt, false);
   }
 
-  private void executeStatement(Statement statement) {
-    long queryId = SessionManager.getInstance().requestQueryId(false);
+  private void executeStatement(Statement statement, boolean isDataQuery) {
+    long queryId = SessionManager.getInstance().requestQueryId(isDataQuery);
     ExecutionResult executionResult =
         coordinator.execute(statement, queryId, null, "", partitionFetcher, 
schemaFetcher);
     try {
       int statusCode = executionResult.status.getCode();
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
statusCode);
     } finally {
-      coordinator.getQueryExecution(queryId).stopAndCleanup();
+      if (isDataQuery) {
+        coordinator.getQueryExecution(queryId).stopAndCleanup();
+      }
     }
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index ee16c73244..ce58dd7e41 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -155,9 +155,15 @@ public class StandaloneSchedulerTest {
             executor,
             null,
             null);
-    standaloneScheduler.start();
-
-    Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    try {
+      standaloneScheduler.start();
+      Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      standaloneScheduler.stop();
+    }
   }
 
   @Test
@@ -248,9 +254,15 @@ public class StandaloneSchedulerTest {
             executor,
             null,
             null);
-    standaloneScheduler.start();
-
-    Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    try {
+      standaloneScheduler.start();
+      Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      standaloneScheduler.stop();
+    }
   }
 
   @Test
@@ -351,9 +363,15 @@ public class StandaloneSchedulerTest {
             executor,
             null,
             null);
-    standaloneScheduler.start();
-
-    Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    try {
+      standaloneScheduler.start();
+      Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      standaloneScheduler.stop();
+    }
   }
 
   @Test
@@ -393,8 +411,15 @@ public class StandaloneSchedulerTest {
             executor,
             null,
             null);
-    standaloneScheduler.start();
-    Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    try {
+      standaloneScheduler.start();
+      Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      standaloneScheduler.stop();
+    }
   }
 
   @Test
@@ -463,8 +488,15 @@ public class StandaloneSchedulerTest {
             executor,
             null,
             null);
-    standaloneScheduler.start();
-    Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    try {
+      standaloneScheduler.start();
+      Assert.assertEquals(QueryState.FINISHED, stateMachine.getState());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      standaloneScheduler.stop();
+    }
   }
 
   private TRegionReplicaSet genRegionReplicaSet(TConsensusGroupType type) {

Reply via email to