This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 837711dcae7 Pipe: Implemented schema batch for meta transfer (#17665)
837711dcae7 is described below
commit 837711dcae7d2e04a15dcf68ad910fb2cd988417
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:07:50 2026 +0800
Pipe: Implemented schema batch for meta transfer (#17665)
* Schema
* fix
* fix
* Update PipeSchemaRegionSinkMetricsTest.java
* spt
* Address schema batch review comments
* Avoid extra copy in plan node airgap transfer
* Fix pipe plan node byte buffer test
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 13 +
.../metric/schema/PipeSchemaRegionSinkMetrics.java | 58 +++
.../batch/PipeSchemaRegionWritePlanEventBatch.java | 557 +++++++++++++++++++++
.../evolvable/request/PipeTransferPlanNodeReq.java | 28 +-
.../airgap/IoTDBSchemaRegionAirGapSink.java | 182 ++++++-
.../thrift/sync/IoTDBSchemaRegionSink.java | 187 ++++++-
.../schema/PipeSchemaRegionSinkMetricsTest.java | 154 ++++++
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 38 ++
.../db/pipe/sink/PipeSchemaRegionSinkTest.java | 386 ++++++++++++++
.../PipeSchemaRegionWritePlanEventBatchTest.java | 464 +++++++++++++++++
.../commons/pipe/sink/protocol/IoTDBSink.java | 8 +
.../iotdb/commons/service/metric/enums/Metric.java | 2 +
12 files changed, 2040 insertions(+), 37 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 10b746778e3..95884b17789 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -348,6 +348,12 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
}
+ public void setSchemaBatchSizeHistogram(Histogram schemaBatchSizeHistogram) {
+ if (outputPipeSink instanceof IoTDBSink) {
+ ((IoTDBSink)
outputPipeSink).setSchemaBatchSizeHistogram(schemaBatchSizeHistogram);
+ }
+ }
+
public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
if (outputPipeSink instanceof IoTDBSink) {
((IoTDBSink)
outputPipeSink).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
@@ -361,6 +367,13 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
}
+ public void setSchemaBatchTimeIntervalHistogram(Histogram
schemaBatchTimeIntervalHistogram) {
+ if (outputPipeSink instanceof IoTDBSink) {
+ ((IoTDBSink) outputPipeSink)
+
.setSchemaBatchTimeIntervalHistogram(schemaBatchTimeIntervalHistogram);
+ }
+ }
+
public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
if (outputPipeSink instanceof IoTDBSink) {
((IoTDBSink) outputPipeSink)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
index 17ff545251c..869b2616e60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.metrics.type.Rate;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
@@ -57,6 +58,7 @@ public class PipeSchemaRegionSinkMetrics implements
IMetricSet {
private void createMetrics(final String taskID) {
createRate(taskID);
+ createHistogram(taskID);
}
private void createRate(final String taskID) {
@@ -73,6 +75,38 @@ public class PipeSchemaRegionSinkMetrics implements
IMetricSet {
String.valueOf(connector.getCreationTime())));
}
+ private void createHistogram(final String taskID) {
+ final PipeSinkSubtask connector = connectorMap.get(taskID);
+
+ final Histogram schemaBatchSizeHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_SCHEMA_BATCH_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ connector.setSchemaBatchSizeHistogram(schemaBatchSizeHistogram);
+
+ final Histogram schemaBatchTimeIntervalHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+
connector.setSchemaBatchTimeIntervalHistogram(schemaBatchTimeIntervalHistogram);
+
+ final Histogram schemaBatchEventSizeHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString());
+ connector.setEventSizeHistogram(schemaBatchEventSizeHistogram);
+ }
+
@Override
public void unbindFrom(final AbstractMetricService metricService) {
ImmutableSet.copyOf(connectorMap.keySet()).forEach(this::deregister);
@@ -83,6 +117,7 @@ public class PipeSchemaRegionSinkMetrics implements
IMetricSet {
private void removeMetrics(final String taskID) {
removeRate(taskID);
+ removeHistogram(taskID);
}
private void removeRate(final String taskID) {
@@ -98,6 +133,29 @@ public class PipeSchemaRegionSinkMetrics implements
IMetricSet {
schemaRateMap.remove(taskID);
}
+ private void removeHistogram(final String taskID) {
+ final PipeSinkSubtask connector = connectorMap.get(taskID);
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_SCHEMA_BATCH_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString());
+ }
+
//////////////////////////// Register & deregister (pipe integration)
////////////////////////////
public void register(final PipeSinkSubtask pipeSinkSubtask) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java
new file mode 100644
index 00000000000..77623301955
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.ActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.BatchActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalBatchActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateMultiTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.MeasurementGroup;
+import org.apache.iotdb.metrics.impl.DoNothingHistogram;
+import org.apache.iotdb.metrics.type.Histogram;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_DELAY_SECONDS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_SIZE_KEY;
+
+public class PipeSchemaRegionWritePlanEventBatch implements AutoCloseable {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PipeSchemaRegionWritePlanEventBatch.class);
+
+ private static final PlanNodeId EMPTY_PLAN_NODE_ID = new PlanNodeId("");
+
+ private final int maxDelayInMs;
+ private final long maxBatchSizeInBytes;
+ private final PipeMemoryBlock allocatedMemoryBlock;
+
+ private final List<EnrichedEvent> events = new ArrayList<>();
+
+ private final Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap =
new HashMap<>();
+ private final Map<PartialPath, Pair<Integer, Integer>> templateActivationMap
= new HashMap<>();
+
+ private BatchType batchType = BatchType.NONE;
+ private String pipeName;
+ private long creationTime;
+
+ private long firstEventProcessingTime = Long.MIN_VALUE;
+ private PlanNode cachedPlanNode;
+ private ByteBuffer cachedSerializedPlanNode;
+
+ private volatile boolean isClosed = false;
+
+ private Histogram batchSizeHistogram = new DoNothingHistogram();
+ private Histogram batchTimeIntervalHistogram = new DoNothingHistogram();
+ private Histogram eventSizeHistogram = new DoNothingHistogram();
+
+ public PipeSchemaRegionWritePlanEventBatch(final PipeParameters parameters) {
+ final Integer requestMaxDelayInMillis =
+ parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY,
SINK_IOTDB_BATCH_DELAY_MS_KEY);
+ if (Objects.isNull(requestMaxDelayInMillis)) {
+ final int requestMaxDelayConfig =
+ parameters.getIntOrDefault(
+ Arrays.asList(
+ CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY,
SINK_IOTDB_BATCH_DELAY_SECONDS_KEY),
+ CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE);
+ maxDelayInMs = requestMaxDelayConfig < 0 ? Integer.MAX_VALUE :
requestMaxDelayConfig;
+ } else {
+ maxDelayInMs = requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE :
requestMaxDelayInMillis;
+ }
+
+ maxBatchSizeInBytes =
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
SINK_IOTDB_BATCH_SIZE_KEY),
+ CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
+ allocatedMemoryBlock =
PipeDataNodeResourceManager.memory().forceAllocate(maxBatchSizeInBytes);
+ }
+
+ public synchronized boolean onEvent(final PipeSchemaRegionWritePlanEvent
event) {
+ if (isClosed || !canBatch(event)) {
+ return false;
+ }
+
+ if (events.isEmpty() || !Objects.equals(events.get(events.size() - 1),
event)) {
+ if
(!event.increaseReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName()))
{
+
LOGGER.warn(DataNodePipeMessages.CANNOT_INCREASE_REFERENCE_COUNT_FOR_EVENT_IGNORE,
event);
+ return true;
+ }
+
+ try {
+ if (Objects.isNull(pipeName)) {
+ pipeName = event.getPipeName();
+ creationTime = event.getCreationTime();
+ }
+ appendPlanNode(event.getPlanNode());
+ invalidateCachedPlanNode();
+ events.add(event);
+ } catch (final Exception e) {
+
event.decreaseReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName(),
false);
+ throw e;
+ }
+
+ if (firstEventProcessingTime == Long.MIN_VALUE) {
+ firstEventProcessingTime = System.currentTimeMillis();
+ }
+ }
+
+ return true;
+ }
+
+ private boolean canBatch(final PipeSchemaRegionWritePlanEvent event) {
+ final BatchType eventBatchType = resolveBatchType(event.getPlanNode());
+ if (eventBatchType == BatchType.NONE ||
containsNonEmptyProps(event.getPlanNode())) {
+ return false;
+ }
+
+ if (events.isEmpty()) {
+ return true;
+ }
+
+ return Objects.equals(pipeName, event.getPipeName())
+ && creationTime == event.getCreationTime()
+ && batchType == eventBatchType
+ && !hasAlignmentConflict(event.getPlanNode());
+ }
+
+ private BatchType resolveBatchType(final PlanNode planNode) {
+ switch (planNode.getType()) {
+ case CREATE_TIME_SERIES:
+ case CREATE_ALIGNED_TIME_SERIES:
+ case CREATE_MULTI_TIME_SERIES:
+ case INTERNAL_CREATE_TIME_SERIES:
+ case INTERNAL_CREATE_MULTI_TIMESERIES:
+ return BatchType.TIMESERIES;
+ case ACTIVATE_TEMPLATE:
+ case BATCH_ACTIVATE_TEMPLATE:
+ case INTERNAL_BATCH_ACTIVATE_TEMPLATE:
+ return BatchType.TEMPLATE_ACTIVATE;
+ default:
+ return BatchType.NONE;
+ }
+ }
+
+ private boolean containsNonEmptyProps(final PlanNode planNode) {
+ switch (planNode.getType()) {
+ case CREATE_TIME_SERIES:
+ return hasNonEmptyProps(((CreateTimeSeriesNode) planNode).getProps());
+ case CREATE_MULTI_TIME_SERIES:
+ return ((CreateMultiTimeSeriesNode) planNode)
+ .getMeasurementGroupMap().values().stream()
+
.anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps);
+ case INTERNAL_CREATE_TIME_SERIES:
+ return hasNonEmptyProps(((InternalCreateTimeSeriesNode)
planNode).getMeasurementGroup());
+ case INTERNAL_CREATE_MULTI_TIMESERIES:
+ return ((InternalCreateMultiTimeSeriesNode) planNode)
+ .getDeviceMap().values().stream()
+ .map(Pair::getRight)
+
.anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps);
+ default:
+ return false;
+ }
+ }
+
+ private static boolean hasNonEmptyProps(final MeasurementGroup
measurementGroup) {
+ return Objects.nonNull(measurementGroup.getPropsList())
+ && measurementGroup.getPropsList().stream()
+ .anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps);
+ }
+
+ private static boolean hasNonEmptyProps(final Map<String, String> props) {
+ return Objects.nonNull(props) && !props.isEmpty();
+ }
+
+ private boolean hasAlignmentConflict(final PlanNode planNode) {
+ switch (planNode.getType()) {
+ case CREATE_TIME_SERIES:
+ return hasAlignmentConflict(
+ ((CreateTimeSeriesNode) planNode).getPath().getDevicePath(),
false);
+ case CREATE_ALIGNED_TIME_SERIES:
+ return hasAlignmentConflict(((CreateAlignedTimeSeriesNode)
planNode).getDevicePath(), true);
+ case CREATE_MULTI_TIME_SERIES:
+ return ((CreateMultiTimeSeriesNode) planNode)
+ .getMeasurementGroupMap().keySet().stream()
+ .anyMatch(devicePath -> hasAlignmentConflict(devicePath,
false));
+ case INTERNAL_CREATE_TIME_SERIES:
+ return hasAlignmentConflict(
+ ((InternalCreateTimeSeriesNode) planNode).getDevicePath(),
+ ((InternalCreateTimeSeriesNode) planNode).isAligned());
+ case INTERNAL_CREATE_MULTI_TIMESERIES:
+ return ((InternalCreateMultiTimeSeriesNode) planNode)
+ .getDeviceMap().entrySet().stream()
+ .anyMatch(
+ entry -> hasAlignmentConflict(entry.getKey(),
entry.getValue().getLeft()));
+ default:
+ return false;
+ }
+ }
+
+ private boolean hasAlignmentConflict(final PartialPath devicePath, final
boolean isAligned) {
+ final Pair<Boolean, MeasurementGroup> existing = deviceMap.get(devicePath);
+ return Objects.nonNull(existing) && !Objects.equals(existing.getLeft(),
isAligned);
+ }
+
+ private void appendPlanNode(final PlanNode planNode) {
+ if (batchType == BatchType.NONE) {
+ batchType = resolveBatchType(planNode);
+ }
+
+ switch (planNode.getType()) {
+ case CREATE_TIME_SERIES:
+ appendCreateTimeSeriesNode((CreateTimeSeriesNode) planNode);
+ break;
+ case CREATE_ALIGNED_TIME_SERIES:
+ appendCreateAlignedTimeSeriesNode((CreateAlignedTimeSeriesNode)
planNode);
+ break;
+ case CREATE_MULTI_TIME_SERIES:
+ appendCreateMultiTimeSeriesNode((CreateMultiTimeSeriesNode) planNode);
+ break;
+ case INTERNAL_CREATE_TIME_SERIES:
+ appendInternalCreateTimeSeriesNode((InternalCreateTimeSeriesNode)
planNode);
+ break;
+ case INTERNAL_CREATE_MULTI_TIMESERIES:
+
appendInternalCreateMultiTimeSeriesNode((InternalCreateMultiTimeSeriesNode)
planNode);
+ break;
+ case ACTIVATE_TEMPLATE:
+ appendActivateTemplateNode((ActivateTemplateNode) planNode);
+ break;
+ case BATCH_ACTIVATE_TEMPLATE:
+ appendBatchActivateTemplateNode((BatchActivateTemplateNode) planNode);
+ break;
+ case INTERNAL_BATCH_ACTIVATE_TEMPLATE:
+
appendInternalBatchActivateTemplateNode((InternalBatchActivateTemplateNode)
planNode);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported schema plan node " +
planNode.getType());
+ }
+ }
+
+ private void appendCreateTimeSeriesNode(final CreateTimeSeriesNode node) {
+ appendMeasurement(
+ node.getPath().getDevicePath(),
+ false,
+ node.getPath().getMeasurement(),
+ node.getDataType(),
+ node.getEncoding(),
+ node.getCompressor(),
+ node.getAlias(),
+ node.getTags(),
+ node.getAttributes());
+ }
+
+ private void appendCreateAlignedTimeSeriesNode(final
CreateAlignedTimeSeriesNode node) {
+ for (int i = 0; i < node.getMeasurements().size(); ++i) {
+ appendMeasurement(
+ node.getDevicePath(),
+ true,
+ node.getMeasurements().get(i),
+ node.getDataTypes().get(i),
+ node.getEncodings().get(i),
+ node.getCompressors().get(i),
+ Objects.nonNull(node.getAliasList()) ? node.getAliasList().get(i) :
null,
+ Objects.nonNull(node.getTagsList()) ? node.getTagsList().get(i) :
null,
+ Objects.nonNull(node.getAttributesList()) ?
node.getAttributesList().get(i) : null);
+ }
+ }
+
+ private void appendCreateMultiTimeSeriesNode(final CreateMultiTimeSeriesNode
node) {
+ node.getMeasurementGroupMap()
+ .forEach(
+ (devicePath, measurementGroup) ->
+ appendMeasurementGroup(devicePath, false, measurementGroup));
+ }
+
+ private void appendInternalCreateTimeSeriesNode(final
InternalCreateTimeSeriesNode node) {
+ appendMeasurementGroup(node.getDevicePath(), node.isAligned(),
node.getMeasurementGroup());
+ }
+
+ private void appendInternalCreateMultiTimeSeriesNode(
+ final InternalCreateMultiTimeSeriesNode node) {
+ node.getDeviceMap()
+ .forEach(
+ (devicePath, isAlignedAndMeasurementGroup) ->
+ appendMeasurementGroup(
+ devicePath,
+ isAlignedAndMeasurementGroup.getLeft(),
+ isAlignedAndMeasurementGroup.getRight()));
+ }
+
+ private void appendMeasurementGroup(
+ final PartialPath devicePath,
+ final boolean isAligned,
+ final MeasurementGroup measurementGroup) {
+ for (int i = 0; i < measurementGroup.size(); ++i) {
+ appendMeasurement(
+ devicePath,
+ isAligned,
+ measurementGroup.getMeasurements().get(i),
+ measurementGroup.getDataTypes().get(i),
+ measurementGroup.getEncodings().get(i),
+ measurementGroup.getCompressors().get(i),
+ Objects.nonNull(measurementGroup.getAliasList())
+ ? measurementGroup.getAliasList().get(i)
+ : null,
+ Objects.nonNull(measurementGroup.getTagsList())
+ ? measurementGroup.getTagsList().get(i)
+ : null,
+ Objects.nonNull(measurementGroup.getAttributesList())
+ ? measurementGroup.getAttributesList().get(i)
+ : null);
+ }
+ }
+
+ private void appendMeasurement(
+ final PartialPath devicePath,
+ final boolean isAligned,
+ final String measurement,
+ final TSDataType dataType,
+ final TSEncoding encoding,
+ final CompressionType compressor,
+ final String alias,
+ final Map<String, String> tags,
+ final Map<String, String> attributes) {
+ final MeasurementGroup group =
+ deviceMap
+ .computeIfAbsent(devicePath, key -> new Pair<>(isAligned, new
MeasurementGroup()))
+ .getRight();
+ if (group.addMeasurement(measurement, dataType, encoding, compressor)) {
+ group.addAlias(alias);
+ group.addTags(tags);
+ group.addAttributes(attributes);
+ }
+ }
+
+ private void appendActivateTemplateNode(final ActivateTemplateNode node) {
+ templateActivationMap.putIfAbsent(
+ node.getActivatePath(), new Pair<>(node.getTemplateId(),
node.getTemplateSetLevel()));
+ }
+
+ private void appendBatchActivateTemplateNode(final BatchActivateTemplateNode
node) {
+
node.getTemplateActivationMap().forEach(templateActivationMap::putIfAbsent);
+ }
+
+ private void appendInternalBatchActivateTemplateNode(
+ final InternalBatchActivateTemplateNode node) {
+
node.getTemplateActivationMap().forEach(templateActivationMap::putIfAbsent);
+ }
+
+ public synchronized boolean shouldEmit() {
+ if (events.isEmpty() || firstEventProcessingTime == Long.MIN_VALUE) {
+ return false;
+ }
+ return getSerializedPlanNodeSize() >= maxBatchSizeInBytes
+ || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
+ }
+
+ public synchronized void recordBatchMetrics() {
+ if (events.isEmpty() || firstEventProcessingTime == Long.MIN_VALUE) {
+ return;
+ }
+ batchTimeIntervalHistogram.update(System.currentTimeMillis() -
firstEventProcessingTime);
+ batchSizeHistogram.update(getSerializedPlanNodeSize());
+ eventSizeHistogram.update(events.size());
+ }
+
+ public synchronized PlanNode toPlanNode() {
+ if (Objects.nonNull(cachedPlanNode)) {
+ return cachedPlanNode;
+ }
+
+ switch (batchType) {
+ case TIMESERIES:
+ cachedPlanNode =
+ new InternalCreateMultiTimeSeriesNode(EMPTY_PLAN_NODE_ID, new
HashMap<>(deviceMap));
+ return cachedPlanNode;
+ case TEMPLATE_ACTIVATE:
+ cachedPlanNode =
+ new BatchActivateTemplateNode(EMPTY_PLAN_NODE_ID, new
HashMap<>(templateActivationMap));
+ return cachedPlanNode;
+ default:
+ throw new IllegalStateException("Cannot build schema batch plan node
from empty batch.");
+ }
+ }
+
+ public synchronized ByteBuffer toPlanNodeByteBuffer() {
+ return getSerializedPlanNode().duplicate();
+ }
+
+ private long getSerializedPlanNodeSize() {
+ return getSerializedPlanNode().remaining();
+ }
+
+ private ByteBuffer getSerializedPlanNode() {
+ if (Objects.isNull(cachedSerializedPlanNode)) {
+ cachedSerializedPlanNode = toPlanNode().serializeToByteBuffer();
+ }
+ return cachedSerializedPlanNode;
+ }
+
+ private void invalidateCachedPlanNode() {
+ cachedPlanNode = null;
+ cachedSerializedPlanNode = null;
+ }
+
+ public synchronized void onSuccess() {
+ events.clear();
+ deviceMap.clear();
+ templateActivationMap.clear();
+ batchType = BatchType.NONE;
+ pipeName = null;
+ creationTime = 0;
+ invalidateCachedPlanNode();
+ firstEventProcessingTime = Long.MIN_VALUE;
+ }
+
+ public synchronized void decreaseEventsReferenceCount(
+ final String holderMessage, final boolean shouldReport) {
+ events.forEach(event -> event.decreaseReferenceCount(holderMessage,
shouldReport));
+ }
+
+ public synchronized void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ final boolean removed =
+ events.removeIf(
+ event -> {
+ if (pipeNameToDrop.equals(event.getPipeName())
+ && creationTimeToDrop == event.getCreationTime()
+ && regionId == event.getRegionId()) {
+
event.clearReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName());
+ return true;
+ }
+ return false;
+ });
+ if (removed) {
+ rebuildFromEvents();
+ }
+ }
+
+ private void rebuildFromEvents() {
+ deviceMap.clear();
+ templateActivationMap.clear();
+ batchType = BatchType.NONE;
+ pipeName = null;
+ creationTime = 0;
+ invalidateCachedPlanNode();
+
+ if (events.isEmpty()) {
+ firstEventProcessingTime = Long.MIN_VALUE;
+ return;
+ }
+
+ // After a partial discard, the enqueue timestamp of the oldest remaining
event is unknown.
+ // Reset the emit window conservatively to avoid flushing immediately
because of removed events.
+ firstEventProcessingTime = System.currentTimeMillis();
+ batchType = resolveBatchType(((PipeSchemaRegionWritePlanEvent)
events.get(0)).getPlanNode());
+
+ for (final EnrichedEvent event : events) {
+ final PipeSchemaRegionWritePlanEvent schemaEvent =
(PipeSchemaRegionWritePlanEvent) event;
+ if (Objects.isNull(pipeName)) {
+ pipeName = schemaEvent.getPipeName();
+ creationTime = schemaEvent.getCreationTime();
+ }
+ appendPlanNode(schemaEvent.getPlanNode());
+ }
+ invalidateCachedPlanNode();
+ }
+
+ public synchronized boolean isEmpty() {
+ return events.isEmpty();
+ }
+
+ public synchronized int size() {
+ return events.size();
+ }
+
+ public synchronized String getPipeName() {
+ return pipeName;
+ }
+
+ public synchronized long getCreationTime() {
+ return creationTime;
+ }
+
+ public void setBatchSizeHistogram(final Histogram batchSizeHistogram) {
+ if (Objects.nonNull(batchSizeHistogram)) {
+ this.batchSizeHistogram = batchSizeHistogram;
+ }
+ }
+
+ public void setBatchTimeIntervalHistogram(final Histogram
batchTimeIntervalHistogram) {
+ if (Objects.nonNull(batchTimeIntervalHistogram)) {
+ this.batchTimeIntervalHistogram = batchTimeIntervalHistogram;
+ }
+ }
+
+ public void setEventSizeHistogram(final Histogram eventSizeHistogram) {
+ if (Objects.nonNull(eventSizeHistogram)) {
+ this.eventSizeHistogram = eventSizeHistogram;
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ isClosed = true;
+ events.forEach(
+ event ->
event.clearReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName()));
+ events.clear();
+ deviceMap.clear();
+ templateActivationMap.clear();
+ invalidateCachedPlanNode();
+ allocatedMemoryBlock.close();
+ }
+
+ private enum BatchType {
+ NONE,
+ TIMESERIES,
+ TEMPLATE_ACTIVATE
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferPlanNodeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferPlanNodeReq.java
index 8f7dc24e3e8..ed47df267a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferPlanNodeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferPlanNodeReq.java
@@ -25,12 +25,12 @@ import
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
import
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Objects;
public class PipeTransferPlanNodeReq extends TPipeTransferReq {
@@ -48,13 +48,18 @@ public class PipeTransferPlanNodeReq extends
TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferPlanNodeReq toTPipeTransferReq(final PlanNode
planNode) {
+ return toTPipeTransferReq(planNode, planNode.serializeToByteBuffer());
+ }
+
+ public static PipeTransferPlanNodeReq toTPipeTransferReq(
+ final PlanNode planNode, final ByteBuffer serializedPlanNode) {
final PipeTransferPlanNodeReq req = new PipeTransferPlanNodeReq();
req.planNode = planNode;
req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
req.type = PipeRequestType.TRANSFER_PLAN_NODE.getType();
- req.body = planNode.serializeToByteBuffer();
+ req.body = serializedPlanNode.duplicate();
return req;
}
@@ -73,15 +78,30 @@ public class PipeTransferPlanNodeReq extends
TPipeTransferReq {
/////////////////////////////// Air Gap ///////////////////////////////
public static byte[] toTPipeTransferBytes(final PlanNode planNode) throws
IOException {
+ return toTPipeTransferBytes(planNode.serializeToByteBuffer());
+ }
+
+ public static byte[] toTPipeTransferBytes(final ByteBuffer
serializedPlanNode)
+ throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBSinkRequestVersion.VERSION_1.getVersion(),
outputStream);
ReadWriteIOUtils.write(PipeRequestType.TRANSFER_PLAN_NODE.getType(),
outputStream);
- return BytesUtils.concatByteArray(
- byteArrayOutputStream.toByteArray(),
planNode.serializeToByteBuffer().array());
+ return concatBytes(byteArrayOutputStream, serializedPlanNode);
}
}
+ private static byte[] concatBytes(
+ final PublicBAOS byteArrayOutputStream, final ByteBuffer byteBuffer) {
+ final ByteBuffer duplicateBuffer = byteBuffer.duplicate();
+ final int headerSize = byteArrayOutputStream.size();
+ final int bodySize = duplicateBuffer.remaining();
+ final byte[] result = new byte[headerSize + bodySize];
+ System.arraycopy(byteArrayOutputStream.getBuf(), 0, result, 0, headerSize);
+ duplicateBuffer.get(result, headerSize, bodySize);
+ return result;
+ }
+
/////////////////////////////// Object ///////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
index a67654ea9d8..dabfc1ea8a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
@@ -21,15 +21,20 @@ package org.apache.iotdb.db.pipe.sink.protocol.airgap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -42,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Objects;
@TreeModel
@@ -50,6 +56,19 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSchemaRegionAirGapSink.class);
+ private PipeSchemaRegionWritePlanEventBatch schemaRegionWritePlanEventBatch;
+
+ @Override
+ public void customize(
+ final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
+ throws Exception {
+ super.customize(parameters, configuration);
+
+ if (isTabletBatchModeEnabled) {
+ schemaRegionWritePlanEventBatch = new
PipeSchemaRegionWritePlanEventBatch(parameters);
+ }
+ }
+
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
@@ -69,14 +88,22 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
try {
if (event instanceof PipeSchemaRegionWritePlanEvent) {
- doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event);
+ if (isTabletBatchModeEnabled &&
Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+ doTransferWithBatch(socket, (PipeSchemaRegionWritePlanEvent) event);
+ } else {
+ doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event);
+ }
} else if (event instanceof PipeSchemaRegionSnapshotEvent) {
+ flushBatchedEventsIfNecessary(socket);
doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event);
- } else if (!(event instanceof PipeHeartbeatEvent)) {
- LOGGER.warn(
- DataNodePipeMessages
-
.IOTDBSCHEMAREGIONAIRGAPSINK_DOES_NOT_SUPPORT_TRANSFERRING_GENERIC_EVENT,
- event);
+ } else {
+ flushBatchedEventsIfNecessary(socket);
+ if (!(event instanceof PipeHeartbeatEvent)) {
+ LOGGER.warn(
+ DataNodePipeMessages
+
.IOTDBSCHEMAREGIONAIRGAPSINK_DOES_NOT_SUPPORT_TRANSFERRING_GENERIC_EVENT,
+ event);
+ }
}
} catch (final IOException e) {
isSocketAlive.set(socketIndex, false);
@@ -89,6 +116,58 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
}
}
+ private void doTransferWithBatch(
+ final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event)
+ throws PipeException, IOException {
+ if (tryTransferInBatch(socket, event)) {
+ return;
+ }
+
+ doTransferWrapper(socket, event);
+ }
+
+ private boolean tryTransferInBatch(
+ final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event)
+ throws PipeException, IOException {
+ if (tryAppendToBatchAndFlushIfNecessary(socket, event)) {
+ return true;
+ }
+
+ if (schemaRegionWritePlanEventBatch.isEmpty()) {
+ return false;
+ }
+
+ flushBatchedEventsIfNecessary(socket);
+ return tryAppendToBatchAndFlushIfNecessary(socket, event);
+ }
+
+ private boolean tryAppendToBatchAndFlushIfNecessary(
+ final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event)
+ throws PipeException, IOException {
+ if (!schemaRegionWritePlanEventBatch.onEvent(event)) {
+ return false;
+ }
+
+ if (schemaRegionWritePlanEventBatch.shouldEmit()) {
+ flushBatchedEventsIfNecessary(socket);
+ }
+ return true;
+ }
+
+ private void flushBatchedEventsIfNecessary(final AirGapSocket socket)
+ throws PipeException, IOException {
+ if (Objects.isNull(schemaRegionWritePlanEventBatch)
+ || schemaRegionWritePlanEventBatch.isEmpty()) {
+ return;
+ }
+
+ schemaRegionWritePlanEventBatch.recordBatchMetrics();
+ doTransfer(socket, schemaRegionWritePlanEventBatch);
+ schemaRegionWritePlanEventBatch.decreaseEventsReferenceCount(
+ IoTDBSchemaRegionAirGapSink.class.getName(), true);
+ schemaRegionWritePlanEventBatch.onSuccess();
+ }
+
private void doTransferWrapper(
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
@@ -110,21 +189,60 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
- if (!send(
+ doTransfer(
+ socket,
+ pipeSchemaRegionWritePlanEvent.getPlanNode(),
pipeSchemaRegionWritePlanEvent.getPipeName(),
pipeSchemaRegionWritePlanEvent.getCreationTime(),
+ pipeSchemaRegionWritePlanEvent.toString());
+ }
+
+ private void doTransfer(
+ final AirGapSocket socket, final PipeSchemaRegionWritePlanEventBatch
batch)
+ throws PipeException, IOException {
+ final PlanNode planNode = batch.toPlanNode();
+ doTransfer(
+ socket,
+ planNode,
+ batch.toPlanNodeByteBuffer(),
+ batch.getPipeName(),
+ batch.getCreationTime(),
+ planNode.toString());
+ }
+
+ private void doTransfer(
+ final AirGapSocket socket,
+ final PlanNode planNode,
+ final String pipeName,
+ final long creationTime,
+ final String eventDescription)
+ throws PipeException, IOException {
+ doTransfer(socket, planNode, null, pipeName, creationTime,
eventDescription);
+ }
+
+ private void doTransfer(
+ final AirGapSocket socket,
+ final PlanNode planNode,
+ final ByteBuffer serializedPlanNode,
+ final String pipeName,
+ final long creationTime,
+ final String eventDescription)
+ throws PipeException, IOException {
+ if (!send(
+ pipeName,
+ creationTime,
socket,
- PipeTransferPlanNodeReq.toTPipeTransferBytes(
- pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
+ Objects.nonNull(serializedPlanNode)
+ ? PipeTransferPlanNodeReq.toTPipeTransferBytes(serializedPlanNode)
+ : PipeTransferPlanNodeReq.toTPipeTransferBytes(planNode))) {
final String errorMessage =
String.format(
- "Transfer data node write plan %s error. Socket: %s.",
- pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), socket);
+ "Transfer data node write plan %s error. Socket: %s.",
planNode.getType(), socket);
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
- pipeSchemaRegionWritePlanEvent.toString(),
+ eventDescription,
true);
}
}
@@ -218,4 +336,44 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
final String fileName, final long position, final byte[] payLoad) throws
IOException {
return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferBytes(fileName,
position, payLoad);
}
+
+ @Override
+ public synchronized void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+ schemaRegionWritePlanEventBatch.discardEventsOfPipe(
+ pipeNameToDrop, creationTimeToDrop, regionId);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+ schemaRegionWritePlanEventBatch.close();
+ }
+ super.close();
+ }
+
+ @Override
+ public void setSchemaBatchSizeHistogram(final Histogram
schemaBatchSizeHistogram) {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+
schemaRegionWritePlanEventBatch.setBatchSizeHistogram(schemaBatchSizeHistogram);
+ }
+ }
+
+ @Override
+ public void setSchemaBatchTimeIntervalHistogram(
+ final Histogram schemaBatchTimeIntervalHistogram) {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+ schemaRegionWritePlanEventBatch.setBatchTimeIntervalHistogram(
+ schemaBatchTimeIntervalHistogram);
+ }
+ }
+
+ @Override
+ public void setBatchEventSizeHistogram(final Histogram eventSizeHistogram) {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+
schemaRegionWritePlanEventBatch.setEventSizeHistogram(eventSizeHistogram);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
index aa84702e6f1..6df7a470262 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
@@ -22,15 +22,20 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.sync;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -46,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Objects;
@@ -55,6 +61,19 @@ public class IoTDBSchemaRegionSink extends
IoTDBDataNodeSyncSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSchemaRegionSink.class);
+ private PipeSchemaRegionWritePlanEventBatch schemaRegionWritePlanEventBatch;
+
+ @Override
+ public void customize(
+ final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
+ throws Exception {
+ super.customize(parameters, configuration);
+
+ if (isTabletBatchModeEnabled) {
+ schemaRegionWritePlanEventBatch = new
PipeSchemaRegionWritePlanEventBatch(parameters);
+ }
+ }
+
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
throw new UnsupportedOperationException(
@@ -70,15 +89,71 @@ public class IoTDBSchemaRegionSink extends
IoTDBDataNodeSyncSink {
@Override
public void transfer(final Event event) throws Exception {
if (event instanceof PipeSchemaRegionWritePlanEvent) {
- doTransferWrapper((PipeSchemaRegionWritePlanEvent) event);
+ if (isTabletBatchModeEnabled &&
Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+ doTransferWithBatch((PipeSchemaRegionWritePlanEvent) event);
+ } else {
+ doTransferWrapper((PipeSchemaRegionWritePlanEvent) event);
+ }
} else if (event instanceof PipeSchemaRegionSnapshotEvent) {
+ flushBatchedEventsIfNecessary();
doTransferWrapper((PipeSchemaRegionSnapshotEvent) event);
- } else if (!(event instanceof PipeHeartbeatEvent)) {
- LOGGER.warn(
- DataNodePipeMessages
-
.IOTDBSCHEMAREGIONCONNECTOR_DOES_NOT_SUPPORT_TRANSFERRING_GENERIC_EVENT,
- event);
+ } else {
+ flushBatchedEventsIfNecessary();
+ if (!(event instanceof PipeHeartbeatEvent)) {
+ LOGGER.warn(
+ DataNodePipeMessages
+
.IOTDBSCHEMAREGIONCONNECTOR_DOES_NOT_SUPPORT_TRANSFERRING_GENERIC_EVENT,
+ event);
+ }
+ }
+ }
+
+ private void doTransferWithBatch(final PipeSchemaRegionWritePlanEvent event)
+ throws PipeException {
+ if (tryTransferInBatch(event)) {
+ return;
+ }
+
+ doTransferWrapper(event);
+ }
+
+ private boolean tryTransferInBatch(final PipeSchemaRegionWritePlanEvent
event)
+ throws PipeException {
+ if (tryAppendToBatchAndFlushIfNecessary(event)) {
+ return true;
+ }
+
+ if (schemaRegionWritePlanEventBatch.isEmpty()) {
+ return false;
}
+
+ flushBatchedEventsIfNecessary();
+ return tryAppendToBatchAndFlushIfNecessary(event);
+ }
+
+ private boolean tryAppendToBatchAndFlushIfNecessary(final
PipeSchemaRegionWritePlanEvent event)
+ throws PipeException {
+ if (!schemaRegionWritePlanEventBatch.onEvent(event)) {
+ return false;
+ }
+
+ if (schemaRegionWritePlanEventBatch.shouldEmit()) {
+ flushBatchedEventsIfNecessary();
+ }
+ return true;
+ }
+
+ private void flushBatchedEventsIfNecessary() throws PipeException {
+ if (Objects.isNull(schemaRegionWritePlanEventBatch)
+ || schemaRegionWritePlanEventBatch.isEmpty()) {
+ return;
+ }
+
+ schemaRegionWritePlanEventBatch.recordBatchMetrics();
+ doTransfer(schemaRegionWritePlanEventBatch);
+ schemaRegionWritePlanEventBatch.decreaseEventsReferenceCount(
+ IoTDBSchemaRegionSink.class.getName(), true);
+ schemaRegionWritePlanEventBatch.onSuccess();
}
private void doTransferWrapper(
@@ -98,44 +173,74 @@ public class IoTDBSchemaRegionSink extends
IoTDBDataNodeSyncSink {
private void doTransfer(final PipeSchemaRegionWritePlanEvent
pipeSchemaRegionWritePlanEvent)
throws PipeException {
+ doTransfer(
+ pipeSchemaRegionWritePlanEvent.getPlanNode(),
+ pipeSchemaRegionWritePlanEvent.getPipeName(),
+ pipeSchemaRegionWritePlanEvent.getCreationTime(),
+ pipeSchemaRegionWritePlanEvent.getPlanNode().toString());
+ LOGGER.info(
+ DataNodePipeMessages.SUCCESSFULLY_TRANSFERRED_SCHEMA_EVENT,
pipeSchemaRegionWritePlanEvent);
+ }
+
+ private void doTransfer(final PipeSchemaRegionWritePlanEventBatch batch)
throws PipeException {
+ final PlanNode planNode = batch.toPlanNode();
+ doTransfer(
+ planNode,
+ batch.toPlanNodeByteBuffer(),
+ batch.getPipeName(),
+ batch.getCreationTime(),
+ planNode.toString());
+ LOGGER.info("Successfully transferred batched schema events, batch size
{}.", batch.size());
+ }
+
+ private void doTransfer(
+ final PlanNode planNode,
+ final String pipeName,
+ final long creationTime,
+ final String eventDescription)
+ throws PipeException {
+ doTransfer(planNode, null, pipeName, creationTime, eventDescription);
+ }
+
+ private void doTransfer(
+ final PlanNode planNode,
+ final ByteBuffer serializedPlanNode,
+ final String pipeName,
+ final long creationTime,
+ final String eventDescription)
+ throws PipeException {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
final TPipeTransferResp resp;
try {
final TPipeTransferReq req =
compressIfNeeded(
- PipeTransferPlanNodeReq.toTPipeTransferReq(
- pipeSchemaRegionWritePlanEvent.getPlanNode()));
+ Objects.nonNull(serializedPlanNode)
+ ? PipeTransferPlanNodeReq.toTPipeTransferReq(planNode,
serializedPlanNode)
+ : PipeTransferPlanNodeReq.toTPipeTransferReq(planNode));
rateLimitIfNeeded(
- pipeSchemaRegionWritePlanEvent.getPipeName(),
- pipeSchemaRegionWritePlanEvent.getCreationTime(),
- clientAndStatus.getLeft().getEndPoint(),
- req.getBody().length);
+ pipeName, creationTime, clientAndStatus.getLeft().getEndPoint(),
req.getBody().length);
resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
String.format(
"Network error when transfer schema region write plan %s,
because %s.",
- pipeSchemaRegionWritePlanEvent.getPlanNode().getType(),
e.getMessage()),
+ planNode.getType(), e.getMessage()),
e);
}
final TSStatus status = resp.getStatus();
- // Only handle the failed statuses to avoid string format performance
overhead
- if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && resp.getStatus().getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
receiverStatusHandler.handle(
status,
String.format(
"Transfer data node write plan %s error, result status %s.",
- pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), status),
- pipeSchemaRegionWritePlanEvent.getPlanNode().toString(),
+ planNode.getType(), status),
+ eventDescription,
true);
}
-
- LOGGER.info(
- DataNodePipeMessages.SUCCESSFULLY_TRANSFERRED_SCHEMA_EVENT,
pipeSchemaRegionWritePlanEvent);
}
private void doTransferWrapper(final PipeSchemaRegionSnapshotEvent
pipeSchemaRegionSnapshotEvent)
@@ -249,6 +354,46 @@ public class IoTDBSchemaRegionSink extends
IoTDBDataNodeSyncSink {
return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName,
position, payLoad);
}
+ @Override
+ public synchronized void discardEventsOfPipe(
+ final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+ schemaRegionWritePlanEventBatch.discardEventsOfPipe(
+ pipeNameToDrop, creationTimeToDrop, regionId);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+ schemaRegionWritePlanEventBatch.close();
+ }
+ super.close();
+ }
+
+ @Override
+ public void setSchemaBatchSizeHistogram(final Histogram
schemaBatchSizeHistogram) {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+
schemaRegionWritePlanEventBatch.setBatchSizeHistogram(schemaBatchSizeHistogram);
+ }
+ }
+
+ @Override
+ public void setSchemaBatchTimeIntervalHistogram(
+ final Histogram schemaBatchTimeIntervalHistogram) {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+ schemaRegionWritePlanEventBatch.setBatchTimeIntervalHistogram(
+ schemaBatchTimeIntervalHistogram);
+ }
+ }
+
+ @Override
+ public void setBatchEventSizeHistogram(final Histogram eventSizeHistogram) {
+ if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+
schemaRegionWritePlanEventBatch.setEventSizeHistogram(eventSizeHistogram);
+ }
+ }
+
@Override
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
// Do nothing
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java
new file mode 100644
index 00000000000..744e640189c
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric.schema;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.type.Histogram;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PipeSchemaRegionSinkMetricsTest {
+
+ @Test
+ public void testRegisterAndDeregisterCreateAndRemoveHistograms() throws
Exception {
+ final String taskId = "schema-task-" + System.nanoTime();
+ boolean deregistered = false;
+ final AbstractMetricService metricService =
Mockito.mock(AbstractMetricService.class);
+ final PipeSinkSubtask subtask = Mockito.mock(PipeSinkSubtask.class);
+ final Rate rate = Mockito.mock(Rate.class);
+ final Histogram batchSizeHistogram = Mockito.mock(Histogram.class);
+ final Histogram batchTimeHistogram = Mockito.mock(Histogram.class);
+ final Histogram eventSizeHistogram = Mockito.mock(Histogram.class);
+
+ when(subtask.getTaskID()).thenReturn(taskId);
+ when(subtask.getAttributeSortedString()).thenReturn("schema_test");
+ when(subtask.getCreationTime()).thenReturn(1L);
+ when(metricService.getOrCreateRate(
+ eq(Metric.PIPE_CONNECTOR_SCHEMA_TRANSFER.toString()),
+ eq(MetricLevel.IMPORTANT),
+ eq(Tag.NAME.toString()),
+ eq("schema_test"),
+ eq(Tag.CREATION_TIME.toString()),
+ eq("1")))
+ .thenReturn(rate);
+ when(metricService.getOrCreateHistogram(
+ eq(Metric.PIPE_SCHEMA_BATCH_SIZE.toString()),
+ eq(MetricLevel.IMPORTANT),
+ eq(Tag.NAME.toString()),
+ eq("schema_test"),
+ eq(Tag.CREATION_TIME.toString()),
+ eq("1")))
+ .thenReturn(batchSizeHistogram);
+ when(metricService.getOrCreateHistogram(
+ eq(Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString()),
+ eq(MetricLevel.IMPORTANT),
+ eq(Tag.NAME.toString()),
+ eq("schema_test"),
+ eq(Tag.CREATION_TIME.toString()),
+ eq("1")))
+ .thenReturn(batchTimeHistogram);
+ when(metricService.getOrCreateHistogram(
+ eq(Metric.PIPE_CONNECTOR_BATCH_SIZE.toString()),
+ eq(MetricLevel.IMPORTANT),
+ eq(Tag.NAME.toString()),
+ eq("schema_test")))
+ .thenReturn(eventSizeHistogram);
+
+ final PipeSchemaRegionSinkMetrics metrics =
PipeSchemaRegionSinkMetrics.getInstance();
+
+ final Field metricServiceField =
+ PipeSchemaRegionSinkMetrics.class.getDeclaredField("metricService");
+ metricServiceField.setAccessible(true);
+ final Field connectorMapField =
+ PipeSchemaRegionSinkMetrics.class.getDeclaredField("connectorMap");
+ connectorMapField.setAccessible(true);
+ final Field schemaRateMapField =
+ PipeSchemaRegionSinkMetrics.class.getDeclaredField("schemaRateMap");
+ schemaRateMapField.setAccessible(true);
+
+ ((Map<?, ?>) connectorMapField.get(metrics)).clear();
+ ((Map<?, ?>) schemaRateMapField.get(metrics)).clear();
+ metricServiceField.set(metrics, null);
+
+ try {
+ metrics.register(subtask);
+ metrics.bindTo(metricService);
+
+ verify(subtask).setSchemaBatchSizeHistogram(batchSizeHistogram);
+ verify(subtask).setSchemaBatchTimeIntervalHistogram(batchTimeHistogram);
+ verify(subtask).setEventSizeHistogram(eventSizeHistogram);
+
+ metrics.deregister(taskId);
+
+ verify(metricService)
+ .remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_SCHEMA_BATCH_SIZE.toString(),
+ Tag.NAME.toString(),
+ "schema_test",
+ Tag.CREATION_TIME.toString(),
+ "1");
+ verify(metricService)
+ .remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(),
+ Tag.NAME.toString(),
+ "schema_test",
+ Tag.CREATION_TIME.toString(),
+ "1");
+ verify(metricService)
+ .remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+ Tag.NAME.toString(),
+ "schema_test");
+ verify(metricService)
+ .remove(
+ MetricType.RATE,
+ Metric.PIPE_CONNECTOR_SCHEMA_TRANSFER.toString(),
+ Tag.NAME.toString(),
+ "schema_test",
+ Tag.CREATION_TIME.toString(),
+ "1");
+ deregistered = true;
+ } finally {
+ if (!deregistered) {
+ metrics.deregister(taskId);
+ }
+ ((Map<?, ?>) connectorMapField.get(metrics)).clear();
+ ((Map<?, ?>) schemaRateMapField.get(metrics)).clear();
+ metricServiceField.set(metrics, null);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 7641070800f..ab08dc6b96e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
@@ -395,6 +396,43 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(req.getPlanNode(), deserializeReq.getPlanNode());
}
+ @Test
+ public void testPipeTransferPlanNodeReqBytesWithPartialDirectByteBuffer()
throws IOException {
+ final CreateAlignedTimeSeriesNode node =
+ new CreateAlignedTimeSeriesNode(
+ new PlanNodeId(""),
+ new PartialPath(new String[] {"root", "sg", "d"}),
+ Collections.singletonList("s"),
+ Collections.singletonList(TSDataType.INT32),
+ Collections.singletonList(TSEncoding.PLAIN),
+ Collections.singletonList(CompressionType.UNCOMPRESSED),
+ null,
+ null,
+ null);
+ final byte[] serializedPlanNodeBytes =
byteBufferToByteArray(node.serializeToByteBuffer());
+ final ByteBuffer serializedPlanNode =
+ ByteBuffer.allocateDirect(serializedPlanNodeBytes.length + 2);
+ serializedPlanNode.put((byte) 0);
+ serializedPlanNode.put(serializedPlanNodeBytes);
+ serializedPlanNode.put((byte) 1);
+ serializedPlanNode.flip();
+ serializedPlanNode.position(1);
+ serializedPlanNode.limit(1 + serializedPlanNodeBytes.length);
+
+ final byte[] transferBytes =
PipeTransferPlanNodeReq.toTPipeTransferBytes(serializedPlanNode);
+
+ Assert.assertEquals(1, serializedPlanNode.position());
+ Assert.assertEquals(1 + serializedPlanNodeBytes.length,
serializedPlanNode.limit());
+ final ByteBuffer transferBuffer = ByteBuffer.wrap(transferBytes);
+ Assert.assertEquals(
+ IoTDBSinkRequestVersion.VERSION_1.getVersion(),
ReadWriteIOUtils.readByte(transferBuffer));
+ Assert.assertEquals(
+ PipeRequestType.TRANSFER_PLAN_NODE.getType(),
ReadWriteIOUtils.readShort(transferBuffer));
+ Assert.assertEquals(node, PlanNodeType.deserialize(transferBuffer));
+ Assert.assertEquals(0, ReadWriteIOUtils.readInt(transferBuffer));
+ Assert.assertFalse(transferBuffer.hasRemaining());
+ }
+
@Test
public void testPipeTransferPlanNodeReqFromLegacyV13SchemaPlanBody() {
final CreateAlignedTimeSeriesNode node =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java
new file mode 100644
index 00000000000..586b13b7667
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch;
+import
org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink;
+import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PipeSchemaRegionSinkTest {
+
+ @Test
+ public void testSyncSinkFlushesBatchedEventsOnHeartbeat() throws Exception {
+ final TestIoTDBSchemaRegionSyncSink sink = new
TestIoTDBSchemaRegionSyncSink();
+ try {
+ final IoTDBDataNodeSyncClientManager clientManager =
+ Mockito.mock(IoTDBDataNodeSyncClientManager.class);
+ final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client =
+
Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class);
+ when(clientManager.getClient()).thenAnswer(invocation -> new
Pair<>(client, true));
+ when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667));
+
when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp());
+
+ setField(sink, "clientManager", clientManager);
+ enableBatching(sink);
+
+ final PipeSchemaRegionWritePlanEvent event =
+ createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+
+ sink.transfer(event);
+ Assert.assertFalse(event.isReleased());
+ verify(client, never()).pipeTransfer(any(TPipeTransferReq.class));
+
+ sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+ verify(client, times(1)).pipeTransfer(any(TPipeTransferReq.class));
+ Assert.assertTrue(event.isReleased());
+ Assert.assertTrue(getBatch(sink).isEmpty());
+ } finally {
+ sink.close();
+ }
+ }
+
+ @Test
+ public void testSyncSinkFlushesBufferedEventsBeforeStandaloneTransfer()
throws Exception {
+ final TestIoTDBSchemaRegionSyncSink sink = new
TestIoTDBSchemaRegionSyncSink();
+ try {
+ final IoTDBDataNodeSyncClientManager clientManager =
+ Mockito.mock(IoTDBDataNodeSyncClientManager.class);
+ final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client =
+
Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class);
+ when(clientManager.getClient()).thenAnswer(invocation -> new
Pair<>(client, true));
+ when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667));
+
when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp());
+
+ setField(sink, "clientManager", clientManager);
+ enableBatching(sink);
+
+ final PipeSchemaRegionWritePlanEvent batchedEvent =
+ createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+ final PipeSchemaRegionWritePlanEvent standaloneEvent =
+ createNonBatchableEvent("root.db.d2.s1", "pipeA", 1L);
+
+ sink.transfer(batchedEvent);
+ sink.transfer(standaloneEvent);
+
+ verify(client, times(2)).pipeTransfer(any(TPipeTransferReq.class));
+ Assert.assertTrue(batchedEvent.isReleased());
+ Assert.assertTrue(standaloneEvent.isReleased());
+ Assert.assertTrue(getBatch(sink).isEmpty());
+ } finally {
+ sink.close();
+ }
+ }
+
+ @Test
+ public void testSyncSinkRetriesBatchingAfterFlushingIncompatibleBatch()
throws Exception {
+ final TestIoTDBSchemaRegionSyncSink sink = new
TestIoTDBSchemaRegionSyncSink();
+ try {
+ final IoTDBDataNodeSyncClientManager clientManager =
+ Mockito.mock(IoTDBDataNodeSyncClientManager.class);
+ final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client =
+
Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class);
+ when(clientManager.getClient()).thenAnswer(invocation -> new
Pair<>(client, true));
+ when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667));
+
when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp());
+
+ setField(sink, "clientManager", clientManager);
+ enableBatching(sink);
+
+ final PipeSchemaRegionWritePlanEvent firstEvent =
+ createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+ final PipeSchemaRegionWritePlanEvent secondEvent =
+ createBatchableEvent("root.db.d2.s1", "pipeB", 1L);
+
+ sink.transfer(firstEvent);
+ sink.transfer(secondEvent);
+
+ verify(client, times(1)).pipeTransfer(any(TPipeTransferReq.class));
+ Assert.assertTrue(firstEvent.isReleased());
+ Assert.assertFalse(secondEvent.isReleased());
+ Assert.assertEquals(1, getBatch(sink).size());
+ Assert.assertEquals("pipeB", getBatch(sink).getPipeName());
+
+ sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+ verify(client, times(2)).pipeTransfer(any(TPipeTransferReq.class));
+ Assert.assertTrue(secondEvent.isReleased());
+ Assert.assertTrue(getBatch(sink).isEmpty());
+ } finally {
+ sink.close();
+ }
+ }
+
+ @Test
+ public void testAirGapSinkFlushesBatchedEventsOnHeartbeat() throws Exception
{
+ final TestIoTDBSchemaRegionAirGapSink sink = new
TestIoTDBSchemaRegionAirGapSink();
+ try {
+ enableBatching(sink);
+
+ final PipeSchemaRegionWritePlanEvent event =
+ createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+
+ sink.transfer(event);
+ Assert.assertFalse(event.isReleased());
+ Assert.assertEquals(0, sink.getSendCount());
+
+ sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+ Assert.assertEquals(1, sink.getSendCount());
+ Assert.assertTrue(event.isReleased());
+ Assert.assertTrue(getBatch(sink).isEmpty());
+ } finally {
+ sink.close();
+ }
+ }
+
+ @Test
+ public void testAirGapSinkRetriesBatchingAfterFlushingIncompatibleBatch()
throws Exception {
+ final TestIoTDBSchemaRegionAirGapSink sink = new
TestIoTDBSchemaRegionAirGapSink();
+ try {
+ enableBatching(sink);
+
+ final PipeSchemaRegionWritePlanEvent firstEvent =
+ createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+ final PipeSchemaRegionWritePlanEvent secondEvent =
+ createBatchableEvent("root.db.d2.s1", "pipeB", 1L);
+
+ sink.transfer(firstEvent);
+ sink.transfer(secondEvent);
+
+ Assert.assertEquals(1, sink.getSendCount());
+ Assert.assertTrue(firstEvent.isReleased());
+ Assert.assertFalse(secondEvent.isReleased());
+ Assert.assertEquals(1, getBatch(sink).size());
+ Assert.assertEquals("pipeB", getBatch(sink).getPipeName());
+
+ sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+ Assert.assertEquals(2, sink.getSendCount());
+ Assert.assertTrue(secondEvent.isReleased());
+ Assert.assertTrue(getBatch(sink).isEmpty());
+ } finally {
+ sink.close();
+ }
+ }
+
+ @Test
+ public void testAirGapSinkFlushesBufferedEventsBeforeStandaloneTransfer()
throws Exception {
+ final TestIoTDBSchemaRegionAirGapSink sink = new
TestIoTDBSchemaRegionAirGapSink();
+ try {
+ enableBatching(sink);
+
+ final PipeSchemaRegionWritePlanEvent batchedEvent =
+ createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+ final PipeSchemaRegionWritePlanEvent standaloneEvent =
+ createNonBatchableEvent("root.db.d2.s1", "pipeA", 1L);
+
+ sink.transfer(batchedEvent);
+ sink.transfer(standaloneEvent);
+
+ Assert.assertEquals(2, sink.getSendCount());
+ Assert.assertTrue(batchedEvent.isReleased());
+ Assert.assertTrue(standaloneEvent.isReleased());
+ Assert.assertTrue(getBatch(sink).isEmpty());
+ } finally {
+ sink.close();
+ }
+ }
+
+ private PipeParameters createParameters() {
+ return new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, "100000");
+ put(CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1048576");
+ }
+ });
+ }
+
+ private void enableBatching(final Object sink) throws Exception {
+ setField(sink, "isTabletBatchModeEnabled", true);
+ setField(
+ sink,
+ "schemaRegionWritePlanEventBatch",
+ new PipeSchemaRegionWritePlanEventBatch(createParameters()));
+ }
+
+ private PipeSchemaRegionWritePlanEventBatch getBatch(final Object sink)
throws Exception {
+ return (PipeSchemaRegionWritePlanEventBatch)
+ getFieldValue(sink, "schemaRegionWritePlanEventBatch");
+ }
+
+ private PipeSchemaRegionWritePlanEvent createBatchableEvent(
+ final String path, final String pipeName, final long creationTime)
throws Exception {
+ return new PipeSchemaRegionWritePlanEvent(
+ new CreateTimeSeriesNode(
+ new
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId(path),
+ new org.apache.iotdb.commons.path.MeasurementPath(path),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ null,
+ null,
+ null,
+ null),
+ pipeName,
+ creationTime,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ false);
+ }
+
+ private PipeSchemaRegionWritePlanEvent createNonBatchableEvent(
+ final String path, final String pipeName, final long creationTime)
throws Exception {
+ return new PipeSchemaRegionWritePlanEvent(
+ new CreateTimeSeriesNode(
+ new
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId(path +
"-p"),
+ new org.apache.iotdb.commons.path.MeasurementPath(path),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ java.util.Collections.singletonMap("prop", "v1"),
+ null,
+ null,
+ null),
+ pipeName,
+ creationTime,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ false);
+ }
+
+ private TPipeTransferResp createSuccessResp() {
+ final TPipeTransferResp resp = new TPipeTransferResp();
+ resp.setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("success"));
+ return resp;
+ }
+
+ private void setField(final Object target, final String fieldName, final
Object value)
+ throws Exception {
+ Field field = findField(target.getClass(), fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
+ private Object getFieldValue(final Object target, final String fieldName)
throws Exception {
+ Field field = findField(target.getClass(), fieldName);
+ field.setAccessible(true);
+ return field.get(target);
+ }
+
+ private Field findField(final Class<?> clazz, final String fieldName)
+ throws NoSuchFieldException {
+ Class<?> current = clazz;
+ while (current != null) {
+ try {
+ return current.getDeclaredField(fieldName);
+ } catch (NoSuchFieldException ignored) {
+ current = current.getSuperclass();
+ }
+ }
+ throw new NoSuchFieldException(fieldName);
+ }
+
+ private static class TestIoTDBSchemaRegionSyncSink extends
IoTDBSchemaRegionSink {
+
+ @Override
+ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) {
+ return req;
+ }
+
+ @Override
+ public void rateLimitIfNeeded(
+ final String pipeName,
+ final long creationTime,
+ final TEndPoint endPoint,
+ final long bytesLength) {
+ // Do nothing in tests.
+ }
+ }
+
+ private static class TestIoTDBSchemaRegionAirGapSink extends
IoTDBSchemaRegionAirGapSink {
+
+ private int sendCount = 0;
+
+ private TestIoTDBSchemaRegionAirGapSink() {
+ sockets.add(new AirGapSocket("127.0.0.1", 6667));
+ isSocketAlive.add(true);
+ }
+
+ @Override
+ protected int nextSocketIndex() {
+ return 0;
+ }
+
+ @Override
+ protected boolean send(
+ final String pipeName,
+ final long creationTime,
+ final AirGapSocket socket,
+ final byte[] bytes) {
+ sendCount++;
+ return true;
+ }
+
+ private int getSendCount() {
+ return sendCount;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java
new file mode 100644
index 00000000000..2f31fdb41f7
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.ActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.BatchActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalBatchActivateTemplateNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateMultiTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateTimeSeriesNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.MeasurementGroup;
+import org.apache.iotdb.metrics.type.Histogram;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class PipeSchemaRegionWritePlanEventBatchTest {
+
+ @Test
+ public void testBatchTimeSeriesEvents() throws Exception {
+ try (PipeSchemaRegionWritePlanEventBatch batch =
+ new PipeSchemaRegionWritePlanEventBatch(createParameters(1000,
1048576))) {
+ Assert.assertTrue(
+ batch.onEvent(
+ createEvent(
+ new CreateTimeSeriesNode(
+ new PlanNodeId("1"),
+ new MeasurementPath("root.db.d1.s1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ null,
+ Collections.singletonMap("tag", "v1"),
+ Collections.singletonMap("attr", "a1"),
+ "alias1"),
+ "pipeA",
+ 1L)));
+
+ Assert.assertTrue(
+ batch.onEvent(
+ createEvent(
+ new CreateAlignedTimeSeriesNode(
+ new PlanNodeId("2"),
+ new PartialPath("root.db.d2"),
+ Arrays.asList("s1", "s2"),
+ Arrays.asList(TSDataType.INT32, TSDataType.DOUBLE),
+ Arrays.asList(TSEncoding.RLE, TSEncoding.GORILLA),
+ Arrays.asList(CompressionType.SNAPPY,
CompressionType.ZSTD),
+ Arrays.asList("alias2", null),
+ Arrays.asList(Collections.singletonMap("tag", "v2"),
null),
+ Arrays.asList(Collections.singletonMap("attr", "a2"),
null)),
+ "pipeA",
+ 1L)));
+
+ final PlanNode planNode = batch.toPlanNode();
+ Assert.assertTrue(planNode instanceof InternalCreateMultiTimeSeriesNode);
+
+ final Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap =
+ ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap();
+ Assert.assertEquals(2, deviceMap.size());
+
+ final Pair<Boolean, MeasurementGroup> d1Group = deviceMap.get(new
PartialPath("root.db.d1"));
+ Assert.assertNotNull(d1Group);
+ Assert.assertFalse(d1Group.getLeft());
+ Assert.assertEquals(Collections.singletonList("s1"),
d1Group.getRight().getMeasurements());
+ Assert.assertEquals(Collections.singletonList("alias1"),
d1Group.getRight().getAliasList());
+ Assert.assertEquals(
+ Collections.singletonList(Collections.singletonMap("tag", "v1")),
+ d1Group.getRight().getTagsList());
+ Assert.assertEquals(
+ Collections.singletonList(Collections.singletonMap("attr", "a1")),
+ d1Group.getRight().getAttributesList());
+
+ final Pair<Boolean, MeasurementGroup> d2Group = deviceMap.get(new
PartialPath("root.db.d2"));
+ Assert.assertNotNull(d2Group);
+ Assert.assertTrue(d2Group.getLeft());
+ Assert.assertEquals(Arrays.asList("s1", "s2"),
d2Group.getRight().getMeasurements());
+ Assert.assertEquals(Arrays.asList("alias2", null),
d2Group.getRight().getAliasList());
+ }
+ }
+
+ @Test
+ public void testBatchAdditionalTimeSeriesNodeTypes() throws Exception {
+ try (PipeSchemaRegionWritePlanEventBatch batch =
+ new PipeSchemaRegionWritePlanEventBatch(createParameters(1000,
1048576))) {
+ final Map<PartialPath, MeasurementGroup> createMultiMap = new
HashMap<>();
+ createMultiMap.put(new PartialPath("root.db.d1"),
createMeasurementGroup("s1", "alias1"));
+ Assert.assertTrue(
+ batch.onEvent(
+ createEvent(
+ new CreateMultiTimeSeriesNode(new PlanNodeId("1"),
createMultiMap),
+ "pipeA",
+ 1L)));
+
+ Assert.assertTrue(
+ batch.onEvent(
+ createEvent(
+ new InternalCreateTimeSeriesNode(
+ new PlanNodeId("2"),
+ new PartialPath("root.db.d2"),
+ createMeasurementGroup("s2", "alias2"),
+ true),
+ "pipeA",
+ 1L)));
+
+ final Map<PartialPath, Pair<Boolean, MeasurementGroup>>
internalCreateMultiMap =
+ new HashMap<>();
+ internalCreateMultiMap.put(
+ new PartialPath("root.db.d3"), new Pair<>(false,
createMeasurementGroup("s3", "alias3")));
+ Assert.assertTrue(
+ batch.onEvent(
+ createEvent(
+ new InternalCreateMultiTimeSeriesNode(
+ new PlanNodeId("3"), internalCreateMultiMap),
+ "pipeA",
+ 1L)));
+
+ final PlanNode planNode = batch.toPlanNode();
+ Assert.assertTrue(planNode instanceof InternalCreateMultiTimeSeriesNode);
+
+ final Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap =
+ ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap();
+ Assert.assertEquals(3, deviceMap.size());
+ Assert.assertFalse(deviceMap.get(new
PartialPath("root.db.d1")).getLeft());
+ Assert.assertTrue(deviceMap.get(new
PartialPath("root.db.d2")).getLeft());
+ Assert.assertFalse(deviceMap.get(new
PartialPath("root.db.d3")).getLeft());
+ Assert.assertEquals(
+ Collections.singletonList("s2"),
+ deviceMap.get(new
PartialPath("root.db.d2")).getRight().getMeasurements());
+ Assert.assertEquals(
+ Collections.singletonList("alias3"),
+ deviceMap.get(new
PartialPath("root.db.d3")).getRight().getAliasList());
+ }
+ }
+
+ @Test
+ public void testBatchTemplateActivationEvents() throws Exception {
+ try (PipeSchemaRegionWritePlanEventBatch batch =
+ new PipeSchemaRegionWritePlanEventBatch(createParameters(1000,
1048576))) {
+ Assert.assertTrue(
+ batch.onEvent(
+ createEvent(
+ new ActivateTemplateNode(
+ new PlanNodeId("1"), new PartialPath("root.db.d1"), 1,
10),
+ "pipeA",
+ 1L)));
+
+ final Map<PartialPath, Pair<Integer, Integer>> templateActivationMap =
new HashMap<>();
+ templateActivationMap.put(new PartialPath("root.db.d2"), new Pair<>(2,
20));
+ Assert.assertTrue(
+ batch.onEvent(
+ createEvent(
+ new BatchActivateTemplateNode(new PlanNodeId("2"),
templateActivationMap),
+ "pipeA",
+ 1L)));
+
+ final Map<PartialPath, Pair<Integer, Integer>>
internalTemplateActivationMap =
+ new HashMap<>();
+ internalTemplateActivationMap.put(new PartialPath("root.db.d3"), new
Pair<>(3, 30));
+ Assert.assertTrue(
+ batch.onEvent(
+ createEvent(
+ new InternalBatchActivateTemplateNode(
+ new PlanNodeId("3"), internalTemplateActivationMap),
+ "pipeA",
+ 1L)));
+
+ final PlanNode planNode = batch.toPlanNode();
+ Assert.assertTrue(planNode instanceof BatchActivateTemplateNode);
+
+ final Map<PartialPath, Pair<Integer, Integer>> batchedMap =
+ ((BatchActivateTemplateNode) planNode).getTemplateActivationMap();
+ Assert.assertEquals(3, batchedMap.size());
+ Assert.assertEquals(new Pair<>(10, 1), batchedMap.get(new
PartialPath("root.db.d1")));
+ Assert.assertEquals(new Pair<>(2, 20), batchedMap.get(new
PartialPath("root.db.d2")));
+ Assert.assertEquals(new Pair<>(3, 30), batchedMap.get(new
PartialPath("root.db.d3")));
+
+ Assert.assertFalse(
+ batch.onEvent(
+ createEvent(
+ new CreateTimeSeriesNode(
+ new PlanNodeId("4"),
+ new MeasurementPath("root.db.d4.s1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ null,
+ null,
+ null,
+ null),
+ "pipeA",
+ 1L)));
+ }
+ }
+
+ @Test
+ public void testRejectDifferentPipePropsAndAlignmentConflict() throws
Exception {
+ try (PipeSchemaRegionWritePlanEventBatch batch =
+ new PipeSchemaRegionWritePlanEventBatch(createParameters(1000,
1048576))) {
+ Assert.assertFalse(
+ batch.onEvent(
+ createEvent(
+ new CreateTimeSeriesNode(
+ new PlanNodeId("1"),
+ new MeasurementPath("root.db.d1.s1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Collections.singletonMap("prop", "v1"),
+ null,
+ null,
+ null),
+ "pipeA",
+ 1L)));
+
+ final Map<PartialPath, MeasurementGroup> measurementGroupMapWithProps =
new HashMap<>();
+ measurementGroupMapWithProps.put(
+ new PartialPath("root.db.d2"),
createMeasurementGroupWithProps("s1"));
+ Assert.assertFalse(
+ batch.onEvent(
+ createEvent(
+ new CreateMultiTimeSeriesNode(new PlanNodeId("2"),
measurementGroupMapWithProps),
+ "pipeA",
+ 1L)));
+
+ Assert.assertTrue(
+ batch.onEvent(
+ createEvent(
+ new CreateTimeSeriesNode(
+ new PlanNodeId("3"),
+ new MeasurementPath("root.db.d3.s1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ null,
+ null,
+ null,
+ null),
+ "pipeA",
+ 1L)));
+
+ Assert.assertFalse(
+ batch.onEvent(
+ createEvent(
+ new CreateAlignedTimeSeriesNode(
+ new PlanNodeId("4"),
+ new PartialPath("root.db.d3"),
+ Collections.singletonList("s2"),
+ Collections.singletonList(TSDataType.INT32),
+ Collections.singletonList(TSEncoding.RLE),
+ Collections.singletonList(CompressionType.SNAPPY),
+ null,
+ null,
+ null),
+ "pipeA",
+ 1L)));
+
+ Assert.assertFalse(
+ batch.onEvent(
+ createEvent(
+ new CreateTimeSeriesNode(
+ new PlanNodeId("5"),
+ new MeasurementPath("root.db.d4.s1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ null,
+ null,
+ null,
+ null),
+ "pipeB",
+ 1L)));
+ }
+ }
+
+ @Test
+ public void testDiscardEventsOfPipeRebuildsBatchAndResetsEmitWindow() throws
Exception {
+ try (PipeSchemaRegionWritePlanEventBatch batch =
+ new PipeSchemaRegionWritePlanEventBatch(createParameters(10000,
1048576))) {
+ final PipeSchemaRegionWritePlanEvent removedEvent =
+ createEvent(
+ new CreateTimeSeriesNode(
+ new PlanNodeId("1"),
+ new MeasurementPath("root.db.d1.s1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ null,
+ null,
+ null,
+ null),
+ "pipeA",
+ 1L,
+ 1);
+ final PipeSchemaRegionWritePlanEvent remainingEvent =
+ createEvent(
+ new CreateTimeSeriesNode(
+ new PlanNodeId("2"),
+ new MeasurementPath("root.db.d2.s1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ null,
+ null,
+ null,
+ null),
+ "pipeA",
+ 1L,
+ 2);
+
+ Assert.assertTrue(batch.onEvent(removedEvent));
+ Assert.assertTrue(batch.onEvent(remainingEvent));
+
+ setField(batch, "firstEventProcessingTime", System.currentTimeMillis() -
20000);
+ Assert.assertTrue(batch.shouldEmit());
+
+ batch.discardEventsOfPipe("pipeA", 1L, 1);
+
+ Assert.assertTrue(removedEvent.isReleased());
+ Assert.assertFalse(remainingEvent.isReleased());
+ Assert.assertEquals(1, batch.size());
+ Assert.assertEquals("pipeA", batch.getPipeName());
+ Assert.assertEquals(1L, batch.getCreationTime());
+ Assert.assertFalse(batch.shouldEmit());
+
+ final PlanNode planNode = batch.toPlanNode();
+ final Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap =
+ ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap();
+ Assert.assertEquals(1, deviceMap.size());
+ Assert.assertTrue(deviceMap.containsKey(new PartialPath("root.db.d2")));
+ }
+ }
+
+ @Test
+ public void testRecordMetricsAndCloseReleaseEvents() throws Exception {
+ try (PipeSchemaRegionWritePlanEventBatch batch =
+ new PipeSchemaRegionWritePlanEventBatch(createParameters(1000,
1048576))) {
+ final Histogram batchSizeHistogram = Mockito.mock(Histogram.class);
+ final Histogram batchTimeIntervalHistogram =
Mockito.mock(Histogram.class);
+ final Histogram eventSizeHistogram = Mockito.mock(Histogram.class);
+ batch.setBatchSizeHistogram(batchSizeHistogram);
+ batch.setBatchTimeIntervalHistogram(batchTimeIntervalHistogram);
+ batch.setEventSizeHistogram(eventSizeHistogram);
+
+ final PipeSchemaRegionWritePlanEvent event =
+ createEvent(
+ new CreateTimeSeriesNode(
+ new PlanNodeId("1"),
+ new MeasurementPath("root.db.d1.s1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ null,
+ null,
+ null,
+ null),
+ "pipeA",
+ 1L);
+
+ Assert.assertTrue(batch.onEvent(event));
+ batch.recordBatchMetrics();
+
+ verify(batchTimeIntervalHistogram, times(1)).update(anyLong());
+ verify(batchSizeHistogram, times(1)).update(anyLong());
+ verify(eventSizeHistogram, times(1)).update(1L);
+
+ batch.close();
+ Assert.assertTrue(event.isReleased());
+ }
+ }
+
+ private PipeParameters createParameters(final int delayInMs, final long
batchSizeInBytes) {
+ return new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, String.valueOf(delayInMs));
+ put(CONNECTOR_IOTDB_BATCH_SIZE_KEY,
String.valueOf(batchSizeInBytes));
+ }
+ });
+ }
+
+ private MeasurementGroup createMeasurementGroup(final String measurement,
final String alias) {
+ final MeasurementGroup measurementGroup = new MeasurementGroup();
+ measurementGroup.addMeasurement(
+ measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.LZ4);
+ measurementGroup.addAlias(alias);
+ measurementGroup.addTags(Collections.singletonMap("tag", alias));
+ measurementGroup.addAttributes(Collections.singletonMap("attr", alias));
+ return measurementGroup;
+ }
+
+ private MeasurementGroup createMeasurementGroupWithProps(final String
measurement) {
+ final MeasurementGroup measurementGroup = new MeasurementGroup();
+ measurementGroup.addMeasurement(
+ measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.LZ4);
+ measurementGroup.addProps(Collections.singletonMap("prop", "v1"));
+ return measurementGroup;
+ }
+
+ private PipeSchemaRegionWritePlanEvent createEvent(
+ final PlanNode planNode, final String pipeName, final long creationTime)
{
+ return createEvent(planNode, pipeName, creationTime, -1);
+ }
+
+ private PipeSchemaRegionWritePlanEvent createEvent(
+ final PlanNode planNode, final String pipeName, final long creationTime,
final int regionId) {
+ final PipeSchemaRegionWritePlanEvent event =
+ new PipeSchemaRegionWritePlanEvent(
+ planNode, pipeName, creationTime, null, null, null, null, null,
null, true, false);
+ event.setCommitterKeyAndCommitId(new CommitterKey(pipeName, creationTime,
regionId, -1), 1L);
+ return event;
+ }
+
+ private void setField(final Object target, final String fieldName, final
Object value)
+ throws Exception {
+ final Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index b5662aeec2c..9e00b49072e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -656,6 +656,10 @@ public abstract class IoTDBSink implements PipeConnector,
PipeConnectorWithEvent
// do nothing by default
}
+ public void setSchemaBatchSizeHistogram(Histogram schemaBatchSizeHistogram) {
+ // do nothing by default
+ }
+
public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
// do nothing by default
}
@@ -664,6 +668,10 @@ public abstract class IoTDBSink implements PipeConnector,
PipeConnectorWithEvent
// do nothing by default
}
+ public void setSchemaBatchTimeIntervalHistogram(Histogram
schemaBatchTimeIntervalHistogram) {
+ // do nothing by default
+ }
+
public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
// do nothing by default
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index cbeef1f2c7c..f5a76e6d66a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -152,8 +152,10 @@ public enum Metric {
UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"),
UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"),
PIPE_INSERT_NODE_BATCH_SIZE("pipe_insert_node_batch_size"),
+ PIPE_SCHEMA_BATCH_SIZE("pipe_schema_batch_size"),
PIPE_TSFILE_BATCH_SIZE("pipe_tsfile_batch_size"),
PIPE_INSERT_NODE_BATCH_TIME_COST("pipe_insert_node_batch_time_cost"),
+ PIPE_SCHEMA_BATCH_TIME_COST("pipe_schema_batch_time_cost"),
PIPE_TSFILE_BATCH_TIME_COST("pipe_tsfile_batch_time_cost"),
PIPE_CONNECTOR_BATCH_SIZE("pipe_connector_batch_size"),
PIPE_PENDING_HANDLERS_SIZE("pipe_pending_handlers_size"),