This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 502f54a5beb Pipe: Added the CreateMultiTimeSeries with merge logic and 
disabled timeSeries existence check at SchemaExecutionVisitor for plans 
transferred by pipe (#14317)
502f54a5beb is described below

commit 502f54a5beba7040809777940a4f92313cd25be6
Author: Caideyipi <[email protected]>
AuthorDate: Thu Dec 5 12:08:11 2024 +0800

    Pipe: Added the CreateMultiTimeSeries with merge logic and disabled 
timeSeries existence check at SchemaExecutionVisitor for plans transferred by 
pipe (#14317)
---
 .../schemaregion/SchemaExecutionVisitor.java       | 102 ++++----
 .../execution/executor/RegionWriteExecutor.java    | 268 ++++++++++-----------
 2 files changed, 187 insertions(+), 183 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index c816f8760fa..cfcbfb07e1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -148,9 +148,11 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
       // todo implement batch creation of one device in SchemaRegion
       for (int i = 0; i < size; i++) {
         try {
-          schemaRegion.createTimeSeries(
-              transformToCreateTimeSeriesPlan(devicePath, measurementGroup, 
i), -1);
-        } catch (MetadataException e) {
+          final ICreateTimeSeriesPlan createTimeSeriesPlan =
+              transformToCreateTimeSeriesPlan(devicePath, measurementGroup, i);
+          ((CreateTimeSeriesPlanImpl) 
createTimeSeriesPlan).setWithMerge(node.isGeneratedByPipe());
+          schemaRegion.createTimeSeries(createTimeSeriesPlan, -1);
+        } catch (final MetadataException e) {
           logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, 
e);
           failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
         }
@@ -196,7 +198,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
           schemaRegion,
           alreadyExistingTimeSeries,
           failingStatus,
-          false);
+          node.isGeneratedByPipe());
     } else {
       executeInternalCreateTimeSeries(
           devicePath,
@@ -204,7 +206,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
           schemaRegion,
           alreadyExistingTimeSeries,
           failingStatus,
-          false);
+          node.isGeneratedByPipe());
     }
 
     if (!failingStatus.isEmpty()) {
@@ -389,7 +391,8 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
   }
 
   @Override
