This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 638be8ae212 Pipe: Added batch to schema snapshot execution in template 
activation and timeseries creation & Fixed the bug that the 
"CreateMultiTimeSeries" group with alias is not idempotent (#12380)
638be8ae212 is described below

commit 638be8ae212ae7fff87a34d391a946705cf5749a
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 24 12:09:20 2024 +0800

    Pipe: Added batch to schema snapshot execution in template activation and 
timeseries creation & Fixed the bug that the "CreateMultiTimeSeries" group with 
alias is not idempotent (#12380)
---
 .../schema/PipeSchemaRegionSnapshotEvent.java      |  55 +++---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  56 ++++---
 .../visitor/PipeStatementTSStatusVisitor.java      |  52 +++---
 .../visitor/PipeStatementToBatchVisitor.java       | 185 +++++++++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    |   9 +
 .../iotdb/commons/conf/CommonDescriptor.java       |   5 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |   5 +
 7 files changed, 303 insertions(+), 64 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index 541f0d4638e..e984c0c6f09 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -55,10 +55,20 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
     PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
         PlanNodeType.CREATE_ALIGNED_TIME_SERIES.getNodeType(),
         StatementType.CREATE_ALIGNED_TIME_SERIES);
+    PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
+        PlanNodeType.INTERNAL_CREATE_MULTI_TIMESERIES.getNodeType(),
+        StatementType.INTERNAL_CREATE_MULTI_TIMESERIES);
+
     PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
         PlanNodeType.ACTIVATE_TEMPLATE.getNodeType(), 
StatementType.ACTIVATE_TEMPLATE);
+    PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
+        PlanNodeType.BATCH_ACTIVATE_TEMPLATE.getNodeType(), 
StatementType.BATCH_ACTIVATE_TEMPLATE);
+
     PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
         PlanNodeType.CREATE_LOGICAL_VIEW.getNodeType(), 
StatementType.CREATE_LOGICAL_VIEW);
+    // For logical view
+    PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
+        PlanNodeType.ALTER_TIME_SERIES.getNodeType(), 
StatementType.ALTER_TIME_SERIES);
   }
 
   public PipeSchemaRegionSnapshotEvent() {
@@ -67,17 +77,17 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
   }
 
   public PipeSchemaRegionSnapshotEvent(
-      String mTreeSnapshotPath, String tagLogSnapshotPath, String 
databaseName) {
+      final String mTreeSnapshotPath, final String tagLogSnapshotPath, final 
String databaseName) {
     this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, null, 
null);
   }
 
   public PipeSchemaRegionSnapshotEvent(
-      String mTreeSnapshotPath,
-      String tagLogSnapshotPath,
-      String databaseName,
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern) {
+      final String mTreeSnapshotPath,
+      final String tagLogSnapshotPath,
+      final String databaseName,
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern) {
     super(pipeName, pipeTaskMeta, pattern, PipeResourceManager.snapshot());
     this.mTreeSnapshotPath = mTreeSnapshotPath;
     this.tagLogSnapshotPath = Objects.nonNull(tagLogSnapshotPath) ? 
tagLogSnapshotPath : "";
@@ -97,14 +107,14 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
   }
 
   @Override
