This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e1162bede4206c8d212344d93b83c033387cbf72 Author: Zhenyu Luo <[email protected]> AuthorDate: Tue Jul 29 15:51:57 2025 +0800 Pipe: Modify Sink Batch Metrics (#16018) * Pipe: Modify Sink Batch Metrics * update * update (cherry picked from commit 226fc6fd684834dd1797fcd83fa732da1472fb79) --- .../agent/task/subtask/sink/PipeSinkSubtask.java | 27 +++++ .../metric/sink/PipeDataRegionSinkMetrics.java | 111 +++++++++++++++------ .../evolvable/batch/PipeTabletEventBatch.java | 19 +++- .../evolvable/batch/PipeTabletEventPlainBatch.java | 17 ++-- .../batch/PipeTabletEventTsFileBatch.java | 21 ++-- .../batch/PipeTransferBatchReqBuilder.java | 51 +++++++++- .../thrift/async/IoTDBDataRegionAsyncSink.java | 29 ++++++ .../thrift/sync/IoTDBDataRegionSyncSink.java | 29 ++++++ .../commons/pipe/sink/protocol/IoTDBSink.java | 17 ++++ 9 files changed, 265 insertions(+), 56 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 4b98ae82d81..ffd484b1306 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncS import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.utils.ErrorHandlingUtils; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -337,6 +338,32 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { : 0; } + public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { + if (outputPipeConnector instanceof IoTDBSink) { + ((IoTDBSink) outputPipeConnector).setTabletBatchSizeHistogram(tabletBatchSizeHistogram); + } + } + + public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) { + if (outputPipeConnector instanceof IoTDBSink) { + ((IoTDBSink) outputPipeConnector).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram); + } + } + + public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeIntervalHistogram) { + if (outputPipeConnector instanceof IoTDBSink) { + ((IoTDBSink) outputPipeConnector) + .setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram); + } + } + + public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) { + if (outputPipeConnector instanceof IoTDBSink) { + ((IoTDBSink) outputPipeConnector) + .setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram); + } + } + //////////////////////////// Error report //////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java index 52a8828b2ee..e1ddc368a5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask; import org.apache.iotdb.metrics.AbstractMetricService; -import org.apache.iotdb.metrics.impl.DoNothingHistogram; import org.apache.iotdb.metrics.metricsets.IMetricSet; import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.metrics.type.Rate; @@ -45,14 +44,6 @@ public class PipeDataRegionSinkMetrics implements IMetricSet { private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataRegionSinkMetrics.class); - public static Histogram tabletBatchSizeHistogram = new DoNothingHistogram(); - - public static Histogram tsFileBatchSizeHistogram = new DoNothingHistogram(); - - public static Histogram tabletBatchTimeIntervalHistogram = new DoNothingHistogram(); - - public static Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram(); - @SuppressWarnings("java:S3077") private volatile AbstractMetricService metricService; @@ -75,28 +66,13 @@ public class PipeDataRegionSinkMetrics implements IMetricSet { for (String taskID : taskIDs) { createMetrics(taskID); } - - tabletBatchSizeHistogram = - metricService.getOrCreateHistogram( - Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), MetricLevel.IMPORTANT); - - tsFileBatchSizeHistogram = - metricService.getOrCreateHistogram( - Metric.PIPE_TSFILE_BATCH_SIZE.toString(), MetricLevel.IMPORTANT); - - tabletBatchTimeIntervalHistogram = - metricService.getOrCreateHistogram( - Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), MetricLevel.IMPORTANT); - - tsFileBatchTimeIntervalHistogram = - metricService.getOrCreateHistogram( - Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(), MetricLevel.IMPORTANT); } private void createMetrics(final String taskID) { createAutoGauge(taskID); createRate(taskID); createTimer(taskID); + createHistogram(taskID); } private void createAutoGauge(final String taskID) { @@ -245,6 +221,50 @@ public class PipeDataRegionSinkMetrics implements IMetricSet { String.valueOf(connector.getCreationTime()))); } + private void createHistogram(final String taskID) { + final PipeSinkSubtask connector = connectorMap.get(taskID); + + final Histogram tabletBatchSizeHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + connector.setTabletBatchSizeHistogram(tabletBatchSizeHistogram); + + final Histogram tsFileBatchSizeHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_TSFILE_BATCH_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + connector.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram); + + final Histogram tabletBatchTimeIntervalHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + connector.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram); + + final Histogram tsFileBatchTimeIntervalHistogram = + metricService.getOrCreateHistogram( + Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram); + } + @Override public void unbindFrom(final AbstractMetricService metricService) { final ImmutableSet<String> taskIDs = ImmutableSet.copyOf(connectorMap.keySet()); @@ -255,20 +275,13 @@ public class PipeDataRegionSinkMetrics implements IMetricSet { LOGGER.warn( "Failed to unbind from pipe data region connector metrics, connector map not empty"); } - - metricService.remove(MetricType.HISTOGRAM, Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString()); - - metricService.remove(MetricType.HISTOGRAM, Metric.PIPE_TSFILE_BATCH_SIZE.toString()); - - metricService.remove(MetricType.HISTOGRAM, Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString()); - - metricService.remove(MetricType.HISTOGRAM, Metric.PIPE_TSFILE_BATCH_TIME_COST.toString()); } private void removeMetrics(final String taskID) { removeAutoGauge(taskID); removeRate(taskID); removeTimer(taskID); + removeHistogram(taskID); } private void removeAutoGauge(final String taskID) { @@ -397,6 +410,38 @@ public class PipeDataRegionSinkMetrics implements IMetricSet { compressionTimerMap.remove(connector.getAttributeSortedString()); } + private void removeHistogram(final String taskID) { + final PipeSinkSubtask connector = connectorMap.get(taskID); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_TSFILE_BATCH_SIZE.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + metricService.remove( + MetricType.HISTOGRAM, + Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(), + Tag.NAME.toString(), + connector.getAttributeSortedString(), + Tag.CREATION_TIME.toString(), + String.valueOf(connector.getCreationTime())); + } + //////////////////////////// register & deregister (pipe integration) //////////////////////////// public void register(@NonNull final PipeSinkSubtask pipeSinkSubtask) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index c7622c4a95f..58316f4c816 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.BiConsumer; public abstract class PipeTabletEventBatch implements AutoCloseable { @@ -43,6 +44,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null; protected final List<EnrichedEvent> events = new ArrayList<>(); + protected final BiConsumer<Long, Long> recordMetric; private final int maxDelayInMs; private long firstEventProcessingTime = Long.MIN_VALUE; @@ -52,12 +54,23 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { protected volatile boolean isClosed = false; - protected PipeTabletEventBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) { + protected PipeTabletEventBatch( + final int maxDelayInMs, + final long requestMaxBatchSizeInBytes, + final BiConsumer<Long, Long> recordMetric) { if (pipeModelFixedMemoryBlock == null) { init(); } this.maxDelayInMs = maxDelayInMs; + if (recordMetric != null) { + this.recordMetric = recordMetric; + } else { + this.recordMetric = + (timeInterval, bufferSize) -> { + // do nothing + }; + } // limit in buffer size this.allocatedMemoryBlock = @@ -129,14 +142,12 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { final long diff = System.currentTimeMillis() - firstEventProcessingTime; if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) { allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs); - recordMetric(diff, totalBufferSize); + recordMetric.accept(diff, totalBufferSize); return true; } return false; } - protected abstract void recordMetric(final long timeInterval, final long bufferSize); - private long getMaxBatchSizeInBytes() { return allocatedMemoryBlock.getMemoryUsageInBytes(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index c637b2bff42..9e3155bb9e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; -import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; @@ -40,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiConsumer; public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { @@ -56,7 +56,14 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new HashMap<>(); PipeTabletEventPlainBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) { - super(maxDelayInMs, requestMaxBatchSizeInBytes); + super(maxDelayInMs, requestMaxBatchSizeInBytes, null); + } + + PipeTabletEventPlainBatch( + final int maxDelayInMs, + final long requestMaxBatchSizeInBytes, + final BiConsumer<Long, Long> recordMetric) { + super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric); } @Override @@ -72,12 +79,6 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { return true; } - @Override - protected void recordMetric(long timeInterval, long bufferSize) { - PipeDataRegionSinkMetrics.tabletBatchTimeIntervalHistogram.update(timeInterval); - PipeDataRegionSinkMetrics.tabletBatchSizeHistogram.update(bufferSize); - } - @Override public synchronized void onSuccess() { super.onSuccess(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index 660706a4325..cb8c25ef755 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; -import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.sink.util.builder.PipeTableModelTsFileBuilderV2; import org.apache.iotdb.db.pipe.sink.util.builder.PipeTreeModelTsFileBuilderV2; @@ -45,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { @@ -59,7 +59,18 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { private final Map<Pair<String, Long>, Double> pipeName2WeightMap = new HashMap<>(); public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) { - super(maxDelayInMs, requestMaxBatchSizeInBytes); + super(maxDelayInMs, requestMaxBatchSizeInBytes, null); + + final AtomicLong tsFileIdGenerator = new AtomicLong(0); + treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator); + tableModeTsFileBuilder = new PipeTableModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator); + } + + public PipeTabletEventTsFileBatch( + final int maxDelayInMs, + final long requestMaxBatchSizeInBytes, + final BiConsumer<Long, Long> recordMetric) { + super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric); final AtomicLong tsFileIdGenerator = new AtomicLong(0); treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator); @@ -126,12 +137,6 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { return true; } - @Override - protected void recordMetric(long timeInterval, long bufferSize) { - PipeDataRegionSinkMetrics.tsFileBatchTimeIntervalHistogram.update(timeInterval); - PipeDataRegionSinkMetrics.tsFileBatchSizeHistogram.update(bufferSize); - } - private void bufferTreeModelTablet( final String pipeName, final long creationTime, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index d30acaf146a..9c4b74a6800 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -25,6 +25,8 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeCacheLeaderClientManager; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; +import org.apache.iotdb.metrics.impl.DoNothingHistogram; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -68,6 +70,11 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { private final int requestMaxDelayInMs; private final long requestMaxBatchSizeInBytes; + private Histogram tabletBatchSizeHistogram = new DoNothingHistogram(); + private Histogram tsFileBatchSizeHistogram = new DoNothingHistogram(); + private Histogram tabletBatchTimeIntervalHistogram = new DoNothingHistogram(); + private Histogram tsFileBatchTimeIntervalHistogram = new DoNothingHistogram(); + // If the leader cache is disabled (or unable to find the endpoint of event in the leader cache), // the event will be stored in the default batch. private final PipeTabletEventBatch defaultBatch; @@ -113,8 +120,10 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { : CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE); this.defaultBatch = usingTsFileBatch - ? new PipeTabletEventTsFileBatch(requestMaxDelayInMs, requestMaxBatchSizeInBytes) - : new PipeTabletEventPlainBatch(requestMaxDelayInMs, requestMaxBatchSizeInBytes); + ? new PipeTabletEventTsFileBatch( + requestMaxDelayInMs, requestMaxBatchSizeInBytes, this::recordTsFileMetric) + : new PipeTabletEventPlainBatch( + requestMaxDelayInMs, requestMaxBatchSizeInBytes, this::recordTabletMetric); } /** @@ -157,7 +166,9 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { endPointToBatch .computeIfAbsent( endPoint, - k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs, requestMaxBatchSizeInBytes)) + k -> + new PipeTabletEventPlainBatch( + requestMaxDelayInMs, requestMaxBatchSizeInBytes, this::recordTabletMetric)) .onEvent(event); } @@ -208,4 +219,38 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { defaultBatch.close(); endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close); } + + public void recordTabletMetric(long timeInterval, long bufferSize) { + this.tabletBatchTimeIntervalHistogram.update(timeInterval); + this.tabletBatchSizeHistogram.update(bufferSize); + } + + public void recordTsFileMetric(long timeInterval, long bufferSize) { + this.tsFileBatchTimeIntervalHistogram.update(timeInterval); + this.tsFileBatchSizeHistogram.update(bufferSize); + } + + public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { + if (tabletBatchSizeHistogram != null) { + this.tabletBatchSizeHistogram = tabletBatchSizeHistogram; + } + } + + public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) { + if (tsFileBatchSizeHistogram != null) { + this.tsFileBatchSizeHistogram = tsFileBatchSizeHistogram; + } + } + + public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeIntervalHistogram) { + if (tabletBatchTimeIntervalHistogram != null) { + this.tabletBatchTimeIntervalHistogram = tabletBatchTimeIntervalHistogram; + } + } + + public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) { + if (tsFileBatchTimeIntervalHistogram != null) { + this.tsFileBatchTimeIntervalHistogram = tsFileBatchTimeIntervalHistogram; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 9fd63a054be..38ac7e0201a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -47,6 +47,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferT import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferTsFileHandler; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; @@ -832,4 +833,32 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) { this.transferTsFileCounter = transferTsFileCounter; } + + @Override + public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTabletBatchSizeHistogram(tabletBatchSizeHistogram); + } + } + + @Override + public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram); + } + } + + @Override + public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeIntervalHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram); + } + } + + @Override + public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 6b8327170c4..c9185b44d44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -48,6 +48,7 @@ import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFil import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq; import org.apache.iotdb.db.pipe.sink.util.cacher.LeaderCacheUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; @@ -626,4 +627,32 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { public IoTDBDataNodeSyncClientManager getClientManager() { return clientManager; } + + @Override + public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTabletBatchSizeHistogram(tabletBatchSizeHistogram); + } + } + + @Override + public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram); + } + } + + @Override + public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeIntervalHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram); + } + } + + @Override + public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) { + if (tabletBatchBuilder != null) { + tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 471dc28bc06..38f3503b66f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -30,6 +30,7 @@ import org.apache.iotdb.commons.pipe.sink.limiter.GlobalRPCRateLimiter; import org.apache.iotdb.commons.pipe.sink.limiter.PipeEndPointRateLimiter; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq; import org.apache.iotdb.commons.utils.NodeUrlUtils; +import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.metrics.type.Timer; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.annotation.TableModel; @@ -631,4 +632,20 @@ public abstract class IoTDBSink implements PipeConnector { public PipeReceiverStatusHandler statusHandler() { return receiverStatusHandler; } + + public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { + // do nothing by default + } + + public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) { + // do nothing by default + } + + public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeIntervalHistogram) { + // do nothing by default + } + + public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) { + // do nothing by default + } }
