This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d1cffae9a1e0d83ef3835de831b4cc4c6f6e53b0 Author: Caideyipi <[email protected]> AuthorDate: Tue Aug 5 18:06:00 2025 +0800 Revert Pipe: Added rate limiter for tsFile sending (cherry picked from commit 73a504814f549cd15648072ae744837c04baf913) --- .../sink/protocol/IoTDBConfigRegionAirGapSink.java | 5 -- .../pipe/sink/protocol/IoTDBConfigRegionSink.java | 5 -- .../task/builder/PipeDataNodeTaskBuilder.java | 66 ++++++++-------------- .../pipe/metric/overview/PipeResourceMetrics.java | 13 ----- .../protocol/airgap/IoTDBDataRegionAirGapSink.java | 31 ---------- .../airgap/IoTDBSchemaRegionAirGapSink.java | 5 -- .../thrift/async/IoTDBDataRegionAsyncSink.java | 14 ----- .../async/handler/PipeTransferTsFileHandler.java | 6 -- .../thrift/sync/IoTDBDataRegionSyncSink.java | 21 ------- .../thrift/sync/IoTDBSchemaRegionSink.java | 5 -- .../source/dataregion/IoTDBDataRegionSource.java | 10 +--- .../load/limiter/LoadTsFileRateLimiter.java | 64 ++++++++++++++++++--- .../apache/iotdb/commons/conf/CommonConfig.java | 16 ------ .../iotdb/commons/pipe/config/PipeConfig.java | 6 -- .../iotdb/commons/pipe/config/PipeDescriptor.java | 6 -- .../pipe/config/constant/PipeSinkConstant.java | 5 -- .../pipe/sink/limiter/GlobalRPCRateLimiter.java | 33 ----------- .../pipe/sink/limiter/GlobalRateLimiter.java | 14 ++--- .../pipe/sink/limiter/TsFileSendRateLimiter.java | 47 --------------- .../pipe/sink/protocol/IoTDBAirGapSink.java | 3 - .../commons/pipe/sink/protocol/IoTDBSink.java | 4 +- .../pipe/sink/protocol/IoTDBSslSyncSink.java | 3 - .../iotdb/commons/service/metric/enums/Metric.java | 1 - 23 files changed, 92 insertions(+), 291 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java index 5f32bae8c27..c9c8ded4cf0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java @@ -88,11 +88,6 @@ public class IoTDBConfigRegionAirGapSink extends IoTDBAirGapSink { return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params); } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - // Do nothing - } - @Override protected boolean mayNeedHandshakeWhenFail() { return true; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java index e846410b5e7..a1e9239ccb2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java @@ -103,11 +103,6 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink { return PipeTransferConfigSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad); } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - // Do nothing - } - @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { throw new UnsupportedOperationException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index 2b6f18c167d..7f0b6b4ff18 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -56,11 +56,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY; public class PipeDataNodeTaskBuilder { @@ -187,6 +184,10 @@ public class PipeDataNodeTaskBuilder { || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); } + if (!insertionDeletionListeningOptionPair.right + && !shouldTerminatePipeOnAllHistoricalEventsConsumed) { + return; + } } catch (final IllegalPathException e) { LOGGER.warn( "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}", @@ -195,50 +196,29 @@ public class PipeDataNodeTaskBuilder { return; } - if (insertionDeletionListeningOptionPair.right - || shouldTerminatePipeOnAllHistoricalEventsConsumed) { - final Boolean isRealtime = - connectorParameters.getBooleanByKeys( - PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, - PipeSinkConstant.SINK_REALTIME_FIRST_KEY); - if (isRealtime == null) { - connectorParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, "false"); - if (insertionDeletionListeningOptionPair.right) { - LOGGER.info( - "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion."); - } else { - LOGGER.info( - "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion."); - } - } else if (isRealtime) { - if (insertionDeletionListeningOptionPair.right) { - LOGGER.warn( - "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion."); - } else { - LOGGER.warn( - "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion."); - } + final Boolean isRealtime = + connectorParameters.getBooleanByKeys( + PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, + PipeSinkConstant.SINK_REALTIME_FIRST_KEY); + if (isRealtime == null) { + connectorParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, "false"); + if (insertionDeletionListeningOptionPair.right) { + LOGGER.info( + "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion."); + } else { + LOGGER.info( + "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion."); } + return; } - final boolean isRealtimeEnabled = - extractorParameters.getBooleanOrDefault( - Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), - EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE); - - if (isRealtimeEnabled && !shouldTerminatePipeOnAllHistoricalEventsConsumed) { - final Boolean enableSendTsFileLimit = - connectorParameters.getBooleanByKeys( - PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, - PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT); - - if (enableSendTsFileLimit == null) { - connectorParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true"); - LOGGER.info( - "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending."); - } else if (!enableSendTsFileLimit) { + if (isRealtime) { + if (insertionDeletionListeningOptionPair.right) { + LOGGER.warn( + "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion."); + } else { LOGGER.warn( - "PipeDataNodeTaskBuilder: When the realtime sync is enabled, not enabling the rate limiter in sending tsfile may introduce delay for realtime sending."); + "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion."); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java index 4156aea4bc5..54f9dfe4092 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java @@ -26,9 +26,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.metrics.AbstractMetricService; -import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; -import org.apache.iotdb.metrics.type.Counter; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -43,7 +41,6 @@ public class PipeResourceMetrics implements IMetricSet { private static final String PIPE_TOTAL_MEMORY = "PipeTotalMemory"; - private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private static final String PIPE_FLOATING_MEMORY = "PipeFloatingMemory"; //////////////////////////// bindTo & unbindFrom (metric framework) //////////////////////////// @@ -99,10 +96,6 @@ public class PipeResourceMetrics implements IMetricSet { MetricLevel.IMPORTANT, PipeDataNodeResourceManager.ref(), PipePhantomReferenceManager::getPhantomReferenceCount); - // tsFile send rate - diskIOCounter = - metricService.getOrCreateCounter( - Metric.PIPE_TSFILE_SEND_DISK_IO.toString(), MetricLevel.IMPORTANT); } @Override @@ -137,12 +130,6 @@ public class PipeResourceMetrics implements IMetricSet { metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_LINKED_TSFILE_SIZE.toString()); // phantom reference count metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString()); - - metricService.remove(MetricType.RATE, Metric.PIPE_TSFILE_SEND_DISK_IO.toString()); - } - - public void recordDiskIO(final long bytes) { - diskIOCounter.inc(bytes); } //////////////////////////// singleton //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 5e2dfb6f9a0..8f776b21759 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -21,14 +21,12 @@ package org.apache.iotdb.db.pipe.sink.protocol.airgap; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; -import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2; @@ -41,8 +39,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -55,33 +51,14 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Arrays; import java.util.Objects; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; - @TreeModel @TableModel public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAirGapSink.class); - private boolean enableSendTsFileLimit; - - @Override - public void customize( - final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) - throws Exception { - super.customize(parameters, configuration); - - enableSendTsFileLimit = - parameters.getBooleanOrDefault( - Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), - CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE); - } - @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent @@ -376,14 +353,6 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink { } } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes); - if (enableSendTsFileLimit) { - TsFileSendRateLimiter.getInstance().acquire(requiredBytes); - } - } - @Override protected byte[] getTransferSingleFilePieceBytes( final String fileName, final long position, final byte[] payLoad) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java index 3634b2396a4..4cc5bb055d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java @@ -197,11 +197,6 @@ public class IoTDBSchemaRegionAirGapSink extends IoTDBDataNodeAirGapSink { } } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - // Do nothing - } - @Override protected byte[] getTransferSingleFilePieceBytes( final String fileName, final long position, final byte[] payLoad) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 21fb0c9a730..1f35a96b84d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -81,11 +81,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY; @@ -124,8 +121,6 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers = new ConcurrentHashMap<>(); - private boolean enableSendTsFileLimit; - @Override public void validate(final PipeParameterValidator validator) throws Exception { super.validate(validator); @@ -181,11 +176,6 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); } - - enableSendTsFileLimit = - parameters.getBooleanOrDefault( - Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), - CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE); } @Override @@ -711,10 +701,6 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { events.forEach(this::addFailureEventToRetryQueue); } - public boolean isEnableSendTsFileLimit() { - return enableSendTsFileLimit; - } - //////////////////////////// Operations for close //////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 9f8fbcf44b0..4ca34073d44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -23,11 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; -import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; @@ -169,10 +167,6 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { client.setShouldReturnSelf(false); client.setTimeoutDynamically(clientManager.getConnectionTimeout()); - PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize); - if (connector.isEnableSendTsFileLimit()) { - TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize); - } final int readLength = reader.read(readBuffer); if (readLength == -1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index b3bded4a2bd..4b6ec2a843b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient; -import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; @@ -32,7 +31,6 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch; @@ -71,16 +69,11 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; - @TreeModel @TableModel public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { @@ -88,7 +81,6 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSyncSink.class); private PipeTransferBatchReqBuilder tabletBatchBuilder; - private boolean enableSendTsFileLimit; @Override public void customize( @@ -100,11 +92,6 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); } - - enableSendTsFileLimit = - parameters.getBooleanOrDefault( - Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), - CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE); } @Override @@ -119,14 +106,6 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, position, payLoad); } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes); - if (enableSendTsFileLimit) { - TsFileSendRateLimiter.getInstance().acquire(requiredBytes); - } - } - @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java index e1e1e7868d7..ab18ac32dc6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java @@ -244,9 +244,4 @@ public class IoTDBSchemaRegionSink extends IoTDBDataNodeSyncSink { final String fileName, final long position, final byte[] payLoad) throws IOException { return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad); } - - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - // Do nothing - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index 12bf5bc6db2..72dc0b51754 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -336,19 +336,15 @@ public class IoTDBDataRegionSource extends IoTDBSource { EXTRACTOR_END_TIME_KEY) && parameters.hasAnyAttributes( EXTRACTOR_HISTORY_ENABLE_KEY, + EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY, - SOURCE_HISTORY_START_TIME_KEY, - EXTRACTOR_HISTORY_START_TIME_KEY, - SOURCE_HISTORY_END_TIME_KEY, - EXTRACTOR_HISTORY_END_TIME_KEY)) { + SOURCE_REALTIME_ENABLE_KEY)) { LOGGER.warn( - "When {}, {}, {} or {} is specified, specifying {}, {}, {}, {}, {} and {} is invalid.", + "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.", SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY, SOURCE_END_TIME_KEY, EXTRACTOR_END_TIME_KEY, - SOURCE_HISTORY_ENABLE_KEY, - EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_START_TIME_KEY, EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java index f616c6695e5..eaaf376a819 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java @@ -19,28 +19,78 @@ package org.apache.iotdb.db.storageengine.load.limiter; -import org.apache.iotdb.commons.pipe.sink.limiter.GlobalRateLimiter; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet; -public class LoadTsFileRateLimiter extends GlobalRateLimiter { +import com.google.common.util.concurrent.AtomicDouble; +import com.google.common.util.concurrent.RateLimiter; + +import java.util.concurrent.TimeUnit; + +public class LoadTsFileRateLimiter { private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - @Override + private final AtomicDouble throughputBytesPerSecond = + new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond()); + private final RateLimiter loadWriteRateLimiter; + public void acquire(long bytes) { LoadTsFileCostMetricsSet.getInstance().recordDiskIO(bytes); - super.acquire(bytes); + + if (reloadParams()) { + return; + } + + while (bytes > 0) { + if (bytes > Integer.MAX_VALUE) { + tryAcquireWithRateCheck(Integer.MAX_VALUE); + bytes -= Integer.MAX_VALUE; + } else { + tryAcquireWithRateCheck((int) bytes); + return; + } + } } - @Override - protected double getThroughputBytesPerSecond() { - return CONFIG.getLoadWriteThroughputBytesPerSecond(); + private void tryAcquireWithRateCheck(final int bytes) { + while (!loadWriteRateLimiter.tryAcquire( + bytes, + PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(), + TimeUnit.MILLISECONDS)) { + if (reloadParams()) { + return; + } + } + } + + private boolean reloadParams() { + final double throughputBytesPerSecondLimit = CONFIG.getLoadWriteThroughputBytesPerSecond(); + + if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) { + throughputBytesPerSecond.set(throughputBytesPerSecondLimit); + loadWriteRateLimiter.setRate( + // if throughput <= 0, disable rate limiting + throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE : throughputBytesPerSecondLimit); + } + + // For performance, we don't need to acquire rate limiter if throughput <= 0 + return throughputBytesPerSecondLimit <= 0; } //////////////////////////// Singleton //////////////////////////// + private LoadTsFileRateLimiter() { + final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get(); + loadWriteRateLimiter = + // if throughput <= 0, disable rate limiting + throughputBytesPerSecondLimit <= 0 + ? RateLimiter.create(Double.MAX_VALUE) + : RateLimiter.create(throughputBytesPerSecondLimit); + } + private static class LoadTsFileRateLimiterHolder { private static final LoadTsFileRateLimiter INSTANCE = new LoadTsFileRateLimiter(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 648cfeaad68..3031518edc3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -273,7 +273,6 @@ public class CommonConfig { private int pipeAsyncConnectorMaxTsFileClientNumber = Math.max(16, Runtime.getRuntime().availableProcessors()); - private double pipeSendTsFileRateLimitBytesPerSecond = 32 * MB; private double pipeAllSinksRateLimitBytesPerSecond = -1; private int rateLimiterHotReloadCheckIntervalMs = 1000; @@ -1867,21 +1866,6 @@ public class CommonConfig { pipeCheckSyncAllClientLiveTimeIntervalMs); } - public double getPipeSendTsFileRateLimitBytesPerSecond() { - return pipeSendTsFileRateLimitBytesPerSecond; - } - - public void setPipeSendTsFileRateLimitBytesPerSecond( - double pipeSendTsFileRateLimitBytesPerSecond) { - if (this.pipeSendTsFileRateLimitBytesPerSecond == pipeSendTsFileRateLimitBytesPerSecond) { - return; - } - this.pipeSendTsFileRateLimitBytesPerSecond = pipeSendTsFileRateLimitBytesPerSecond; - logger.info( - "pipeSendTsFileRateLimitBytesPerSecond is set to {}", - pipeSendTsFileRateLimitBytesPerSecond); - } - public double getPipeAllSinksRateLimitBytesPerSecond() { return pipeAllSinksRateLimitBytesPerSecond; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index a646ed90dbf..7d1064c7462 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -217,10 +217,6 @@ public class PipeConfig { return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber(); } - public double getPipeSendTsFileRateLimitBytesPerSecond() { - return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond(); - } - public double getPipeAllConnectorsRateLimitBytesPerSecond() { return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond(); } @@ -541,8 +537,6 @@ public class PipeConfig { "PipeAsyncConnectorMaxTsFileClientNumber: {}", getPipeAsyncConnectorMaxTsFileClientNumber()); - LOGGER.info( - "PipeSendTsFileRateLimitBytesPerSecond: {}", getPipeSendTsFileRateLimitBytesPerSecond()); LOGGER.info( "PipeAllConnectorsRateLimitBytesPerSecond: {}", getPipeAllConnectorsRateLimitBytesPerSecond()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 7bdfab83efa..895b940ebf8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -578,12 +578,6 @@ public class PipeDescriptor { config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value)); } - value = - parserPipeConfig(properties, "pipe_send_tsfile_rate_limit_bytes_per_second", isHotModify); - if (value != null) { - config.setPipeSendTsFileRateLimitBytesPerSecond(Double.parseDouble(value)); - } - value = parserPipeConfig(properties, "pipe_all_sinks_rate_limit_bytes_per_second", isHotModify); if (value != null) { config.setPipeAllSinksRateLimitBytesPerSecond(Double.parseDouble(value)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index e09dd0ab5dd..42848eee8fc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -220,11 +220,6 @@ public class PipeSinkConstant { public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE = Zstd.minCompressionLevel(); public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE = Zstd.maxCompressionLevel(); - public static final String CONNECTOR_ENABLE_SEND_TSFILE_LIMIT = - "connector.enable-send-tsfile-limit"; - public static final String SINK_ENABLE_SEND_TSFILE_LIMIT = "sink.enable-send-tsfile-limit"; - public static final boolean CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE = false; - public static final String CONNECTOR_RATE_LIMIT_KEY = "connector.rate-limit-bytes-per-second"; public static final String SINK_RATE_LIMIT_KEY = "sink.rate-limit-bytes-per-second"; public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java deleted file mode 100644 index 9a6aba5b90b..00000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.commons.pipe.sink.limiter; - -import org.apache.iotdb.commons.pipe.config.PipeConfig; - -/** This is a global rate limiter for all connectors. */ -public class GlobalRPCRateLimiter extends GlobalRateLimiter { - - private static final PipeConfig CONFIG = PipeConfig.getInstance(); - - @Override - protected double getThroughputBytesPerSecond() { - return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond(); - } -} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRateLimiter.java index 358e6c844ec..10f465870ee 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRateLimiter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRateLimiter.java @@ -26,17 +26,18 @@ import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; -public abstract class GlobalRateLimiter { +/** This is a global rate limiter for all connectors. */ +public class GlobalRateLimiter { - private final AtomicDouble throughputBytesPerSecond = - new AtomicDouble(getThroughputBytesPerSecond()); + private static final PipeConfig CONFIG = PipeConfig.getInstance(); + private final AtomicDouble throughputBytesPerSecond = + new AtomicDouble(CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond()); private final RateLimiter rateLimiter; public GlobalRateLimiter() { final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get(); rateLimiter = - // if throughput <= 0, disable rate limiting throughputBytesPerSecondLimit <= 0 ? RateLimiter.create(Double.MAX_VALUE) : RateLimiter.create(throughputBytesPerSecondLimit); @@ -70,7 +71,8 @@ public abstract class GlobalRateLimiter { } private boolean reloadParams() { - final double throughputBytesPerSecondLimit = getThroughputBytesPerSecond(); + final double throughputBytesPerSecondLimit = + CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond(); if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) { throughputBytesPerSecond.set(throughputBytesPerSecondLimit); @@ -82,6 +84,4 @@ public abstract class GlobalRateLimiter { // For performance, we don't need to acquire rate limiter if throughput <= 0 return throughputBytesPerSecondLimit <= 0; } - - protected abstract double getThroughputBytesPerSecond(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/TsFileSendRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/TsFileSendRateLimiter.java deleted file mode 100644 index e7f1cc415b7..00000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/TsFileSendRateLimiter.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.commons.pipe.sink.limiter; - -import org.apache.iotdb.commons.pipe.config.PipeConfig; - -public class TsFileSendRateLimiter extends GlobalRateLimiter { - - private static final PipeConfig CONFIG = PipeConfig.getInstance(); - - @Override - protected double getThroughputBytesPerSecond() { - return CONFIG.getPipeSendTsFileRateLimitBytesPerSecond(); - } - - //////////////////////////// Singleton //////////////////////////// - - private static class TsFileSendRateLimiterHolder { - - private static final TsFileSendRateLimiter INSTANCE = new TsFileSendRateLimiter(); - - private TsFileSendRateLimiterHolder() { - // Prevent instantiation - } - } - - public static TsFileSendRateLimiter getInstance() { - return TsFileSendRateLimiter.TsFileSendRateLimiterHolder.INSTANCE; - } -} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java index aba6c7e1d82..a9fe17ca0cc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java @@ -274,7 +274,6 @@ public abstract class IoTDBAirGapSink extends IoTDBSink { long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { while (true) { - mayLimitRateAndRecordIO(readFileBufferSize); final int readLength = reader.read(readBuffer); if (readLength == -1) { break; @@ -310,8 +309,6 @@ public abstract class IoTDBAirGapSink extends IoTDBSink { } } - protected abstract void mayLimitRateAndRecordIO(final long requiredBytes); - protected abstract boolean mayNeedHandshakeWhenFail(); protected abstract byte[] getTransferSingleFilePieceBytes( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index efee7c9c311..0babe802639 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressor; import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorConfig; import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorFactory; -import org.apache.iotdb.commons.pipe.sink.limiter.GlobalRPCRateLimiter; +import org.apache.iotdb.commons.pipe.sink.limiter.GlobalRateLimiter; import org.apache.iotdb.commons.pipe.sink.limiter.PipeEndPointRateLimiter; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq; import org.apache.iotdb.commons.utils.NodeUrlUtils; @@ -170,7 +170,7 @@ public abstract class IoTDBSink implements PipeConnector { private static final Map<Pair<String, Long>, PipeEndPointRateLimiter> PIPE_END_POINT_RATE_LIMITER_MAP = new ConcurrentHashMap<>(); private double endPointRateLimitBytesPerSecond = -1; - private static final GlobalRPCRateLimiter GLOBAL_RATE_LIMITER = new GlobalRPCRateLimiter(); + private static final GlobalRateLimiter GLOBAL_RATE_LIMITER = new GlobalRateLimiter(); protected boolean isTabletBatchModeEnabled = true; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java index 710a57f4f17..66d84ed7d2c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java @@ -186,7 +186,6 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink { long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { while (true) { - mayLimitRateAndRecordIO(readFileBufferSize); final int readLength = reader.read(readBuffer); if (readLength == -1) { break; @@ -256,8 +255,6 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink { protected abstract PipeTransferFilePieceReq getTransferMultiFilePieceReq( final String fileName, final long position, final byte[] payLoad) throws IOException; - protected abstract void mayLimitRateAndRecordIO(final long requiredBytes); - @Override public void close() { if (clientManager != null) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index c226b53a3ee..0cca2c87a31 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -171,7 +171,6 @@ public enum Metric { PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"), PIPE_LINKED_TSFILE_SIZE("pipe_linked_tsfile_size"), PIPE_PHANTOM_REFERENCE_COUNT("pipe_phantom_reference_count"), - PIPE_TSFILE_SEND_DISK_IO("pipe_tsfile_send_disk_io"), PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"), PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"), PIPE_PROCEDURE("pipe_procedure"),