-  public boolean internallyIncreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     try {
       mTreeSnapshotPath = 
resourceManager.increaseSnapshotReference(mTreeSnapshotPath);
       if (!tagLogSnapshotPath.isEmpty()) {
         tagLogSnapshotPath = 
resourceManager.increaseSnapshotReference(tagLogSnapshotPath);
       }
       return true;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           String.format(
               "Increase reference count for mTree snapshot %s or tLog %s 
error. Holder Message: %s",
@@ -115,14 +125,14 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
   }
 
   @Override
-  public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
+  public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     try {
       resourceManager.decreaseSnapshotReference(mTreeSnapshotPath);
       if (!tagLogSnapshotPath.isEmpty()) {
         resourceManager.decreaseSnapshotReference(tagLogSnapshotPath);
       }
       return true;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           String.format(
               "Decrease reference count for mTree snapshot %s or tLog %s 
error. Holder Message: %s",
@@ -134,18 +144,18 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName,
-      PipeTaskMeta pipeTaskMeta,
-      PipePattern pattern,
-      long startTime,
-      long endTime) {
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
     return new PipeSchemaRegionSnapshotEvent(
         mTreeSnapshotPath, tagLogSnapshotPath, databaseName, pipeName, 
pipeTaskMeta, pattern);
   }
 
   @Override
   public ByteBuffer serializeToByteBuffer() {
-    ByteBuffer result =
+    final ByteBuffer result =
         ByteBuffer.allocate(
             Byte.BYTES
                 + 3 * Integer.BYTES
@@ -160,7 +170,7 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
   }
 
   @Override
-  public void deserializeFromByteBuffer(ByteBuffer buffer) {
+  public void deserializeFromByteBuffer(final ByteBuffer buffer) {
     mTreeSnapshotPath = ReadWriteIOUtils.readString(buffer);
     tagLogSnapshotPath = ReadWriteIOUtils.readString(buffer);
     databaseName = ReadWriteIOUtils.readString(buffer);
@@ -168,22 +178,23 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
 
   /////////////////////////////// Type parsing ///////////////////////////////
 
-  public static boolean needTransferSnapshot(Set<PlanNodeType> 
listenedTypeSet) {
+  public static boolean needTransferSnapshot(final Set<PlanNodeType> 
listenedTypeSet) {
     final Set<Short> types = new 
HashSet<>(PLAN_NODE_2_STATEMENT_TYPE_MAP.keySet());
     types.retainAll(
         
listenedTypeSet.stream().map(PlanNodeType::getNodeType).collect(Collectors.toSet()));
     return !types.isEmpty();
   }
 
-  public void confineTransferredTypes(Set<PlanNodeType> listenedTypeSet) {
+  public void confineTransferredTypes(final Set<PlanNodeType> listenedTypeSet) 
{
     final Set<Short> types = new 
HashSet<>(PLAN_NODE_2_STATEMENT_TYPE_MAP.keySet());
     types.retainAll(
         
listenedTypeSet.stream().map(PlanNodeType::getNodeType).collect(Collectors.toSet()));
     transferredTypes = types;
   }
 
-  public static Set<StatementType> getStatementTypeSet(String sealTypes) {
-    Map<Short, StatementType> statementTypeMap = new 
HashMap<>(PLAN_NODE_2_STATEMENT_TYPE_MAP);
+  public static Set<StatementType> getStatementTypeSet(final String sealTypes) 
{
+    final Map<Short, StatementType> statementTypeMap =
+        new HashMap<>(PLAN_NODE_2_STATEMENT_TYPE_MAP);
     statementTypeMap
         .keySet()
         .retainAll(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index c47ba81f5c9..ea8a1efc618 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -50,6 +50,7 @@ import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEven
 import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
+import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
@@ -89,6 +90,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -106,6 +108,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private final PipeStatementTSStatusVisitor statusVisitor = new 
PipeStatementTSStatusVisitor();
   private final PipeStatementExceptionVisitor exceptionVisitor =
       new PipeStatementExceptionVisitor();
+  private final PipeStatementToBatchVisitor batchVisitor = new 
PipeStatementToBatchVisitor();
 
   // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> 
confignode (cluster
   // B).
@@ -120,7 +123,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       folderManager =
           new FolderManager(
               Arrays.asList(RECEIVER_FILE_BASE_DIRS), 
DirectoryStrategyType.SEQUENCE_STRATEGY);
-    } catch (DiskSpaceInsufficientException e) {
+    } catch (final DiskSpaceInsufficientException e) {
       LOGGER.error(
           "Fail to create pipe receiver file folders allocation strategy 
because all disks of folders are full.",
           e);
@@ -198,14 +201,15 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           receiverId.get(),
           status);
       return new TPipeTransferResp(status);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       final String error = String.format("Serialization error during pipe 
receiving, %s", e);
       LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
       return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, 
error));
     }
   }
 
-  private TPipeTransferResp 
handleTransferTabletInsertNode(PipeTransferTabletInsertNodeReq req) {
+  private TPipeTransferResp handleTransferTabletInsertNode(
+      final PipeTransferTabletInsertNodeReq req) {
     final InsertBaseStatement statement = req.constructStatement();
     return new TPipeTransferResp(
         statement.isEmpty()
@@ -213,7 +217,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             : executeStatementAndClassifyExceptions(statement));
   }
 
-  private TPipeTransferResp 
handleTransferTabletBinary(PipeTransferTabletBinaryReq req) {
+  private TPipeTransferResp handleTransferTabletBinary(final 
PipeTransferTabletBinaryReq req) {
     final InsertBaseStatement statement = req.constructStatement();
     return new TPipeTransferResp(
         statement.isEmpty()
@@ -221,7 +225,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             : executeStatementAndClassifyExceptions(statement));
   }
 
-  private TPipeTransferResp handleTransferTabletRaw(PipeTransferTabletRawReq 
req) {
+  private TPipeTransferResp handleTransferTabletRaw(final 
PipeTransferTabletRawReq req) {
     final InsertTabletStatement statement = req.constructStatement();
     return new TPipeTransferResp(
         statement.isEmpty()
@@ -229,7 +233,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             : executeStatementAndClassifyExceptions(statement));
   }
 
-  private TPipeTransferResp 
handleTransferTabletBatch(PipeTransferTabletBatchReq req) {
+  private TPipeTransferResp handleTransferTabletBatch(final 
PipeTransferTabletBatchReq req) {
     final Pair<InsertRowsStatement, InsertMultiTabletsStatement> statementPair 
=
         req.constructStatements();
     return new TPipeTransferResp(
@@ -256,13 +260,14 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   @Override
-  protected TSStatus loadFileV1(PipeTransferFileSealReqV1 req, String 
fileAbsolutePath)
+  protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final 
String fileAbsolutePath)
       throws FileNotFoundException {
     return loadTsFile(fileAbsolutePath);
   }
 
   @Override
-  protected TSStatus loadFileV2(PipeTransferFileSealReqV2 req, List<String> 
fileAbsolutePaths)
+  protected TSStatus loadFileV2(
+      final PipeTransferFileSealReqV2 req, final List<String> 
fileAbsolutePaths)
       throws IOException, IllegalPathException {
     return req instanceof PipeTransferTsFileSealWithModReq
         // TsFile's absolute path will be the second element
@@ -270,7 +275,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         : loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
   }
 
-  private TSStatus loadTsFile(String fileAbsolutePath) throws 
FileNotFoundException {
+  private TSStatus loadTsFile(final String fileAbsolutePath) throws 
FileNotFoundException {
     final LoadTsFileStatement statement = new 
LoadTsFileStatement(fileAbsolutePath);
 
     statement.setDeleteAfterLoad(true);
@@ -281,7 +286,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private TSStatus loadSchemaSnapShot(
-      Map<String, String> parameters, List<String> fileAbsolutePaths)
+      final Map<String, String> parameters, final List<String> 
fileAbsolutePaths)
       throws IllegalPathException, IOException {
     final SRStatementGenerator generator =
         SchemaRegionSnapshotParser.translate2Statements(
@@ -291,19 +296,30 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     final Set<StatementType> executionTypes =
         PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
             parameters.get(ColumnHeaderConstant.TYPE));
+
+    // Clear to avoid previous exceptions
+    batchVisitor.clear();
     final List<TSStatus> results = new ArrayList<>();
     while (generator.hasNext()) {
-      final Statement statement = generator.next();
-      if (executionTypes.contains(statement.getType())) {
-        // The statements do not contain AlterLogicalViewStatements
-        // Here we apply the statements as many as possible
-        results.add(executeStatementAndClassifyExceptions(statement));
+      final Statement originalStatement = generator.next();
+      if (!executionTypes.contains(originalStatement.getType())) {
+        continue;
       }
+
+      // The statements do not contain AlterLogicalViewStatements
+      // Here we apply the statements as many as possible
+      // Even if there are failed statements
+      batchVisitor
+          .process(originalStatement, null)
+          .ifPresent(statement -> 
results.add(executeStatementAndClassifyExceptions(statement)));
     }
+    batchVisitor.getRemainBatches().stream()
+        .filter(Optional::isPresent)
+        .forEach(statement -> 
results.add(executeStatementAndClassifyExceptions(statement.get())));
     return PipeReceiverStatusHandler.getPriorStatus(results);
   }
 
-  private TPipeTransferResp handleTransferSchemaPlan(PipeTransferPlanNodeReq 
req) {
+  private TPipeTransferResp handleTransferSchemaPlan(final 
PipeTransferPlanNodeReq req) {
     // We may be able to skip the alter logical view's exception parsing 
because
     // the "AlterLogicalViewNode" is itself idempotent
     return req.getPlanNode() instanceof AlterLogicalViewNode
@@ -315,7 +331,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
                 new PipePlanToStatementVisitor().process(req.getPlanNode(), 
null)));
   }
 
-  private TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req) {
+  private TPipeTransferResp handleTransferConfigPlan(final TPipeTransferReq 
req) {
     return ClusterConfigTaskExecutor.getInstance()
         .handleTransferConfigPlan(getConfigReceiverId(), req);
   }
@@ -333,7 +349,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return configReceiverId.get();
   }
 
-  private TSStatus executeStatementAndClassifyExceptions(Statement statement) {
+  private TSStatus executeStatementAndClassifyExceptions(final Statement 
statement) {
     try {
       final TSStatus result = executeStatement(statement);
       if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -346,7 +362,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             result);
         return statement.accept(statusVisitor, result);
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           "Receiver id = {}: Exception encountered while executing statement 
{}: ",
           receiverId.get(),
@@ -382,7 +398,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     if (Objects.nonNull(configReceiverId.get())) {
       try {
         
ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get());
-      } catch (Exception e) {
+      } catch (final Exception e) {
         LOGGER.warn("Failed to handle config client (id = {}) exit", 
configReceiverId.get(), e);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 117eae0a689..4412eccf18e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -46,32 +46,36 @@ import org.apache.iotdb.rpc.TSStatusCode;
  */
 public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, 
TSStatus> {
   @Override
-  public TSStatus visitNode(StatementNode node, TSStatus context) {
+  public TSStatus visitNode(final StatementNode node, final TSStatus context) {
     return context;
   }
 
   @Override
-  public TSStatus visitInsertTablet(InsertTabletStatement 
insertTabletStatement, TSStatus context) {
+  public TSStatus visitInsertTablet(
+      final InsertTabletStatement insertTabletStatement, final TSStatus 
context) {
     return visitInsertBase(insertTabletStatement, context);
   }
 
   @Override
-  public TSStatus visitInsertRow(InsertRowStatement insertRowStatement, 
TSStatus context) {
+  public TSStatus visitInsertRow(
+      final InsertRowStatement insertRowStatement, final TSStatus context) {
     return visitInsertBase(insertRowStatement, context);
   }
 
   @Override
-  public TSStatus visitInsertRows(InsertRowsStatement insertRowsStatement, 
TSStatus context) {
+  public TSStatus visitInsertRows(
+      final InsertRowsStatement insertRowsStatement, final TSStatus context) {
     return visitInsertBase(insertRowsStatement, context);
   }
 
   @Override
   public TSStatus visitInsertMultiTablets(
-      InsertMultiTabletsStatement insertMultiTabletsStatement, TSStatus 
context) {
+      final InsertMultiTabletsStatement insertMultiTabletsStatement, final 
TSStatus context) {
     return visitInsertBase(insertMultiTabletsStatement, context);
   }
 
-  private TSStatus visitInsertBase(InsertBaseStatement insertBaseStatement, 
TSStatus context) {
+  private TSStatus visitInsertBase(
+      final InsertBaseStatement insertBaseStatement, final TSStatus context) {
     if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
@@ -80,17 +84,18 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   }
 
   @Override
-  public TSStatus visitCreateTimeseries(CreateTimeSeriesStatement statement, 
TSStatus context) {
+  public TSStatus visitCreateTimeseries(
+      final CreateTimeSeriesStatement statement, final TSStatus context) {
     return visitGeneralCreateTimeSeries(statement, context);
   }
 
   @Override
   public TSStatus visitCreateAlignedTimeseries(
-      CreateAlignedTimeSeriesStatement statement, TSStatus context) {
+      final CreateAlignedTimeSeriesStatement statement, final TSStatus 
context) {
     return visitGeneralCreateTimeSeries(statement, context);
   }
 
-  private TSStatus visitGeneralCreateTimeSeries(Statement statement, TSStatus 
context) {
+  private TSStatus visitGeneralCreateTimeSeries(final Statement statement, 
final TSStatus context) {
     if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
         || context.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
@@ -106,28 +111,31 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
 
   @Override
   public TSStatus visitCreateMultiTimeseries(
-      CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, TSStatus 
context) {
+      final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, 
final TSStatus context) {
     return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement, 
context);
   }
 
   @Override
   public TSStatus visitInternalCreateTimeseries(
-      InternalCreateTimeSeriesStatement internalCreateTimeSeriesStatement, 
TSStatus context) {
+      final InternalCreateTimeSeriesStatement 
internalCreateTimeSeriesStatement,
+      final TSStatus context) {
     return 
visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context);
   }
 
   @Override
   public TSStatus visitInternalCreateMultiTimeSeries(
-      InternalCreateMultiTimeSeriesStatement 
internalCreateMultiTimeSeriesStatement,
-      TSStatus context) {
+      final InternalCreateMultiTimeSeriesStatement 
internalCreateMultiTimeSeriesStatement,
+      final TSStatus context) {
     return 
visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, 
context);
   }
 
-  private TSStatus visitGeneralCreateMultiTimeseries(Statement statement, 
TSStatus context) {
+  private TSStatus visitGeneralCreateMultiTimeseries(
+      final Statement statement, final TSStatus context) {
     if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (TSStatus status : context.getSubStatus()) {
+      for (final TSStatus status : context.getSubStatus()) {
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+            && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+            && status.getCode() != 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
           if (status.getCode() == 
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
             return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(context.getMessage());
@@ -146,7 +154,7 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
 
   @Override
   public TSStatus visitAlterTimeseries(
-      AlterTimeSeriesStatement alterTimeSeriesStatement, TSStatus context) {
+      final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus 
context) {
     if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
       if (context.getMessage().contains("already")) {
         return new TSStatus(
@@ -165,12 +173,12 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
 
   @Override
   public TSStatus visitCreateLogicalView(
-      CreateLogicalViewStatement createLogicalViewStatement, TSStatus context) 
{
+      final CreateLogicalViewStatement createLogicalViewStatement, final 
TSStatus context) {
     if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     } else if (context.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (TSStatus status : context.getSubStatus()) {
+      for (final TSStatus status : context.getSubStatus()) {
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
           return visitStatement(createLogicalViewStatement, context);
@@ -184,18 +192,18 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
 
   @Override
   public TSStatus visitActivateTemplate(
-      ActivateTemplateStatement activateTemplateStatement, TSStatus context) {
+      final ActivateTemplateStatement activateTemplateStatement, final 
TSStatus context) {
     return visitGeneralActivateTemplate(activateTemplateStatement, context);
   }
 
   @Override
   public TSStatus visitBatchActivateTemplate(
-      BatchActivateTemplateStatement batchActivateTemplateStatement, TSStatus 
context) {
+      final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus context) {
     return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
   }
 
   private TSStatus visitGeneralActivateTemplate(
-      Statement activateTemplateStatement, TSStatus context) {
+      final Statement activateTemplateStatement, final TSStatus context) {
     if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementToBatchVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementToBatchVisitor.java
new file mode 100644
index 00000000000..213730a6c90
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementToBatchVisitor.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.visitor;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This visitor will convert {@link Statement}s to batch and is stateful, and 
can only be used in a
+ * single thread. It only guarantees the correctness when is used by the 
schema snapshot and shall
+ * not be used for other purposes.
+ *
+ * <p>When the visitor is called, it shall encounter no:
+ *
+ * <p>1. {@link StatementNode}s other than {@link Statement}s
+ *
+ * <p>2. TimeSeries with identical path.
+ *
+ * <p>3. Alter to the timeSeries and templates in batch.
+ */
+public class PipeStatementToBatchVisitor extends 
StatementVisitor<Optional<Statement>, Void> {
+
+  private static final int MAX_SCHEMA_BATCH_SIZE =
+      PipeConfig.getInstance().getPipeSnapshotExecutionMaxBatchSize();
+
+  private final List<CreateTimeSeriesStatement> createTimeSeriesStatements = 
new ArrayList<>();
+
+  private final List<CreateAlignedTimeSeriesStatement> 
createAlignedTimeSeriesStatements =
+      new ArrayList<>();
+
+  private final List<ActivateTemplateStatement> activateTemplateStatements = 
new ArrayList<>();
+
+  @Override
+  public Optional<Statement> visitNode(final StatementNode statement, final 
Void context) {
+    return Optional.of((Statement) statement);
+  }
+
+  @Override
+  public Optional<Statement> visitCreateTimeseries(
+      final CreateTimeSeriesStatement statement, final Void context) {
+    createTimeSeriesStatements.add(statement);
+    return createTimeSeriesStatements.size() + 
createAlignedTimeSeriesStatements.size()
+            >= MAX_SCHEMA_BATCH_SIZE
+        ? Optional.of(getTimeSeriesBatchStatement())
+        : Optional.empty();
+  }
+
+  @Override
+  public Optional<Statement> visitCreateAlignedTimeseries(
+      final CreateAlignedTimeSeriesStatement statement, final Void context) {
+    createAlignedTimeSeriesStatements.add(statement);
+    return createTimeSeriesStatements.size() + 
createAlignedTimeSeriesStatements.size()
+            >= MAX_SCHEMA_BATCH_SIZE
+        ? Optional.of(getTimeSeriesBatchStatement())
+        : Optional.empty();
+  }
+
+  private InternalCreateMultiTimeSeriesStatement getTimeSeriesBatchStatement() 
{
+    final InternalCreateMultiTimeSeriesStatement 
internalCreateMultiTimeSeriesStatement =
+        new InternalCreateMultiTimeSeriesStatement(new HashMap<>());
+    createTimeSeriesStatements.forEach(
+        statement ->
+            addNonAlignedTimeSeriesToBatchStatement(
+                statement, internalCreateMultiTimeSeriesStatement));
+    createAlignedTimeSeriesStatements.forEach(
+        statement ->
+            addAlignedTimeSeriesToBatchStatement(
+                statement, internalCreateMultiTimeSeriesStatement));
+    createTimeSeriesStatements.clear();
+    createAlignedTimeSeriesStatements.clear();
+    return internalCreateMultiTimeSeriesStatement;
+  }
+
+  private void addNonAlignedTimeSeriesToBatchStatement(
+      final CreateTimeSeriesStatement statement,
+      final InternalCreateMultiTimeSeriesStatement 
internalCreateMultiTimeSeriesStatement) {
+    final MeasurementGroup group =
+        internalCreateMultiTimeSeriesStatement
+            .getDeviceMap()
+            .computeIfAbsent(
+                statement.getPath().getDevicePath(),
+                devicePath -> new Pair<>(false, new MeasurementGroup()))
+            .getRight();
+    group.addMeasurement(
+        statement.getPath().getMeasurement(),
+        statement.getDataType(),
+        statement.getEncoding(),
+        statement.getCompressor());
+    group.addAttributes(statement.getAttributes());
+    group.addTags(statement.getTags());
+    group.addProps(statement.getProps());
+    group.addAlias(statement.getAlias());
+  }
+
+  private void addAlignedTimeSeriesToBatchStatement(
+      final CreateAlignedTimeSeriesStatement statement,
+      final InternalCreateMultiTimeSeriesStatement 
internalCreateMultiTimeSeriesStatement) {
+    final MeasurementGroup group =
+        internalCreateMultiTimeSeriesStatement
+            .getDeviceMap()
+            .computeIfAbsent(
+                statement.getDevicePath(), devicePath -> new Pair<>(true, new 
MeasurementGroup()))
+            .getRight();
+    for (int i = 0; i < statement.getMeasurements().size(); ++i) {
+      group.addMeasurement(
+          statement.getMeasurements().get(i),
+          statement.getDataTypes().get(i),
+          statement.getEncodings().get(i),
+          statement.getCompressors().get(i));
+      group.addProps(new HashMap<>());
+    }
+    statement.getTagsList().forEach(group::addTags);
+    statement.getAttributesList().forEach(group::addAttributes);
+    statement.getAliasList().forEach(group::addAlias);
+  }
+
+  @Override
+  public Optional<Statement> visitActivateTemplate(
+      final ActivateTemplateStatement activateTemplateStatement, final Void 
context) {
+    activateTemplateStatements.add(activateTemplateStatement);
+    return activateTemplateStatements.size() >= MAX_SCHEMA_BATCH_SIZE
+        ? Optional.of(getTemplateBatchStatement())
+        : Optional.empty();
+  }
+
+  private BatchActivateTemplateStatement getTemplateBatchStatement() {
+    final BatchActivateTemplateStatement batchActivateTemplateStatement =
+        new BatchActivateTemplateStatement(
+            activateTemplateStatements.stream()
+                .map(ActivateTemplateStatement::getPath)
+                .collect(Collectors.toList()));
+    activateTemplateStatements.clear();
+    return batchActivateTemplateStatement;
+  }
+
+  public List<Optional<Statement>> getRemainBatches() {
+    return Arrays.asList(
+        !(createTimeSeriesStatements.isEmpty() && 
createAlignedTimeSeriesStatements.isEmpty())
+            ? Optional.of(getTimeSeriesBatchStatement())
+            : Optional.empty(),
+        !activateTemplateStatements.isEmpty()
+            ? Optional.of(getTemplateBatchStatement())
+            : Optional.empty());
+  }
+
+  public void clear() {
+    createTimeSeriesStatements.clear();
+    createAlignedTimeSeriesStatements.clear();
+    activateTemplateStatements.clear();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 29ce653606c..5057ea05b98 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -227,6 +227,7 @@ public class CommonConfig {
   private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
   private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
   private long pipeListeningQueueTransferSnapshotThreshold = 1000;
+  private int pipeSnapshotExecutionMaxBatchSize = 1000;
 
   private int subscriptionSubtaskExecutorMaxThreadNum =
       Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
@@ -961,6 +962,14 @@ public class CommonConfig {
     this.pipeListeningQueueTransferSnapshotThreshold = 
pipeListeningQueueTransferSnapshotThreshold;
   }
 
+  public int getPipeSnapshotExecutionMaxBatchSize() {
+    return pipeSnapshotExecutionMaxBatchSize;
+  }
+
+  public void setPipeSnapshotExecutionMaxBatchSize(int 
pipeSnapshotExecutionMaxBatchSize) {
+    this.pipeSnapshotExecutionMaxBatchSize = pipeSnapshotExecutionMaxBatchSize;
+  }
+
   public int getSubscriptionSubtaskExecutorMaxThreadNum() {
     return subscriptionSubtaskExecutorMaxThreadNum;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 4e347ccd41a..f64a79e36f4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -516,6 +516,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_listening_queue_transfer_snapshot_threshold",
                 
String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));
+    config.setPipeSnapshotExecutionMaxBatchSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_snapshot_execution_max_batch_size",
+                
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
   }
 
   private void loadSubscriptionProps(Properties properties) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 88405d64c93..6fab25c9b70 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -135,6 +135,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
   }
 
+  public int getPipeSnapshotExecutionMaxBatchSize() {
+    return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+  }
+
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -291,6 +295,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipeListeningQueueTransferSnapshotThreshold: {}",
         getPipeListeningQueueTransferSnapshotThreshold());
+    LOGGER.info("PipeSnapshotExecutionMaxBatchSize: {}", 
getPipeSnapshotExecutionMaxBatchSize());
 
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());


Reply via email to