-  public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion 
schemaRegion) {
+  public TSStatus visitAlterTimeSeries(
+      final AlterTimeSeriesNode node, final ISchemaRegion schemaRegion) {
     try {
       switch (node.getAlterType()) {
         case RENAME:
@@ -425,12 +428,14 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
   }
 
   @Override
-  public TSStatus visitActivateTemplate(ActivateTemplateNode node, 
ISchemaRegion schemaRegion) {
+  public TSStatus visitActivateTemplate(
+      final ActivateTemplateNode node, final ISchemaRegion schemaRegion) {
     try {
-      Template template = 
ClusterTemplateManager.getInstance().getTemplate(node.getTemplateId());
+      final Template template =
+          
ClusterTemplateManager.getInstance().getTemplate(node.getTemplateId());
       schemaRegion.activateSchemaTemplate(node, template);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-    } catch (MetadataException e) {
+    } catch (final MetadataException e) {
       logger.error(e.getMessage(), e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
     }
@@ -438,16 +443,17 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitBatchActivateTemplate(
-      BatchActivateTemplateNode node, ISchemaRegion schemaRegion) {
-    for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
+      final BatchActivateTemplateNode node, final ISchemaRegion schemaRegion) {
+    for (final Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
         node.getTemplateActivationMap().entrySet()) {
-      Template template = 
ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
+      final Template template =
+          
ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
       try {
         schemaRegion.activateSchemaTemplate(
             SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
                 entry.getKey(), entry.getValue().right, entry.getValue().left),
             template);
-      } catch (MetadataException e) {
+      } catch (final MetadataException e) {
         logger.error(e.getMessage(), e);
         return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
       }
@@ -457,21 +463,22 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitInternalBatchActivateTemplate(
-      InternalBatchActivateTemplateNode node, ISchemaRegion schemaRegion) {
-    for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
+      final InternalBatchActivateTemplateNode node, final ISchemaRegion 
schemaRegion) {
+    for (final Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
         node.getTemplateActivationMap().entrySet()) {
-      Template template = 
ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
+      final Template template =
+          
ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
       try {
         schemaRegion.activateSchemaTemplate(
             SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
                 entry.getKey(), entry.getValue().right, entry.getValue().left),
             template);
-      } catch (TemplateIsInUseException e) {
+      } catch (final TemplateIsInUseException e) {
         logger.info(
             String.format(
                 "Device Template has already been activated on path %s, 
there's no need to activate again.",
                 entry.getKey()));
-      } catch (MetadataException e) {
+      } catch (final MetadataException e) {
         logger.error(e.getMessage(), e);
         return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
       }
@@ -498,22 +505,23 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitRollbackSchemaBlackList(
-      RollbackSchemaBlackListNode node, ISchemaRegion schemaRegion) {
+      final RollbackSchemaBlackListNode node, final ISchemaRegion 
schemaRegion) {
     try {
       schemaRegion.rollbackSchemaBlackList(node.getPatternTree());
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-    } catch (MetadataException e) {
+    } catch (final MetadataException e) {
       logger.error(e.getMessage(), e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
     }
   }
 
   @Override
-  public TSStatus visitDeleteTimeseries(DeleteTimeSeriesNode node, 
ISchemaRegion schemaRegion) {
+  public TSStatus visitDeleteTimeseries(
+      final DeleteTimeSeriesNode node, final ISchemaRegion schemaRegion) {
     try {
       schemaRegion.deleteTimeseriesInBlackList(node.getPatternTree());
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-    } catch (MetadataException e) {
+    } catch (final MetadataException e) {
       logger.error(e.getMessage(), e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
     }
@@ -521,11 +529,12 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitPreDeactivateTemplate(
-      PreDeactivateTemplateNode node, ISchemaRegion schemaRegion) {
+      final PreDeactivateTemplateNode node, final ISchemaRegion schemaRegion) {
     try {
-      long preDeactivateNum = 
schemaRegion.constructSchemaBlackListWithTemplate(node);
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, 
String.valueOf(preDeactivateNum));
-    } catch (MetadataException e) {
+      return RpcUtils.getStatus(
+          TSStatusCode.SUCCESS_STATUS,
+          
String.valueOf(schemaRegion.constructSchemaBlackListWithTemplate(node)));
+    } catch (final MetadataException e) {
       logger.error(e.getMessage(), e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
     }
@@ -533,18 +542,19 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitRollbackPreDeactivateTemplate(
-      RollbackPreDeactivateTemplateNode node, ISchemaRegion schemaRegion) {
+      final RollbackPreDeactivateTemplateNode node, final ISchemaRegion 
schemaRegion) {
     try {
       schemaRegion.rollbackSchemaBlackListWithTemplate(node);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-    } catch (MetadataException e) {
+    } catch (final MetadataException e) {
       logger.error(e.getMessage(), e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
     }
   }
 
   @Override
-  public TSStatus visitDeactivateTemplate(DeactivateTemplateNode node, 
ISchemaRegion schemaRegion) {
+  public TSStatus visitDeactivateTemplate(
+      final DeactivateTemplateNode node, final ISchemaRegion schemaRegion) {
     try {
       schemaRegion.deactivateTemplateInBlackList(node);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -555,15 +565,17 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
   }
 
   @Override
-  public TSStatus visitCreateLogicalView(CreateLogicalViewNode node, 
ISchemaRegion schemaRegion) {
-    Map<PartialPath, ViewExpression> viewPathToSourceMap = 
node.getViewPathToSourceExpressionMap();
-    List<TSStatus> failingStatus = new ArrayList<>();
-    for (Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
+  public TSStatus visitCreateLogicalView(
+      final CreateLogicalViewNode node, final ISchemaRegion schemaRegion) {
+    final Map<PartialPath, ViewExpression> viewPathToSourceMap =
+        node.getViewPathToSourceExpressionMap();
+    final List<TSStatus> failingStatus = new ArrayList<>();
+    for (final Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
       try {
         schemaRegion.createLogicalView(
             SchemaRegionWritePlanFactory.getCreateLogicalViewPlan(
                 entry.getKey(), entry.getValue()));
-      } catch (MetadataException e) {
+      } catch (final MetadataException e) {
         logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
         failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
       }
@@ -575,14 +587,15 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
   }
 
   @Override
-  public TSStatus visitAlterLogicalView(AlterLogicalViewNode node, 
ISchemaRegion schemaRegion) {
-    Map<PartialPath, ViewExpression> viewPathToSourceMap = 
node.getViewPathToSourceMap();
-    List<TSStatus> failingStatus = new ArrayList<>();
-    for (Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
+  public TSStatus visitAlterLogicalView(
+      final AlterLogicalViewNode node, final ISchemaRegion schemaRegion) {
+    final Map<PartialPath, ViewExpression> viewPathToSourceMap = 
node.getViewPathToSourceMap();
+    final List<TSStatus> failingStatus = new ArrayList<>();
+    for (final Map.Entry<PartialPath, ViewExpression> entry : 
viewPathToSourceMap.entrySet()) {
       try {
         schemaRegion.alterLogicalView(
             
SchemaRegionWritePlanFactory.getAlterLogicalViewPlan(entry.getKey(), 
entry.getValue()));
-      } catch (MetadataException e) {
+      } catch (final MetadataException e) {
         logger.warn("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
         failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
       }
@@ -595,10 +608,11 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitConstructLogicalViewBlackList(
-      ConstructLogicalViewBlackListNode node, ISchemaRegion schemaRegion) {
+      final ConstructLogicalViewBlackListNode node, final ISchemaRegion 
schemaRegion) {
     try {
-      long preDeletedNum = 
schemaRegion.constructLogicalViewBlackList(node.getPatternTree());
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, 
String.valueOf(preDeletedNum));
+      return RpcUtils.getStatus(
+          TSStatusCode.SUCCESS_STATUS,
+          
String.valueOf(schemaRegion.constructLogicalViewBlackList(node.getPatternTree())));
     } catch (MetadataException e) {
       logger.error(e.getMessage(), e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
@@ -607,11 +621,11 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitRollbackLogicalViewBlackList(
-      RollbackLogicalViewBlackListNode node, ISchemaRegion schemaRegion) {
+      final RollbackLogicalViewBlackListNode node, final ISchemaRegion 
schemaRegion) {
     try {
       schemaRegion.rollbackLogicalViewBlackList(node.getPatternTree());
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-    } catch (MetadataException e) {
+    } catch (final MetadataException e) {
       logger.error(e.getMessage(), e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index cbfe1682e19..cef49d96c3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -162,7 +162,8 @@ public class RegionWriteExecutor {
       extends PlanVisitor<RegionExecutionResult, 
WritePlanNodeExecutionContext> {
 
     @Override
-    public RegionExecutionResult visitPlan(PlanNode node, 
WritePlanNodeExecutionContext context) {
+    public RegionExecutionResult visitPlan(
+        final PlanNode node, final WritePlanNodeExecutionContext context) {
 
       if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
         return RegionExecutionResult.create(
@@ -174,12 +175,12 @@ public class RegionWriteExecutor {
       }
 
       try {
-        TSStatus status = 
executePlanNodeInConsensusLayer(context.getRegionId(), node);
+        final TSStatus status = 
executePlanNodeInConsensusLayer(context.getRegionId(), node);
         return RegionExecutionResult.create(
             TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode(),
             status.getMessage(),
             status);
-      } catch (ConsensusException e) {
+      } catch (final ConsensusException e) {
         LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
         TSStatus status = 
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
         if (e instanceof ConsensusGroupNotExistException) {
@@ -189,8 +190,8 @@ public class RegionWriteExecutor {
       }
     }
 
-    private TSStatus executePlanNodeInConsensusLayer(ConsensusGroupId groupId, 
PlanNode planNode)
-        throws ConsensusException {
+    private TSStatus executePlanNodeInConsensusLayer(
+        final ConsensusGroupId groupId, final PlanNode planNode) throws 
ConsensusException {
       if (groupId instanceof DataRegionId) {
         return dataRegionConsensus.write(groupId, planNode);
       } else {
@@ -200,64 +201,65 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitInsertRow(
-        InsertRowNode node, WritePlanNodeExecutionContext context) {
+        final InsertRowNode node, final WritePlanNodeExecutionContext context) 
{
       return executeDataInsert(node, context);
     }
 
     @Override
     public RegionExecutionResult visitInsertTablet(
-        InsertTabletNode node, WritePlanNodeExecutionContext context) {
+        final InsertTabletNode node, final WritePlanNodeExecutionContext 
context) {
       return executeDataInsert(node, context);
     }
 
     @Override
     public RegionExecutionResult visitRelationalInsertTablet(
-        RelationalInsertTabletNode node, WritePlanNodeExecutionContext 
context) {
+        final RelationalInsertTabletNode node, final 
WritePlanNodeExecutionContext context) {
       return executeDataInsert(node, context);
     }
 
     @Override
     public RegionExecutionResult visitInsertRows(
-        InsertRowsNode node, WritePlanNodeExecutionContext context) {
+        final InsertRowsNode node, final WritePlanNodeExecutionContext 
context) {
       return executeDataInsert(node, context);
     }
 
     @Override
     public RegionExecutionResult visitInsertMultiTablets(
-        InsertMultiTabletsNode node, WritePlanNodeExecutionContext context) {
+        final InsertMultiTabletsNode node, final WritePlanNodeExecutionContext 
context) {
       return executeDataInsert(node, context);
     }
 
     @Override
     public RegionExecutionResult visitInsertRowsOfOneDevice(
-        InsertRowsOfOneDeviceNode node, WritePlanNodeExecutionContext context) 
{
+        final InsertRowsOfOneDeviceNode node, final 
WritePlanNodeExecutionContext context) {
       return executeDataInsert(node, context);
     }
 
     @Override
     public RegionExecutionResult visitPipeEnrichedInsertNode(
-        PipeEnrichedInsertNode node, WritePlanNodeExecutionContext context) {
+        final PipeEnrichedInsertNode node, final WritePlanNodeExecutionContext 
context) {
       return executeDataInsert(node, context);
     }
 
     private RegionExecutionResult executeDataInsert(
-        InsertNode insertNode, WritePlanNodeExecutionContext context) {
+        final InsertNode insertNode, final WritePlanNodeExecutionContext 
context) {
       if (context.getRegionWriteValidationRWLock() == null) {
-        String message = "Failed to get the lock of the region because the 
region is not existed.";
+        final String message =
+            "Failed to get the lock of the region because the region is not 
existed.";
         return RegionExecutionResult.create(
             false, message, 
RpcUtils.getStatus(TSStatusCode.NO_AVAILABLE_REGION_GROUP, message));
       }
 
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
-        TSStatus status = fireTriggerAndInsert(context.getRegionId(), 
insertNode);
+        final TSStatus status = fireTriggerAndInsert(context.getRegionId(), 
insertNode);
         return RegionExecutionResult.create(
             TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode(),
             status.message,
             status);
       } catch (ConsensusException e) {
         LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
-        TSStatus status = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, 
e.toString());
+        final TSStatus status = 
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, e.toString());
         if (e instanceof ConsensusGroupNotExistException) {
           
status.setCode(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode());
         }
@@ -267,8 +269,8 @@ public class RegionWriteExecutor {
       }
     }
 
-    private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, InsertNode 
insertNode)
-        throws ConsensusException {
+    private TSStatus fireTriggerAndInsert(
+        final ConsensusGroupId groupId, final InsertNode insertNode) throws 
ConsensusException {
       long triggerCostTime = 0;
       TSStatus status;
       long startTime = System.nanoTime();
@@ -281,7 +283,7 @@ public class RegionWriteExecutor {
                 TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(),
                 "Failed to complete the insertion because trigger error before 
the insertion.");
       } else {
-        long startWriteTime = System.nanoTime();
+        final long startWriteTime = System.nanoTime();
         status = dataRegionConsensus.write(groupId, insertNode);
         
PERFORMANCE_OVERVIEW_METRICS.recordScheduleStorageCost(System.nanoTime() - 
startWriteTime);
 
@@ -304,7 +306,7 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitPipeEnrichedDeleteDataNode(
-        PipeEnrichedDeleteDataNode node, WritePlanNodeExecutionContext 
context) {
+        final PipeEnrichedDeleteDataNode node, final 
WritePlanNodeExecutionContext context) {
       // data deletion should block data insertion, especially when executed 
for deleting timeseries
       context.getRegionWriteValidationRWLock().writeLock().lock();
       try {
@@ -316,7 +318,7 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitDeleteData(
-        DeleteDataNode node, WritePlanNodeExecutionContext context) {
+        final DeleteDataNode node, final WritePlanNodeExecutionContext 
context) {
       // data deletion don't need to block data insertion, but there are some 
creation operation
       // require write lock on data region.
       context.getRegionWriteValidationRWLock().writeLock().lock();
@@ -329,7 +331,7 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitDeleteData(
-        RelationalDeleteDataNode node, WritePlanNodeExecutionContext context) {
+        final RelationalDeleteDataNode node, final 
WritePlanNodeExecutionContext context) {
       // data deletion don't need to block data insertion, but there are some 
creation operation
       // require write lock on data region.
       context.getRegionWriteValidationRWLock().writeLock().lock();
@@ -342,35 +344,34 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitCreateTimeSeries(
-        CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) {
+        final CreateTimeSeriesNode node, final WritePlanNodeExecutionContext 
context) {
       return executeCreateTimeSeries(node, context, false);
     }
 
     private RegionExecutionResult executeCreateTimeSeries(
-        CreateTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
-      ISchemaRegion schemaRegion =
+        final CreateTimeSeriesNode node,
+        final WritePlanNodeExecutionContext context,
+        final boolean receivedFromPipe) {
+      final ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
-      RegionExecutionResult result =
+      final RegionExecutionResult result =
           checkQuotaBeforeCreatingTimeSeries(schemaRegion, 
node.getPath().getDevicePath(), 1);
       if (result != null) {
         return result;
       }
-      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
+      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+          && !receivedFromPipe) {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
-          Map<Integer, MetadataException> failingMeasurementMap =
+          final Map<Integer, MetadataException> failingMeasurementMap =
               schemaRegion.checkMeasurementExistence(
                   node.getPath().getDevicePath(),
                   Collections.singletonList(node.getPath().getMeasurement()),
                   Collections.singletonList(node.getAlias()));
           if (failingMeasurementMap.isEmpty()) {
-            return receivedFromPipe
-                ? super.visitPipeEnrichedWritePlanNode(new 
PipeEnrichedWritePlanNode(node), context)
-                : super.visitCreateTimeSeries(node, context);
+            return super.visitCreateTimeSeries(node, context);
           } else {
-            MetadataException metadataException = failingMeasurementMap.get(0);
+            final MetadataException metadataException = 
failingMeasurementMap.get(0);
             LOGGER.warn(METADATA_ERROR_MSG, metadataException);
             return RegionExecutionResult.create(
                 false,
@@ -390,34 +391,34 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitCreateAlignedTimeSeries(
-        CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+        final CreateAlignedTimeSeriesNode node, final 
WritePlanNodeExecutionContext context) {
       return executeCreateAlignedTimeSeries(node, context, false);
     }
 
     private RegionExecutionResult executeCreateAlignedTimeSeries(
-        CreateAlignedTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
-      ISchemaRegion schemaRegion =
+        final CreateAlignedTimeSeriesNode node,
+        final WritePlanNodeExecutionContext context,
+        final boolean receivedFromPipe) {
+      final ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
-      RegionExecutionResult result =
+      final RegionExecutionResult result =
           checkQuotaBeforeCreatingTimeSeries(
               schemaRegion, node.getDevicePath(), 
node.getMeasurements().size());
       if (result != null) {
         return result;
       }
-      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
+      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+          && !receivedFromPipe) {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
-          Map<Integer, MetadataException> failingMeasurementMap =
+          final Map<Integer, MetadataException> failingMeasurementMap =
               schemaRegion.checkMeasurementExistence(
                   node.getDevicePath(), node.getMeasurements(), 
node.getAliasList());
           if (failingMeasurementMap.isEmpty()) {
-            return receivedFromPipe
-                ? super.visitPipeEnrichedWritePlanNode(new 
PipeEnrichedWritePlanNode(node), context)
-                : super.visitCreateAlignedTimeSeries(node, context);
+            return super.visitCreateAlignedTimeSeries(node, context);
           } else {
-            MetadataException metadataException = 
failingMeasurementMap.values().iterator().next();
+            final MetadataException metadataException =
+                failingMeasurementMap.values().iterator().next();
             LOGGER.warn(METADATA_ERROR_MSG, metadataException);
             return RegionExecutionResult.create(
                 false,
@@ -437,18 +438,18 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitCreateMultiTimeSeries(
-        CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) 
{
+        final CreateMultiTimeSeriesNode node, final 
WritePlanNodeExecutionContext context) {
       return executeCreateMultiTimeSeries(node, context, false);
     }
 
     private RegionExecutionResult executeCreateMultiTimeSeries(
-        CreateMultiTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
-      ISchemaRegion schemaRegion =
+        final CreateMultiTimeSeriesNode node,
+        final WritePlanNodeExecutionContext context,
+        final boolean receivedFromPipe) {
+      final ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       RegionExecutionResult result;
-      for (Map.Entry<PartialPath, MeasurementGroup> entry :
+      for (final Map.Entry<PartialPath, MeasurementGroup> entry :
           node.getMeasurementGroupMap().entrySet()) {
         result =
             checkQuotaBeforeCreatingTimeSeries(
@@ -457,29 +458,30 @@ public class RegionWriteExecutor {
           return result;
         }
       }
-      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
+      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+          && !receivedFromPipe) {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
-          List<TSStatus> failingStatus = new ArrayList<>();
-          Map<PartialPath, MeasurementGroup> measurementGroupMap = 
node.getMeasurementGroupMap();
-          List<PartialPath> emptyDeviceList = new ArrayList<>();
+          final List<TSStatus> failingStatus = new ArrayList<>();
+          final Map<PartialPath, MeasurementGroup> measurementGroupMap =
+              node.getMeasurementGroupMap();
+          final List<PartialPath> emptyDeviceList = new ArrayList<>();
 
           checkMeasurementExistence(
               measurementGroupMap, schemaRegion, failingStatus, 
emptyDeviceList);
 
-          for (PartialPath emptyDevice : emptyDeviceList) {
+          for (final PartialPath emptyDevice : emptyDeviceList) {
             measurementGroupMap.remove(emptyDevice);
           }
 
-          RegionExecutionResult failingResult =
-              registerTimeSeries(
-                  measurementGroupMap, node, context, failingStatus, 
receivedFromPipe);
+          final RegionExecutionResult failingResult =
+              registerTimeSeries(measurementGroupMap, node, context, 
failingStatus);
 
           if (failingResult != null) {
             return failingResult;
           }
 
-          TSStatus status = RpcUtils.getStatus(failingStatus);
+          final TSStatus status = RpcUtils.getStatus(failingStatus);
           return RegionExecutionResult.create(false, status.getMessage(), 
status);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
@@ -492,12 +494,12 @@ public class RegionWriteExecutor {
     }
 
     private void checkMeasurementExistence(
-        Map<PartialPath, MeasurementGroup> measurementGroupMap,
-        ISchemaRegion schemaRegion,
-        List<TSStatus> failingStatus,
-        List<PartialPath> emptyDeviceList) {
-      for (Map.Entry<PartialPath, MeasurementGroup> entry : 
measurementGroupMap.entrySet()) {
-        Map<Integer, MetadataException> failingMeasurementMap =
+        final Map<PartialPath, MeasurementGroup> measurementGroupMap,
+        final ISchemaRegion schemaRegion,
+        final List<TSStatus> failingStatus,
+        final List<PartialPath> emptyDeviceList) {
+      for (final Map.Entry<PartialPath, MeasurementGroup> entry : 
measurementGroupMap.entrySet()) {
+        final Map<Integer, MetadataException> failingMeasurementMap =
             schemaRegion.checkMeasurementExistence(
                 entry.getKey(),
                 entry.getValue().getMeasurements(),
@@ -506,7 +508,7 @@ public class RegionWriteExecutor {
           continue;
         }
 
-        for (Map.Entry<Integer, MetadataException> failingMeasurement :
+        for (final Map.Entry<Integer, MetadataException> failingMeasurement :
             failingMeasurementMap.entrySet()) {
           LOGGER.warn(METADATA_ERROR_MSG, failingMeasurement.getValue());
           failingStatus.add(
@@ -523,22 +525,19 @@ public class RegionWriteExecutor {
     }
 
     private RegionExecutionResult registerTimeSeries(
-        Map<PartialPath, MeasurementGroup> measurementGroupMap,
-        CreateMultiTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        List<TSStatus> failingStatus,
-        boolean receivedFromPipe) {
+        final Map<PartialPath, MeasurementGroup> measurementGroupMap,
+        final CreateMultiTimeSeriesNode node,
+        final WritePlanNodeExecutionContext context,
+        final List<TSStatus> failingStatus) {
       if (!measurementGroupMap.isEmpty()) {
         // try registering the rest timeseries
-        RegionExecutionResult executionResult =
-            receivedFromPipe
-                ? super.visitPipeEnrichedWritePlanNode(new 
PipeEnrichedWritePlanNode(node), context)
-                : super.visitCreateMultiTimeSeries(node, context);
+        final RegionExecutionResult executionResult =
+            super.visitCreateMultiTimeSeries(node, context);
         if (failingStatus.isEmpty()) {
           return executionResult;
         }
 
-        TSStatus executionStatus = executionResult.getStatus();
+        final TSStatus executionStatus = executionResult.getStatus();
         if (executionStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
           failingStatus.addAll(executionStatus.getSubStatus());
         } else if (executionStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -550,36 +549,37 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitInternalCreateTimeSeries(
-        InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+        final InternalCreateTimeSeriesNode node, final 
WritePlanNodeExecutionContext context) {
       return executeInternalCreateTimeSeries(node, context, false);
     }
 
     private RegionExecutionResult executeInternalCreateTimeSeries(
-        InternalCreateTimeSeriesNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
-      ISchemaRegion schemaRegion =
+        final InternalCreateTimeSeriesNode node,
+        final WritePlanNodeExecutionContext context,
+        final boolean receivedFromPipe) {
+      final ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
-      RegionExecutionResult result =
+      final RegionExecutionResult result =
           checkQuotaBeforeCreatingTimeSeries(
               schemaRegion, node.getDevicePath(), 
node.getMeasurementGroup().size());
       if (result != null) {
         return result;
       }
-      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
+      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+          && !receivedFromPipe) {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
-          List<TSStatus> failingStatus = new ArrayList<>();
-          List<TSStatus> alreadyExistingStatus = new ArrayList<>();
-          MeasurementGroup measurementGroup = node.getMeasurementGroup();
-          Map<Integer, MetadataException> failingMeasurementMap =
+          final List<TSStatus> failingStatus = new ArrayList<>();
+          final List<TSStatus> alreadyExistingStatus = new ArrayList<>();
+          final MeasurementGroup measurementGroup = node.getMeasurementGroup();
+          final Map<Integer, MetadataException> failingMeasurementMap =
               schemaRegion.checkMeasurementExistence(
                   node.getDevicePath(),
                   measurementGroup.getMeasurements(),
                   measurementGroup.getAliasList());
           MetadataException metadataException;
           // filter failed measurement and keep the rest for execution
-          for (Map.Entry<Integer, MetadataException> failingMeasurement :
+          for (final Map.Entry<Integer, MetadataException> failingMeasurement :
               failingMeasurementMap.entrySet()) {
             metadataException = failingMeasurement.getValue();
             if (metadataException.getErrorCode()
@@ -592,7 +592,7 @@ public class RegionWriteExecutor {
                           ((MeasurementAlreadyExistException) 
metadataException)
                               .getMeasurementPath())));
             } else {
-              int errorCode = metadataException.getErrorCode();
+              final int errorCode = metadataException.getErrorCode();
               if (errorCode != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
                   || errorCode != 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
                 LOGGER.warn(METADATA_ERROR_MSG, metadataException);
@@ -605,10 +605,7 @@ public class RegionWriteExecutor {
           measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
 
           return processExecutionResultOfInternalCreateSchema(
-              receivedFromPipe
-                  ? super.visitPipeEnrichedWritePlanNode(
-                      new PipeEnrichedWritePlanNode(node), context)
-                  : super.visitInternalCreateTimeSeries(node, context),
+              super.visitInternalCreateTimeSeries(node, context),
               failingStatus,
               alreadyExistingStatus);
         } finally {
@@ -623,7 +620,7 @@ public class RegionWriteExecutor {
 
     @Override
     public RegionExecutionResult visitInternalCreateMultiTimeSeries(
-        InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
+        final InternalCreateMultiTimeSeriesNode node, final 
WritePlanNodeExecutionContext context) {
       return executeInternalCreateMultiTimeSeries(node, context, false);
     }
 
@@ -643,56 +640,49 @@ public class RegionWriteExecutor {
           return result;
         }
       }
-      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {
+      if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
+          && !receivedFromPipe) {
         context.getRegionWriteValidationRWLock().writeLock().lock();
         try {
           final List<TSStatus> failingStatus = new ArrayList<>();
           final List<TSStatus> alreadyExistingStatus = new ArrayList<>();
 
-          // Do not filter measurements if the node is generated by pipe
-          // Because pipe may use the InternalCreateMultiTimeSeriesNode to 
transfer historical data
-          // And the alias/tags/attributes may need to be updated for existing 
time series
-          if (!receivedFromPipe) {
-            MeasurementGroup measurementGroup;
-            Map<Integer, MetadataException> failingMeasurementMap;
-            MetadataException metadataException;
-            for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> 
deviceEntry :
-                node.getDeviceMap().entrySet()) {
-              measurementGroup = deviceEntry.getValue().right;
-              failingMeasurementMap =
-                  schemaRegion.checkMeasurementExistence(
-                      deviceEntry.getKey(),
-                      measurementGroup.getMeasurements(),
-                      measurementGroup.getAliasList());
-              // filter failed measurement and keep the rest for execution
-              for (final Map.Entry<Integer, MetadataException> 
failingMeasurement :
-                  failingMeasurementMap.entrySet()) {
-                metadataException = failingMeasurement.getValue();
-                if (metadataException.getErrorCode()
-                    == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-                  // There's no need to internal create time series.
-                  alreadyExistingStatus.add(
-                      RpcUtils.getStatus(
-                          metadataException.getErrorCode(),
-                          MeasurementPath.transformDataToString(
-                              ((MeasurementAlreadyExistException) 
metadataException)
-                                  .getMeasurementPath())));
-                } else {
-                  LOGGER.warn(METADATA_ERROR_MSG, metadataException);
-                  failingStatus.add(
-                      RpcUtils.getStatus(
-                          metadataException.getErrorCode(), 
metadataException.getMessage()));
-                }
+          MeasurementGroup measurementGroup;
+          Map<Integer, MetadataException> failingMeasurementMap;
+          MetadataException metadataException;
+          for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> 
deviceEntry :
+              node.getDeviceMap().entrySet()) {
+            measurementGroup = deviceEntry.getValue().right;
+            failingMeasurementMap =
+                schemaRegion.checkMeasurementExistence(
+                    deviceEntry.getKey(),
+                    measurementGroup.getMeasurements(),
+                    measurementGroup.getAliasList());
+            // filter failed measurement and keep the rest for execution
+            for (final Map.Entry<Integer, MetadataException> 
failingMeasurement :
+                failingMeasurementMap.entrySet()) {
+              metadataException = failingMeasurement.getValue();
+              if (metadataException.getErrorCode()
+                  == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+                // There's no need to internal create time series.
+                alreadyExistingStatus.add(
+                    RpcUtils.getStatus(
+                        metadataException.getErrorCode(),
+                        MeasurementPath.transformDataToString(
+                            ((MeasurementAlreadyExistException) 
metadataException)
+                                .getMeasurementPath())));
+              } else {
+                LOGGER.warn(METADATA_ERROR_MSG, metadataException);
+                failingStatus.add(
+                    RpcUtils.getStatus(
+                        metadataException.getErrorCode(), 
metadataException.getMessage()));
               }
-              
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
             }
+            
measurementGroup.removeMeasurements(failingMeasurementMap.keySet());
           }
 
           return processExecutionResultOfInternalCreateSchema(
-              receivedFromPipe
-                  ? super.visitPipeEnrichedWritePlanNode(
-                      new PipeEnrichedWritePlanNode(node), context)
-                  : super.visitInternalCreateMultiTimeSeries(node, context),
+              super.visitInternalCreateMultiTimeSeries(node, context),
               failingStatus,
               alreadyExistingStatus);
         } finally {
@@ -938,9 +928,9 @@ public class RegionWriteExecutor {
     }
 
     private RegionExecutionResult executeCreateLogicalView(
-        CreateLogicalViewNode node,
-        WritePlanNodeExecutionContext context,
-        boolean receivedFromPipe) {
+        final CreateLogicalViewNode node,
+        final WritePlanNodeExecutionContext context,
+        final boolean receivedFromPipe) {
       ISchemaRegion schemaRegion =
           schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId());
       if 
(CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS))
 {


Reply via email to