This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e339af15f6f5fd132f24bf9c3aec39197ad266c Author: Caideyipi <[email protected]> AuthorDate: Tue Jul 15 23:50:22 2025 +0800 Pipe: Added rate limiter for tsFile sending (#15765) (#15947) As the title said. --- .../protocol/IoTDBConfigRegionAirGapConnector.java | 5 ++ .../protocol/IoTDBConfigRegionConnector.java | 5 ++ .../task/builder/PipeDataNodeTaskBuilder.java | 68 ++++++++++++++-------- .../airgap/IoTDBDataRegionAirGapConnector.java | 31 ++++++++++ .../airgap/IoTDBSchemaRegionAirGapConnector.java | 5 ++ .../async/IoTDBDataRegionAsyncConnector.java | 14 +++++ .../async/handler/PipeTransferTsFileHandler.java | 6 ++ .../thrift/sync/IoTDBDataRegionSyncConnector.java | 21 +++++++ .../thrift/sync/IoTDBSchemaRegionConnector.java | 5 ++ .../dataregion/IoTDBDataRegionExtractor.java | 4 +- .../pipe/metric/overview/PipeResourceMetrics.java | 14 ++++- .../load/limiter/LoadTsFileRateLimiter.java | 64 +++----------------- .../apache/iotdb/commons/conf/CommonConfig.java | 16 +++++ .../iotdb/commons/pipe/config/PipeConfig.java | 6 ++ .../iotdb/commons/pipe/config/PipeDescriptor.java | 6 ++ .../config/constant/PipeConnectorConstant.java | 5 ++ .../connector/limiter/GlobalRPCRateLimiter.java | 33 +++++++++++ .../pipe/connector/limiter/GlobalRateLimiter.java | 14 ++--- .../connector/limiter/TsFileSendRateLimiter.java | 47 +++++++++++++++ .../connector/protocol/IoTDBAirGapConnector.java | 3 + .../pipe/connector/protocol/IoTDBConnector.java | 4 +- .../connector/protocol/IoTDBSslSyncConnector.java | 3 + .../iotdb/commons/service/metric/enums/Metric.java | 1 + 23 files changed, 289 insertions(+), 91 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java index c5fdc4765f9..595182d47d5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java @@ -85,6 +85,11 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector { return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params); } + @Override + protected void mayLimitRateAndRecordIO(final long requiredBytes) { + // Do nothing + } + @Override protected boolean mayNeedHandshakeWhenFail() { return true; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index 3761f696875..f04e18027b0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -97,6 +97,11 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { return PipeTransferConfigSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad); } + @Override + protected void mayLimitRateAndRecordIO(final long requiredBytes) { + // Do nothing + } + @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { throw new UnsupportedOperationException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index f84cb73fd69..6ba1acae9bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -52,7 +52,10 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY; public class PipeDataNodeTaskBuilder { @@ -170,10 +173,6 @@ public class PipeDataNodeTaskBuilder { extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE) || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); - if (!insertionDeletionListeningOptionPair.right - && !shouldTerminatePipeOnAllHistoricalEventsConsumed) { - return; - } } catch (final IllegalPathException e) { LOGGER.warn( "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}", @@ -182,29 +181,52 @@ public class PipeDataNodeTaskBuilder { return; } - final Boolean isRealtime = - connectorParameters.getBooleanByKeys( - PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, - PipeConnectorConstant.SINK_REALTIME_FIRST_KEY); - if (isRealtime == null) { - connectorParameters.addAttribute(PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, "false"); - if (insertionDeletionListeningOptionPair.right) { - LOGGER.info( - "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion."); - } else { - LOGGER.info( - "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion."); + if (insertionDeletionListeningOptionPair.right + || shouldTerminatePipeOnAllHistoricalEventsConsumed) { + final Boolean isRealtime = + connectorParameters.getBooleanByKeys( + PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, + PipeConnectorConstant.SINK_REALTIME_FIRST_KEY); + if (isRealtime == null) { + connectorParameters.addAttribute( + PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, "false"); + if (insertionDeletionListeningOptionPair.right) { + LOGGER.info( + "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion."); + } else { + LOGGER.info( + "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion."); + } + } else if (isRealtime) { + if (insertionDeletionListeningOptionPair.right) { + LOGGER.warn( + "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion."); + } else { + LOGGER.warn( + "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion."); + } } - return; } - if (isRealtime) { - if (insertionDeletionListeningOptionPair.right) { - LOGGER.warn( - "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion."); - } else { + final boolean isRealtimeEnabled = + extractorParameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), + EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE); + + if (isRealtimeEnabled && !shouldTerminatePipeOnAllHistoricalEventsConsumed) { + final Boolean enableSendTsFileLimit = + connectorParameters.getBooleanByKeys( + PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, + PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT); + + if (enableSendTsFileLimit == null) { + connectorParameters.addAttribute( + PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true"); + LOGGER.info( + "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending."); + } else if (!enableSendTsFileLimit) { LOGGER.warn( - "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion."); + "PipeDataNodeTaskBuilder: When the realtime sync is enabled, not enabling the rate limiter in sending tsfile may introduce delay for realtime sending."); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java index bc4805e8c8e..57b92262abb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.connector.protocol.airgap; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq; @@ -34,9 +35,12 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -49,13 +53,32 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Objects; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; + public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAirGapConnector.class); + private boolean enableSendTsFileLimit; + + @Override + public void customize( + final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) + throws Exception { + super.customize(parameters, configuration); + + enableSendTsFileLimit = + parameters.getBooleanOrDefault( + Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), + CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE); + } + @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent @@ -293,6 +316,14 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector } } + @Override + protected void mayLimitRateAndRecordIO(final long requiredBytes) { + PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes); + if (enableSendTsFileLimit) { + TsFileSendRateLimiter.getInstance().acquire(requiredBytes); + } + } + @Override protected byte[] getTransferSingleFilePieceBytes( final String fileName, final long position, final byte[] payLoad) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java index 342807a5dfb..84cc085509c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java @@ -143,6 +143,11 @@ public class IoTDBSchemaRegionAirGapConnector extends IoTDBDataNodeAirGapConnect } } + @Override + protected void mayLimitRateAndRecordIO(final long requiredBytes) { + // Do nothing + } + @Override protected byte[] getTransferSingleFilePieceBytes( final String fileName, final long position, final byte[] payLoad) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 0dacf0cb912..f78a52a8281 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -78,8 +78,11 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY; @@ -115,6 +118,8 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers = new ConcurrentHashMap<>(); + private boolean enableSendTsFileLimit; + @Override public void validate(final PipeParameterValidator validator) throws Exception { super.validate(validator); @@ -170,6 +175,11 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); } + + enableSendTsFileLimit = + parameters.getBooleanOrDefault( + Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), + CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE); } @Override @@ -682,6 +692,10 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { events.forEach(this::addFailureEventToRetryQueue); } + public boolean isEnableSendTsFileLimit() { + return enableSendTsFileLimit; + } + //////////////////////////// Operations for close //////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 7353ea91e91..dbbf489b5a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.RetryUtils; @@ -32,6 +33,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; @@ -164,6 +166,10 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { client.setShouldReturnSelf(false); client.setTimeoutDynamically(clientManager.getConnectionTimeout()); + PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize); + if (connector.isEnableSendTsFileLimit()) { + TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize); + } final int readLength = reader.read(readBuffer); if (readLength == -1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index d996cac2820..e0fd0b11827 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient; +import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.RetryUtils; @@ -43,6 +44,7 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; @@ -65,16 +67,22 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.nio.file.NoSuchFileException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; + public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class); private PipeTransferBatchReqBuilder tabletBatchBuilder; + private boolean enableSendTsFileLimit; @Override public void customize( @@ -86,6 +94,11 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); } + + enableSendTsFileLimit = + parameters.getBooleanOrDefault( + Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), + CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE); } @Override @@ -100,6 +113,14 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, position, payLoad); } + @Override + protected void mayLimitRateAndRecordIO(final long requiredBytes) { + PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes); + if (enableSendTsFileLimit) { + TsFileSendRateLimiter.getInstance().acquire(requiredBytes); + } + } + @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java index f70e18c0651..f12a8125f91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java @@ -163,4 +163,9 @@ public class IoTDBSchemaRegionConnector extends IoTDBDataNodeSyncConnector { final String fileName, final long position, final byte[] payLoad) throws IOException { return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad); } + + @Override + protected void mayLimitRateAndRecordIO(final long requiredBytes) { + // Do nothing + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index 8158c22d1a6..7fd7e37139e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -200,11 +200,13 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { SOURCE_HISTORY_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY)) { LOGGER.warn( - "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.", + "When {}, {}, {} or {} is specified, specifying {}, {}, {}, {}, {} and {} is invalid.", SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY, SOURCE_END_TIME_KEY, EXTRACTOR_END_TIME_KEY, + SOURCE_HISTORY_ENABLE_KEY, + EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_START_TIME_KEY, EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java index 37f8eb5e26a..19a8060f89c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java @@ -26,7 +26,9 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Counter; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -40,8 +42,8 @@ public class PipeResourceMetrics implements IMetricSet { private static final String PIPE_TS_FILE_USED_MEMORY = "PipeTsFileUsedMemory"; private static final String PIPE_TOTAL_MEMORY = "PipeTotalMemory"; - private static final String PIPE_FLOATING_MEMORY = "PipeFloatingMemory"; + private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; //////////////////////////// bindTo & unbindFrom (metric framework) //////////////////////////// @@ -96,6 +98,10 @@ public class PipeResourceMetrics implements IMetricSet { MetricLevel.IMPORTANT, PipeDataNodeResourceManager.ref(), PipePhantomReferenceManager::getPhantomReferenceCount); + // tsFile send rate + diskIOCounter = + metricService.getOrCreateCounter( + Metric.PIPE_TSFILE_SEND_DISK_IO.toString(), MetricLevel.IMPORTANT); } @Override @@ -130,6 +136,12 @@ public class PipeResourceMetrics implements IMetricSet { metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_LINKED_TSFILE_SIZE.toString()); // phantom reference count metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString()); + + metricService.remove(MetricType.RATE, Metric.PIPE_TSFILE_SEND_DISK_IO.toString()); + } + + public void recordDiskIO(final long bytes) { + diskIOCounter.inc(bytes); } //////////////////////////// singleton //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java index eaaf376a819..3be223287a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java @@ -19,78 +19,28 @@ package org.apache.iotdb.db.storageengine.load.limiter; -import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet; -import com.google.common.util.concurrent.AtomicDouble; -import com.google.common.util.concurrent.RateLimiter; - -import java.util.concurrent.TimeUnit; - -public class LoadTsFileRateLimiter { +public class LoadTsFileRateLimiter extends GlobalRateLimiter { private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - private final AtomicDouble throughputBytesPerSecond = - new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond()); - private final RateLimiter loadWriteRateLimiter; - + @Override public void acquire(long bytes) { LoadTsFileCostMetricsSet.getInstance().recordDiskIO(bytes); - - if (reloadParams()) { - return; - } - - while (bytes > 0) { - if (bytes > Integer.MAX_VALUE) { - tryAcquireWithRateCheck(Integer.MAX_VALUE); - bytes -= Integer.MAX_VALUE; - } else { - tryAcquireWithRateCheck((int) bytes); - return; - } - } + super.acquire(bytes); } - private void tryAcquireWithRateCheck(final int bytes) { - while (!loadWriteRateLimiter.tryAcquire( - bytes, - PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(), - TimeUnit.MILLISECONDS)) { - if (reloadParams()) { - return; - } - } - } - - private boolean reloadParams() { - final double throughputBytesPerSecondLimit = CONFIG.getLoadWriteThroughputBytesPerSecond(); - - if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) { - throughputBytesPerSecond.set(throughputBytesPerSecondLimit); - loadWriteRateLimiter.setRate( - // if throughput <= 0, disable rate limiting - throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE : throughputBytesPerSecondLimit); - } - - // For performance, we don't need to acquire rate limiter if throughput <= 0 - return throughputBytesPerSecondLimit <= 0; + @Override + protected double getThroughputBytesPerSecond() { + return CONFIG.getLoadWriteThroughputBytesPerSecond(); } //////////////////////////// Singleton //////////////////////////// - private LoadTsFileRateLimiter() { - final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get(); - loadWriteRateLimiter = - // if throughput <= 0, disable rate limiting - throughputBytesPerSecondLimit <= 0 - ? RateLimiter.create(Double.MAX_VALUE) - : RateLimiter.create(throughputBytesPerSecondLimit); - } - private static class LoadTsFileRateLimiterHolder { private static final LoadTsFileRateLimiter INSTANCE = new LoadTsFileRateLimiter(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index dfcbba78cff..9dfe5b68017 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -277,6 +277,7 @@ public class CommonConfig { private int pipeAsyncConnectorMaxTsFileClientNumber = Math.max(16, Runtime.getRuntime().availableProcessors()); + private double pipeSendTsFileRateLimitBytesPerSecond = 32 * MB; private double pipeAllSinksRateLimitBytesPerSecond = -1; private int rateLimiterHotReloadCheckIntervalMs = 1000; @@ -1921,6 +1922,21 @@ public class CommonConfig { logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync); } + public double getPipeSendTsFileRateLimitBytesPerSecond() { + return pipeSendTsFileRateLimitBytesPerSecond; + } + + public void setPipeSendTsFileRateLimitBytesPerSecond( + double pipeSendTsFileRateLimitBytesPerSecond) { + if (this.pipeSendTsFileRateLimitBytesPerSecond == pipeSendTsFileRateLimitBytesPerSecond) { + return; + } + this.pipeSendTsFileRateLimitBytesPerSecond = pipeSendTsFileRateLimitBytesPerSecond; + logger.info( + "pipeSendTsFileRateLimitBytesPerSecond is set to {}", + pipeSendTsFileRateLimitBytesPerSecond); + } + public double getPipeAllSinksRateLimitBytesPerSecond() { return pipeAllSinksRateLimitBytesPerSecond; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 4ce2fd19093..620666ded48 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -217,6 +217,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber(); } + public double getPipeSendTsFileRateLimitBytesPerSecond() { + return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond(); + } + public double getPipeAllConnectorsRateLimitBytesPerSecond() { return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond(); } @@ -537,6 +541,8 @@ public class PipeConfig { "PipeAsyncConnectorMaxTsFileClientNumber: {}", getPipeAsyncConnectorMaxTsFileClientNumber()); + LOGGER.info( + "PipeSendTsFileRateLimitBytesPerSecond: {}", getPipeSendTsFileRateLimitBytesPerSecond()); LOGGER.info( "PipeAllConnectorsRateLimitBytesPerSecond: {}", getPipeAllConnectorsRateLimitBytesPerSecond()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index d6bad9d18f6..7822934478e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -594,6 +594,12 @@ public class PipeDescriptor { config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value)); } + value = + parserPipeConfig(properties, "pipe_send_tsfile_rate_limit_bytes_per_second", isHotModify); + if (value != null) { + config.setPipeSendTsFileRateLimitBytesPerSecond(Double.parseDouble(value)); + } + value = parserPipeConfig(properties, "pipe_all_sinks_rate_limit_bytes_per_second", isHotModify); if (value != null) { config.setPipeAllSinksRateLimitBytesPerSecond(Double.parseDouble(value)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index c6780934703..ede91b27652 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -215,6 +215,11 @@ public class PipeConnectorConstant { public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE = Zstd.minCompressionLevel(); public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE = Zstd.maxCompressionLevel(); + public static final String CONNECTOR_ENABLE_SEND_TSFILE_LIMIT = + "connector.enable-send-tsfile-limit"; + public static final String SINK_ENABLE_SEND_TSFILE_LIMIT = "sink.enable-send-tsfile-limit"; + public static final boolean CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE = false; + public static final String CONNECTOR_RATE_LIMIT_KEY = "connector.rate-limit-bytes-per-second"; public static final String SINK_RATE_LIMIT_KEY = "sink.rate-limit-bytes-per-second"; public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRPCRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRPCRateLimiter.java new file mode 100644 index 00000000000..ee450d16330 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRPCRateLimiter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.connector.limiter; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; + +/** This is a global rate limiter for all connectors. */ +public class GlobalRPCRateLimiter extends GlobalRateLimiter { + + private static final PipeConfig CONFIG = PipeConfig.getInstance(); + + @Override + protected double getThroughputBytesPerSecond() { + return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java index 08190eff500..bb583901796 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java @@ -26,18 +26,17 @@ import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; -/** This is a global rate limiter for all connectors. */ -public class GlobalRateLimiter { - - private static final PipeConfig CONFIG = PipeConfig.getInstance(); +public abstract class GlobalRateLimiter { private final AtomicDouble throughputBytesPerSecond = - new AtomicDouble(CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond()); + new AtomicDouble(getThroughputBytesPerSecond()); + private final RateLimiter rateLimiter; public GlobalRateLimiter() { final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get(); rateLimiter = + // if throughput <= 0, disable rate limiting throughputBytesPerSecondLimit <= 0 ? RateLimiter.create(Double.MAX_VALUE) : RateLimiter.create(throughputBytesPerSecondLimit); @@ -71,8 +70,7 @@ public class GlobalRateLimiter { } private boolean reloadParams() { - final double throughputBytesPerSecondLimit = - CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond(); + final double throughputBytesPerSecondLimit = getThroughputBytesPerSecond(); if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) { throughputBytesPerSecond.set(throughputBytesPerSecondLimit); @@ -84,4 +82,6 @@ public class GlobalRateLimiter { // For performance, we don't need to acquire rate limiter if throughput <= 0 return throughputBytesPerSecondLimit <= 0; } + + protected abstract double getThroughputBytesPerSecond(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/TsFileSendRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/TsFileSendRateLimiter.java new file mode 100644 index 00000000000..ff5e4e111b3 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/TsFileSendRateLimiter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.connector.limiter; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; + +public class TsFileSendRateLimiter extends GlobalRateLimiter { + + private static final PipeConfig CONFIG = PipeConfig.getInstance(); + + @Override + protected double getThroughputBytesPerSecond() { + return CONFIG.getPipeSendTsFileRateLimitBytesPerSecond(); + } + + //////////////////////////// Singleton //////////////////////////// + + private static class TsFileSendRateLimiterHolder { + + private static final TsFileSendRateLimiter INSTANCE = new TsFileSendRateLimiter(); + + private TsFileSendRateLimiterHolder() { + // Prevent instantiation + } + } + + public static TsFileSendRateLimiter getInstance() { + return TsFileSendRateLimiter.TsFileSendRateLimiterHolder.INSTANCE; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java index 01fcd45ffba..444ba634a81 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java @@ -259,6 +259,7 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector { long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { while (true) { + mayLimitRateAndRecordIO(readFileBufferSize); final int readLength = reader.read(readBuffer); if (readLength == -1) { break; @@ -294,6 +295,8 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector { } } + protected abstract void mayLimitRateAndRecordIO(final long requiredBytes); + protected abstract boolean mayNeedHandshakeWhenFail(); protected abstract byte[] getTransferSingleFilePieceBytes( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index a7dacb49553..ab34a7e5463 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeE import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor; import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorConfig; import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory; -import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter; +import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRPCRateLimiter; import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq; import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; @@ -165,7 +165,7 @@ public abstract class IoTDBConnector implements PipeConnector { private static final Map<Pair<String, Long>, PipeEndPointRateLimiter> PIPE_END_POINT_RATE_LIMITER_MAP = new ConcurrentHashMap<>(); private double endPointRateLimitBytesPerSecond = -1; - private static final GlobalRateLimiter GLOBAL_RATE_LIMITER = new GlobalRateLimiter(); + private static final GlobalRPCRateLimiter GLOBAL_RATE_LIMITER = new GlobalRPCRateLimiter(); protected boolean isTabletBatchModeEnabled = true; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index 51e532d737d..c0a94c5645d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -173,6 +173,7 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { while (true) { + mayLimitRateAndRecordIO(readFileBufferSize); final int readLength = reader.read(readBuffer); if (readLength == -1) { break; @@ -242,6 +243,8 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { protected abstract PipeTransferFilePieceReq getTransferMultiFilePieceReq( final String fileName, final long position, final byte[] payLoad) throws IOException; + protected abstract void mayLimitRateAndRecordIO(final long requiredBytes); + @Override public void close() { if (clientManager != null) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index cd91c788884..698ff13abe6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -165,6 +165,7 @@ public enum Metric { PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"), PIPE_LINKED_TSFILE_SIZE("pipe_linked_tsfile_size"), PIPE_PHANTOM_REFERENCE_COUNT("pipe_phantom_reference_count"), + PIPE_TSFILE_SEND_DISK_IO("pipe_tsfile_send_disk_io"), PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"), PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"), PIPE_PROCEDURE("pipe_procedure"),
