This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 858a8caecac Pipe: Enhanced the pipe transferred create timeseries 
logic to allow merging tags / attributes (#14261) (#14302)
858a8caecac is described below

commit 858a8caecac9bc850251c4ea68fb4603e99e0ee7
Author: Caideyipi <[email protected]>
AuthorDate: Wed Dec 4 15:03:31 2024 +0800

    Pipe: Enhanced the pipe transferred create timeseries logic to allow 
merging tags / attributes (#14261) (#14302)
---
 .../it/autocreate/IoTDBPipeAutoConflictIT.java     | 51 ++++++++++++++++++++++
 .../schemaregion/SchemaExecutionVisitor.java       | 36 ++++++++++-----
 .../schemaregion/impl/SchemaRegionMemoryImpl.java  |  7 ++-
 3 files changed, 82 insertions(+), 12 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
index c00814cab0d..a33e0403a28 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
@@ -37,6 +37,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -328,4 +329,54 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeDualAutoIT {
     TestUtils.assertDataEventuallyOnEnv(
         receiverEnv, "select s1 from root.db.d1", "Time,root.db.d1.s1,", 
expectedResSet);
   }
+
+  @Test
+  public void testAutoManualCreateRace() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.inclusion", "all");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          receiverEnv, "create timeSeries root.ln.wf01.wt01.status with 
datatype=BOOLEAN")) {
+        return;
+      }
+
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv,
+          "create timeSeries root.ln.wf01.wt01.status with datatype=BOOLEAN 
tags (tag3=v3) attributes (attr4=v4)")) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "show timeSeries",
+          
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+          Collections.singleton(
+              
"root.ln.wf01.wt01.status,null,root.ln,BOOLEAN,RLE,LZ4,{\"tag3\":\"v3\"},{\"attr4\":\"v4\"},null,null,BASE,"));
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index 21f358c2533..4f4acb3e0f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -86,10 +86,11 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
   private static final Logger logger = 
LoggerFactory.getLogger(SchemaExecutionVisitor.class);
 
   @Override
-  public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, 
ISchemaRegion schemaRegion) {
+  public TSStatus visitCreateTimeSeries(
+      final CreateTimeSeriesNode node, final ISchemaRegion schemaRegion) {
     try {
       schemaRegion.createTimeSeries(node, -1);
-    } catch (MetadataException e) {
+    } catch (final MetadataException e) {
       logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
     }
@@ -98,10 +99,25 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitCreateAlignedTimeSeries(
-      CreateAlignedTimeSeriesNode node, ISchemaRegion schemaRegion) {
+      final CreateAlignedTimeSeriesNode node, final ISchemaRegion 
schemaRegion) {
     try {
-      schemaRegion.createAlignedTimeSeries(node);
-    } catch (MetadataException e) {
+      if (node.isGeneratedByPipe()) {
+        final ICreateAlignedTimeSeriesPlan plan =
+            SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
+                node.getDevicePath(),
+                node.getMeasurements(),
+                node.getDataTypes(),
+                node.getEncodings(),
+                node.getCompressors(),
+                node.getAliasList(),
+                node.getTagsList(),
+                node.getAttributesList());
+        ((CreateAlignedTimeSeriesPlanImpl) plan).setWithMerge(true);
+        schemaRegion.createAlignedTimeSeries(plan);
+      } else {
+        schemaRegion.createAlignedTimeSeries(node);
+      }
+    } catch (final MetadataException e) {
       logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
     }
@@ -110,13 +126,13 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitCreateMultiTimeSeries(
-      CreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
-    Map<PartialPath, MeasurementGroup> measurementGroupMap = 
node.getMeasurementGroupMap();
-    List<TSStatus> failingStatus = new ArrayList<>();
+      final CreateMultiTimeSeriesNode node, final ISchemaRegion schemaRegion) {
+    final Map<PartialPath, MeasurementGroup> measurementGroupMap = 
node.getMeasurementGroupMap();
+    final List<TSStatus> failingStatus = new ArrayList<>();
     PartialPath devicePath;
     MeasurementGroup measurementGroup;
     int size;
-    for (Map.Entry<PartialPath, MeasurementGroup> entry : 
measurementGroupMap.entrySet()) {
+    for (final Map.Entry<PartialPath, MeasurementGroup> entry : 
measurementGroupMap.entrySet()) {
       devicePath = entry.getKey();
       measurementGroup = entry.getValue();
       size = measurementGroup.getMeasurements().size();
@@ -139,7 +155,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
   }
 
   private ICreateTimeSeriesPlan transformToCreateTimeSeriesPlan(
-      PartialPath devicePath, MeasurementGroup measurementGroup, int index) {
+      final PartialPath devicePath, final MeasurementGroup measurementGroup, 
final int index) {
     return SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
         devicePath.concatNode(measurementGroup.getMeasurements().get(index)),
         measurementGroup.getDataTypes().get(index),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index c6c705c8ea6..c2717e6dd19 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
 import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
 import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.schemaengine.metric.ISchemaRegionMetric;
 import org.apache.iotdb.db.schemaengine.metric.SchemaRegionMemMetric;
 import org.apache.iotdb.db.schemaengine.rescon.DataNodeSchemaQuotaManager;
@@ -583,8 +584,10 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
               plan.getCompressor(),
               plan.getProps(),
               plan.getAlias(),
-              (plan instanceof CreateTimeSeriesPlanImpl
-                  && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()));
+              plan instanceof CreateTimeSeriesPlanImpl
+                      && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()
+                  || plan instanceof CreateTimeSeriesNode
+                      && ((CreateTimeSeriesNode) plan).isGeneratedByPipe());
 
       // Should merge
       if (Objects.isNull(leafMNode)) {

Reply via email to