This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 858a8caecac Pipe: Enhanced the pipe transferred create timeseries
logic to allow merging tags / attributes (#14261) (#14302)
858a8caecac is described below
commit 858a8caecac9bc850251c4ea68fb4603e99e0ee7
Author: Caideyipi <[email protected]>
AuthorDate: Wed Dec 4 15:03:31 2024 +0800
Pipe: Enhanced the pipe transferred create timeseries logic to allow
merging tags / attributes (#14261) (#14302)
---
.../it/autocreate/IoTDBPipeAutoConflictIT.java | 51 ++++++++++++++++++++++
.../schemaregion/SchemaExecutionVisitor.java | 36 ++++++++++-----
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 7 ++-
3 files changed, 82 insertions(+), 12 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
index c00814cab0d..a33e0403a28 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java
@@ -37,6 +37,7 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -328,4 +329,54 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeDualAutoIT {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "select s1 from root.db.d1", "Time,root.db.d1.s1,",
expectedResSet);
}
+
+ @Test
+ public void testAutoManualCreateRace() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.inclusion", "all");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ receiverEnv, "create timeSeries root.ln.wf01.wt01.status with
datatype=BOOLEAN")) {
+ return;
+ }
+
+ if (!TestUtils.tryExecuteNonQueryWithRetry(
+ senderEnv,
+ "create timeSeries root.ln.wf01.wt01.status with datatype=BOOLEAN
tags (tag3=v3) attributes (attr4=v4)")) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "show timeSeries",
+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+ Collections.singleton(
+
"root.ln.wf01.wt01.status,null,root.ln,BOOLEAN,RLE,LZ4,{\"tag3\":\"v3\"},{\"attr4\":\"v4\"},null,null,BASE,"));
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index 21f358c2533..4f4acb3e0f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -86,10 +86,11 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
private static final Logger logger =
LoggerFactory.getLogger(SchemaExecutionVisitor.class);
@Override
- public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node,
ISchemaRegion schemaRegion) {
+ public TSStatus visitCreateTimeSeries(
+ final CreateTimeSeriesNode node, final ISchemaRegion schemaRegion) {
try {
schemaRegion.createTimeSeries(node, -1);
- } catch (MetadataException e) {
+ } catch (final MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
@@ -98,10 +99,25 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitCreateAlignedTimeSeries(
- CreateAlignedTimeSeriesNode node, ISchemaRegion schemaRegion) {
+ final CreateAlignedTimeSeriesNode node, final ISchemaRegion
schemaRegion) {
try {
- schemaRegion.createAlignedTimeSeries(node);
- } catch (MetadataException e) {
+ if (node.isGeneratedByPipe()) {
+ final ICreateAlignedTimeSeriesPlan plan =
+ SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
+ node.getDevicePath(),
+ node.getMeasurements(),
+ node.getDataTypes(),
+ node.getEncodings(),
+ node.getCompressors(),
+ node.getAliasList(),
+ node.getTagsList(),
+ node.getAttributesList());
+ ((CreateAlignedTimeSeriesPlanImpl) plan).setWithMerge(true);
+ schemaRegion.createAlignedTimeSeries(plan);
+ } else {
+ schemaRegion.createAlignedTimeSeries(node);
+ }
+ } catch (final MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
@@ -110,13 +126,13 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitCreateMultiTimeSeries(
- CreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
- Map<PartialPath, MeasurementGroup> measurementGroupMap =
node.getMeasurementGroupMap();
- List<TSStatus> failingStatus = new ArrayList<>();
+ final CreateMultiTimeSeriesNode node, final ISchemaRegion schemaRegion) {
+ final Map<PartialPath, MeasurementGroup> measurementGroupMap =
node.getMeasurementGroupMap();
+ final List<TSStatus> failingStatus = new ArrayList<>();
PartialPath devicePath;
MeasurementGroup measurementGroup;
int size;
- for (Map.Entry<PartialPath, MeasurementGroup> entry :
measurementGroupMap.entrySet()) {
+ for (final Map.Entry<PartialPath, MeasurementGroup> entry :
measurementGroupMap.entrySet()) {
devicePath = entry.getKey();
measurementGroup = entry.getValue();
size = measurementGroup.getMeasurements().size();
@@ -139,7 +155,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
}
private ICreateTimeSeriesPlan transformToCreateTimeSeriesPlan(
- PartialPath devicePath, MeasurementGroup measurementGroup, int index) {
+ final PartialPath devicePath, final MeasurementGroup measurementGroup,
final int index) {
return SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
devicePath.concatNode(measurementGroup.getMeasurements().get(index)),
measurementGroup.getDataTypes().get(index),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index c6c705c8ea6..c2717e6dd19 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.schemaengine.metric.ISchemaRegionMetric;
import org.apache.iotdb.db.schemaengine.metric.SchemaRegionMemMetric;
import org.apache.iotdb.db.schemaengine.rescon.DataNodeSchemaQuotaManager;
@@ -583,8 +584,10 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
plan.getCompressor(),
plan.getProps(),
plan.getAlias(),
- (plan instanceof CreateTimeSeriesPlanImpl
- && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()));
+ plan instanceof CreateTimeSeriesPlanImpl
+ && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()
+ || plan instanceof CreateTimeSeriesNode
+ && ((CreateTimeSeriesNode) plan).isGeneratedByPipe());
// Should merge
if (Objects.isNull(leafMNode)) {