This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 638be8ae212 Pipe: Added batch to schema snapshot execution in template
activation and timeseries creation & Fixed the bug that the
"CreateMultiTimeSeries" group with alias is not idempotent (#12380)
638be8ae212 is described below
commit 638be8ae212ae7fff87a34d391a946705cf5749a
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 24 12:09:20 2024 +0800
Pipe: Added batch to schema snapshot execution in template activation and
timeseries creation & Fixed the bug that the "CreateMultiTimeSeries" group with
alias is not idempotent (#12380)
---
.../schema/PipeSchemaRegionSnapshotEvent.java | 55 +++---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 56 ++++---
.../visitor/PipeStatementTSStatusVisitor.java | 52 +++---
.../visitor/PipeStatementToBatchVisitor.java | 185 +++++++++++++++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 9 +
.../iotdb/commons/conf/CommonDescriptor.java | 5 +
.../iotdb/commons/pipe/config/PipeConfig.java | 5 +
7 files changed, 303 insertions(+), 64 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index 541f0d4638e..e984c0c6f09 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -55,10 +55,20 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
PlanNodeType.CREATE_ALIGNED_TIME_SERIES.getNodeType(),
StatementType.CREATE_ALIGNED_TIME_SERIES);
+ PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
+ PlanNodeType.INTERNAL_CREATE_MULTI_TIMESERIES.getNodeType(),
+ StatementType.INTERNAL_CREATE_MULTI_TIMESERIES);
+
PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
PlanNodeType.ACTIVATE_TEMPLATE.getNodeType(),
StatementType.ACTIVATE_TEMPLATE);
+ PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
+ PlanNodeType.BATCH_ACTIVATE_TEMPLATE.getNodeType(),
StatementType.BATCH_ACTIVATE_TEMPLATE);
+
PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
PlanNodeType.CREATE_LOGICAL_VIEW.getNodeType(),
StatementType.CREATE_LOGICAL_VIEW);
+ // For logical view
+ PLAN_NODE_2_STATEMENT_TYPE_MAP.put(
+ PlanNodeType.ALTER_TIME_SERIES.getNodeType(),
StatementType.ALTER_TIME_SERIES);
}
public PipeSchemaRegionSnapshotEvent() {
@@ -67,17 +77,17 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
}
public PipeSchemaRegionSnapshotEvent(
- String mTreeSnapshotPath, String tagLogSnapshotPath, String
databaseName) {
+ final String mTreeSnapshotPath, final String tagLogSnapshotPath, final
String databaseName) {
this(mTreeSnapshotPath, tagLogSnapshotPath, databaseName, null, null,
null);
}
public PipeSchemaRegionSnapshotEvent(
- String mTreeSnapshotPath,
- String tagLogSnapshotPath,
- String databaseName,
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern) {
+ final String mTreeSnapshotPath,
+ final String tagLogSnapshotPath,
+ final String databaseName,
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern) {
super(pipeName, pipeTaskMeta, pattern, PipeResourceManager.snapshot());
this.mTreeSnapshotPath = mTreeSnapshotPath;
this.tagLogSnapshotPath = Objects.nonNull(tagLogSnapshotPath) ?
tagLogSnapshotPath : "";
@@ -97,14 +107,14 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
}
@Override
- public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
try {
mTreeSnapshotPath =
resourceManager.increaseSnapshotReference(mTreeSnapshotPath);
if (!tagLogSnapshotPath.isEmpty()) {
tagLogSnapshotPath =
resourceManager.increaseSnapshotReference(tagLogSnapshotPath);
}
return true;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
String.format(
"Increase reference count for mTree snapshot %s or tLog %s
error. Holder Message: %s",
@@ -115,14 +125,14 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
}
@Override
- public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
try {
resourceManager.decreaseSnapshotReference(mTreeSnapshotPath);
if (!tagLogSnapshotPath.isEmpty()) {
resourceManager.decreaseSnapshotReference(tagLogSnapshotPath);
}
return true;
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
String.format(
"Decrease reference count for mTree snapshot %s or tLog %s
error. Holder Message: %s",
@@ -134,18 +144,18 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
return new PipeSchemaRegionSnapshotEvent(
mTreeSnapshotPath, tagLogSnapshotPath, databaseName, pipeName,
pipeTaskMeta, pattern);
}
@Override
public ByteBuffer serializeToByteBuffer() {
- ByteBuffer result =
+ final ByteBuffer result =
ByteBuffer.allocate(
Byte.BYTES
+ 3 * Integer.BYTES
@@ -160,7 +170,7 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
}
@Override
- public void deserializeFromByteBuffer(ByteBuffer buffer) {
+ public void deserializeFromByteBuffer(final ByteBuffer buffer) {
mTreeSnapshotPath = ReadWriteIOUtils.readString(buffer);
tagLogSnapshotPath = ReadWriteIOUtils.readString(buffer);
databaseName = ReadWriteIOUtils.readString(buffer);
@@ -168,22 +178,23 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
/////////////////////////////// Type parsing ///////////////////////////////
- public static boolean needTransferSnapshot(Set<PlanNodeType>
listenedTypeSet) {
+ public static boolean needTransferSnapshot(final Set<PlanNodeType>
listenedTypeSet) {
final Set<Short> types = new
HashSet<>(PLAN_NODE_2_STATEMENT_TYPE_MAP.keySet());
types.retainAll(
listenedTypeSet.stream().map(PlanNodeType::getNodeType).collect(Collectors.toSet()));
return !types.isEmpty();
}
- public void confineTransferredTypes(Set<PlanNodeType> listenedTypeSet) {
+ public void confineTransferredTypes(final Set<PlanNodeType> listenedTypeSet)
{
final Set<Short> types = new
HashSet<>(PLAN_NODE_2_STATEMENT_TYPE_MAP.keySet());
types.retainAll(
listenedTypeSet.stream().map(PlanNodeType::getNodeType).collect(Collectors.toSet()));
transferredTypes = types;
}
- public static Set<StatementType> getStatementTypeSet(String sealTypes) {
- Map<Short, StatementType> statementTypeMap = new
HashMap<>(PLAN_NODE_2_STATEMENT_TYPE_MAP);
+ public static Set<StatementType> getStatementTypeSet(final String sealTypes)
{
+ final Map<Short, StatementType> statementTypeMap =
+ new HashMap<>(PLAN_NODE_2_STATEMENT_TYPE_MAP);
statementTypeMap
.keySet()
.retainAll(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index c47ba81f5c9..ea8a1efc618 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -50,6 +50,7 @@ import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEven
import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
+import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
@@ -89,6 +90,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -106,6 +108,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private final PipeStatementTSStatusVisitor statusVisitor = new
PipeStatementTSStatusVisitor();
private final PipeStatementExceptionVisitor exceptionVisitor =
new PipeStatementExceptionVisitor();
+ private final PipeStatementToBatchVisitor batchVisitor = new
PipeStatementToBatchVisitor();
// Used for data transfer: confignode (cluster A) -> datanode (cluster B) ->
confignode (cluster
// B).
@@ -120,7 +123,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
folderManager =
new FolderManager(
Arrays.asList(RECEIVER_FILE_BASE_DIRS),
DirectoryStrategyType.SEQUENCE_STRATEGY);
- } catch (DiskSpaceInsufficientException e) {
+ } catch (final DiskSpaceInsufficientException e) {
LOGGER.error(
"Fail to create pipe receiver file folders allocation strategy
because all disks of folders are full.",
e);
@@ -198,14 +201,15 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
receiverId.get(),
status);
return new TPipeTransferResp(status);
- } catch (IOException e) {
+ } catch (final IOException e) {
final String error = String.format("Serialization error during pipe
receiving, %s", e);
LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR,
error));
}
}
- private TPipeTransferResp
handleTransferTabletInsertNode(PipeTransferTabletInsertNodeReq req) {
+ private TPipeTransferResp handleTransferTabletInsertNode(
+ final PipeTransferTabletInsertNodeReq req) {
final InsertBaseStatement statement = req.constructStatement();
return new TPipeTransferResp(
statement.isEmpty()
@@ -213,7 +217,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
: executeStatementAndClassifyExceptions(statement));
}
- private TPipeTransferResp
handleTransferTabletBinary(PipeTransferTabletBinaryReq req) {
+ private TPipeTransferResp handleTransferTabletBinary(final
PipeTransferTabletBinaryReq req) {
final InsertBaseStatement statement = req.constructStatement();
return new TPipeTransferResp(
statement.isEmpty()
@@ -221,7 +225,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
: executeStatementAndClassifyExceptions(statement));
}
- private TPipeTransferResp handleTransferTabletRaw(PipeTransferTabletRawReq
req) {
+ private TPipeTransferResp handleTransferTabletRaw(final
PipeTransferTabletRawReq req) {
final InsertTabletStatement statement = req.constructStatement();
return new TPipeTransferResp(
statement.isEmpty()
@@ -229,7 +233,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
: executeStatementAndClassifyExceptions(statement));
}
- private TPipeTransferResp
handleTransferTabletBatch(PipeTransferTabletBatchReq req) {
+ private TPipeTransferResp handleTransferTabletBatch(final
PipeTransferTabletBatchReq req) {
final Pair<InsertRowsStatement, InsertMultiTabletsStatement> statementPair
=
req.constructStatements();
return new TPipeTransferResp(
@@ -256,13 +260,14 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
@Override
- protected TSStatus loadFileV1(PipeTransferFileSealReqV1 req, String
fileAbsolutePath)
+ protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final
String fileAbsolutePath)
throws FileNotFoundException {
return loadTsFile(fileAbsolutePath);
}
@Override
- protected TSStatus loadFileV2(PipeTransferFileSealReqV2 req, List<String>
fileAbsolutePaths)
+ protected TSStatus loadFileV2(
+ final PipeTransferFileSealReqV2 req, final List<String>
fileAbsolutePaths)
throws IOException, IllegalPathException {
return req instanceof PipeTransferTsFileSealWithModReq
// TsFile's absolute path will be the second element
@@ -270,7 +275,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
: loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
}
- private TSStatus loadTsFile(String fileAbsolutePath) throws
FileNotFoundException {
+ private TSStatus loadTsFile(final String fileAbsolutePath) throws
FileNotFoundException {
final LoadTsFileStatement statement = new
LoadTsFileStatement(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
@@ -281,7 +286,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
private TSStatus loadSchemaSnapShot(
- Map<String, String> parameters, List<String> fileAbsolutePaths)
+ final Map<String, String> parameters, final List<String>
fileAbsolutePaths)
throws IllegalPathException, IOException {
final SRStatementGenerator generator =
SchemaRegionSnapshotParser.translate2Statements(
@@ -291,19 +296,30 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
final Set<StatementType> executionTypes =
PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
+
+ // Clear to avoid previous exceptions
+ batchVisitor.clear();
final List<TSStatus> results = new ArrayList<>();
while (generator.hasNext()) {
- final Statement statement = generator.next();
- if (executionTypes.contains(statement.getType())) {
- // The statements do not contain AlterLogicalViewStatements
- // Here we apply the statements as many as possible
- results.add(executeStatementAndClassifyExceptions(statement));
+ final Statement originalStatement = generator.next();
+ if (!executionTypes.contains(originalStatement.getType())) {
+ continue;
}
+
+ // The statements do not contain AlterLogicalViewStatements
+ // Here we apply the statements as many as possible
+ // Even if there are failed statements
+ batchVisitor
+ .process(originalStatement, null)
+ .ifPresent(statement ->
results.add(executeStatementAndClassifyExceptions(statement)));
}
+ batchVisitor.getRemainBatches().stream()
+ .filter(Optional::isPresent)
+ .forEach(statement ->
results.add(executeStatementAndClassifyExceptions(statement.get())));
return PipeReceiverStatusHandler.getPriorStatus(results);
}
- private TPipeTransferResp handleTransferSchemaPlan(PipeTransferPlanNodeReq
req) {
+ private TPipeTransferResp handleTransferSchemaPlan(final
PipeTransferPlanNodeReq req) {
// We may be able to skip the alter logical view's exception parsing
because
// the "AlterLogicalViewNode" is itself idempotent
return req.getPlanNode() instanceof AlterLogicalViewNode
@@ -315,7 +331,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
new PipePlanToStatementVisitor().process(req.getPlanNode(),
null)));
}
- private TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req) {
+ private TPipeTransferResp handleTransferConfigPlan(final TPipeTransferReq
req) {
return ClusterConfigTaskExecutor.getInstance()
.handleTransferConfigPlan(getConfigReceiverId(), req);
}
@@ -333,7 +349,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return configReceiverId.get();
}
- private TSStatus executeStatementAndClassifyExceptions(Statement statement) {
+ private TSStatus executeStatementAndClassifyExceptions(final Statement
statement) {
try {
final TSStatus result = executeStatement(statement);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -346,7 +362,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
result);
return statement.accept(statusVisitor, result);
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
"Receiver id = {}: Exception encountered while executing statement
{}: ",
receiverId.get(),
@@ -382,7 +398,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
if (Objects.nonNull(configReceiverId.get())) {
try {
ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get());
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn("Failed to handle config client (id = {}) exit",
configReceiverId.get(), e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 117eae0a689..4412eccf18e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -46,32 +46,36 @@ import org.apache.iotdb.rpc.TSStatusCode;
*/
public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus,
TSStatus> {
@Override
- public TSStatus visitNode(StatementNode node, TSStatus context) {
+ public TSStatus visitNode(final StatementNode node, final TSStatus context) {
return context;
}
@Override
- public TSStatus visitInsertTablet(InsertTabletStatement
insertTabletStatement, TSStatus context) {
+ public TSStatus visitInsertTablet(
+ final InsertTabletStatement insertTabletStatement, final TSStatus
context) {
return visitInsertBase(insertTabletStatement, context);
}
@Override
- public TSStatus visitInsertRow(InsertRowStatement insertRowStatement,
TSStatus context) {
+ public TSStatus visitInsertRow(
+ final InsertRowStatement insertRowStatement, final TSStatus context) {
return visitInsertBase(insertRowStatement, context);
}
@Override
- public TSStatus visitInsertRows(InsertRowsStatement insertRowsStatement,
TSStatus context) {
+ public TSStatus visitInsertRows(
+ final InsertRowsStatement insertRowsStatement, final TSStatus context) {
return visitInsertBase(insertRowsStatement, context);
}
@Override
public TSStatus visitInsertMultiTablets(
- InsertMultiTabletsStatement insertMultiTabletsStatement, TSStatus
context) {
+ final InsertMultiTabletsStatement insertMultiTabletsStatement, final
TSStatus context) {
return visitInsertBase(insertMultiTabletsStatement, context);
}
- private TSStatus visitInsertBase(InsertBaseStatement insertBaseStatement,
TSStatus context) {
+ private TSStatus visitInsertBase(
+ final InsertBaseStatement insertBaseStatement, final TSStatus context) {
if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
@@ -80,17 +84,18 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
}
@Override
- public TSStatus visitCreateTimeseries(CreateTimeSeriesStatement statement,
TSStatus context) {
+ public TSStatus visitCreateTimeseries(
+ final CreateTimeSeriesStatement statement, final TSStatus context) {
return visitGeneralCreateTimeSeries(statement, context);
}
@Override
public TSStatus visitCreateAlignedTimeseries(
- CreateAlignedTimeSeriesStatement statement, TSStatus context) {
+ final CreateAlignedTimeSeriesStatement statement, final TSStatus
context) {
return visitGeneralCreateTimeSeries(statement, context);
}
- private TSStatus visitGeneralCreateTimeSeries(Statement statement, TSStatus
context) {
+ private TSStatus visitGeneralCreateTimeSeries(final Statement statement,
final TSStatus context) {
if (context.getCode() ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
|| context.getCode() ==
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
@@ -106,28 +111,31 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
@Override
public TSStatus visitCreateMultiTimeseries(
- CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, TSStatus
context) {
+ final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement,
final TSStatus context) {
return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement,
context);
}
@Override
public TSStatus visitInternalCreateTimeseries(
- InternalCreateTimeSeriesStatement internalCreateTimeSeriesStatement,
TSStatus context) {
+ final InternalCreateTimeSeriesStatement
internalCreateTimeSeriesStatement,
+ final TSStatus context) {
return
visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context);
}
@Override
public TSStatus visitInternalCreateMultiTimeSeries(
- InternalCreateMultiTimeSeriesStatement
internalCreateMultiTimeSeriesStatement,
- TSStatus context) {
+ final InternalCreateMultiTimeSeriesStatement
internalCreateMultiTimeSeriesStatement,
+ final TSStatus context) {
return
visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement,
context);
}
- private TSStatus visitGeneralCreateMultiTimeseries(Statement statement,
TSStatus context) {
+ private TSStatus visitGeneralCreateMultiTimeseries(
+ final Statement statement, final TSStatus context) {
if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- for (TSStatus status : context.getSubStatus()) {
+ for (final TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && status.getCode() !=
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ && status.getCode() !=
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+ && status.getCode() !=
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
if (status.getCode() ==
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
@@ -146,7 +154,7 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
@Override
public TSStatus visitAlterTimeseries(
- AlterTimeSeriesStatement alterTimeSeriesStatement, TSStatus context) {
+ final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus
context) {
if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
if (context.getMessage().contains("already")) {
return new TSStatus(
@@ -165,12 +173,12 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
@Override
public TSStatus visitCreateLogicalView(
- CreateLogicalViewStatement createLogicalViewStatement, TSStatus context)
{
+ final CreateLogicalViewStatement createLogicalViewStatement, final
TSStatus context) {
if (context.getCode() ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- for (TSStatus status : context.getSubStatus()) {
+ for (final TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
return visitStatement(createLogicalViewStatement, context);
@@ -184,18 +192,18 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
@Override
public TSStatus visitActivateTemplate(
- ActivateTemplateStatement activateTemplateStatement, TSStatus context) {
+ final ActivateTemplateStatement activateTemplateStatement, final
TSStatus context) {
return visitGeneralActivateTemplate(activateTemplateStatement, context);
}
@Override
public TSStatus visitBatchActivateTemplate(
- BatchActivateTemplateStatement batchActivateTemplateStatement, TSStatus
context) {
+ final BatchActivateTemplateStatement batchActivateTemplateStatement,
final TSStatus context) {
return visitGeneralActivateTemplate(batchActivateTemplateStatement,
context);
}
private TSStatus visitGeneralActivateTemplate(
- Statement activateTemplateStatement, TSStatus context) {
+ final Statement activateTemplateStatement, final TSStatus context) {
if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementToBatchVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementToBatchVisitor.java
new file mode 100644
index 00000000000..213730a6c90
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementToBatchVisitor.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.visitor;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This visitor will convert {@link Statement}s to batch and is stateful, and
can only be used in a
+ * single thread. It only guarantees the correctness when is used by the
schema snapshot and shall
+ * not be used for other purposes.
+ *
+ * <p>When the visitor is called, it shall encounter no:
+ *
+ * <p>1. {@link StatementNode}s other than {@link Statement}s
+ *
+ * <p>2. TimeSeries with identical path.
+ *
+ * <p>3. Alter to the timeSeries and templates in batch.
+ */
+public class PipeStatementToBatchVisitor extends
StatementVisitor<Optional<Statement>, Void> {
+
+ private static final int MAX_SCHEMA_BATCH_SIZE =
+ PipeConfig.getInstance().getPipeSnapshotExecutionMaxBatchSize();
+
+ private final List<CreateTimeSeriesStatement> createTimeSeriesStatements =
new ArrayList<>();
+
+ private final List<CreateAlignedTimeSeriesStatement>
createAlignedTimeSeriesStatements =
+ new ArrayList<>();
+
+ private final List<ActivateTemplateStatement> activateTemplateStatements =
new ArrayList<>();
+
+ @Override
+ public Optional<Statement> visitNode(final StatementNode statement, final
Void context) {
+ return Optional.of((Statement) statement);
+ }
+
+ @Override
+ public Optional<Statement> visitCreateTimeseries(
+ final CreateTimeSeriesStatement statement, final Void context) {
+ createTimeSeriesStatements.add(statement);
+ return createTimeSeriesStatements.size() +
createAlignedTimeSeriesStatements.size()
+ >= MAX_SCHEMA_BATCH_SIZE
+ ? Optional.of(getTimeSeriesBatchStatement())
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<Statement> visitCreateAlignedTimeseries(
+ final CreateAlignedTimeSeriesStatement statement, final Void context) {
+ createAlignedTimeSeriesStatements.add(statement);
+ return createTimeSeriesStatements.size() +
createAlignedTimeSeriesStatements.size()
+ >= MAX_SCHEMA_BATCH_SIZE
+ ? Optional.of(getTimeSeriesBatchStatement())
+ : Optional.empty();
+ }
+
+ private InternalCreateMultiTimeSeriesStatement getTimeSeriesBatchStatement()
{
+ final InternalCreateMultiTimeSeriesStatement
internalCreateMultiTimeSeriesStatement =
+ new InternalCreateMultiTimeSeriesStatement(new HashMap<>());
+ createTimeSeriesStatements.forEach(
+ statement ->
+ addNonAlignedTimeSeriesToBatchStatement(
+ statement, internalCreateMultiTimeSeriesStatement));
+ createAlignedTimeSeriesStatements.forEach(
+ statement ->
+ addAlignedTimeSeriesToBatchStatement(
+ statement, internalCreateMultiTimeSeriesStatement));
+ createTimeSeriesStatements.clear();
+ createAlignedTimeSeriesStatements.clear();
+ return internalCreateMultiTimeSeriesStatement;
+ }
+
+ private void addNonAlignedTimeSeriesToBatchStatement(
+ final CreateTimeSeriesStatement statement,
+ final InternalCreateMultiTimeSeriesStatement
internalCreateMultiTimeSeriesStatement) {
+ final MeasurementGroup group =
+ internalCreateMultiTimeSeriesStatement
+ .getDeviceMap()
+ .computeIfAbsent(
+ statement.getPath().getDevicePath(),
+ devicePath -> new Pair<>(false, new MeasurementGroup()))
+ .getRight();
+ group.addMeasurement(
+ statement.getPath().getMeasurement(),
+ statement.getDataType(),
+ statement.getEncoding(),
+ statement.getCompressor());
+ group.addAttributes(statement.getAttributes());
+ group.addTags(statement.getTags());
+ group.addProps(statement.getProps());
+ group.addAlias(statement.getAlias());
+ }
+
+ private void addAlignedTimeSeriesToBatchStatement(
+ final CreateAlignedTimeSeriesStatement statement,
+ final InternalCreateMultiTimeSeriesStatement
internalCreateMultiTimeSeriesStatement) {
+ final MeasurementGroup group =
+ internalCreateMultiTimeSeriesStatement
+ .getDeviceMap()
+ .computeIfAbsent(
+ statement.getDevicePath(), devicePath -> new Pair<>(true, new
MeasurementGroup()))
+ .getRight();
+ for (int i = 0; i < statement.getMeasurements().size(); ++i) {
+ group.addMeasurement(
+ statement.getMeasurements().get(i),
+ statement.getDataTypes().get(i),
+ statement.getEncodings().get(i),
+ statement.getCompressors().get(i));
+ group.addProps(new HashMap<>());
+ }
+ statement.getTagsList().forEach(group::addTags);
+ statement.getAttributesList().forEach(group::addAttributes);
+ statement.getAliasList().forEach(group::addAlias);
+ }
+
+ @Override
+ public Optional<Statement> visitActivateTemplate(
+ final ActivateTemplateStatement activateTemplateStatement, final Void
context) {
+ activateTemplateStatements.add(activateTemplateStatement);
+ return activateTemplateStatements.size() >= MAX_SCHEMA_BATCH_SIZE
+ ? Optional.of(getTemplateBatchStatement())
+ : Optional.empty();
+ }
+
+ private BatchActivateTemplateStatement getTemplateBatchStatement() {
+ final BatchActivateTemplateStatement batchActivateTemplateStatement =
+ new BatchActivateTemplateStatement(
+ activateTemplateStatements.stream()
+ .map(ActivateTemplateStatement::getPath)
+ .collect(Collectors.toList()));
+ activateTemplateStatements.clear();
+ return batchActivateTemplateStatement;
+ }
+
+ public List<Optional<Statement>> getRemainBatches() {
+ return Arrays.asList(
+ !(createTimeSeriesStatements.isEmpty() &&
createAlignedTimeSeriesStatements.isEmpty())
+ ? Optional.of(getTimeSeriesBatchStatement())
+ : Optional.empty(),
+ !activateTemplateStatements.isEmpty()
+ ? Optional.of(getTemplateBatchStatement())
+ : Optional.empty());
+ }
+
+ public void clear() {
+ createTimeSeriesStatements.clear();
+ createAlignedTimeSeriesStatements.clear();
+ activateTemplateStatements.clear();
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 29ce653606c..5057ea05b98 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -227,6 +227,7 @@ public class CommonConfig {
private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
+ private int pipeSnapshotExecutionMaxBatchSize = 1000;
private int subscriptionSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
@@ -961,6 +962,14 @@ public class CommonConfig {
this.pipeListeningQueueTransferSnapshotThreshold =
pipeListeningQueueTransferSnapshotThreshold;
}
+ public int getPipeSnapshotExecutionMaxBatchSize() {
+ return pipeSnapshotExecutionMaxBatchSize;
+ }
+
+ public void setPipeSnapshotExecutionMaxBatchSize(int
pipeSnapshotExecutionMaxBatchSize) {
+ this.pipeSnapshotExecutionMaxBatchSize = pipeSnapshotExecutionMaxBatchSize;
+ }
+
public int getSubscriptionSubtaskExecutorMaxThreadNum() {
return subscriptionSubtaskExecutorMaxThreadNum;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 4e347ccd41a..f64a79e36f4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -516,6 +516,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_listening_queue_transfer_snapshot_threshold",
String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));
+ config.setPipeSnapshotExecutionMaxBatchSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_snapshot_execution_max_batch_size",
+
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
}
private void loadSubscriptionProps(Properties properties) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 88405d64c93..6fab25c9b70 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -135,6 +135,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
}
+ public int getPipeSnapshotExecutionMaxBatchSize() {
+ return COMMON_CONFIG.getPipeSnapshotExecutionMaxBatchSize();
+ }
+
/////////////////////////////// Meta Consistency
///////////////////////////////
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -291,6 +295,7 @@ public class PipeConfig {
LOGGER.info(
"PipeListeningQueueTransferSnapshotThreshold: {}",
getPipeListeningQueueTransferSnapshotThreshold());
+ LOGGER.info("PipeSnapshotExecutionMaxBatchSize: {}",
getPipeSnapshotExecutionMaxBatchSize());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());