This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1ee10537dda88b1e93436a1f2ea252541dc11886 Author: JackieTien97 <[email protected]> AuthorDate: Wed Jul 16 18:07:30 2025 +0800 Revert "Pipe: Added rate limiter for tsFile sending (#15765) (#15947)" This reverts commit 025bf970cf138040b49b59464003e3643a53e984. --- .../protocol/IoTDBConfigRegionAirGapConnector.java | 5 -- .../protocol/IoTDBConfigRegionConnector.java | 5 -- .../task/builder/PipeDataNodeTaskBuilder.java | 68 ++++++++-------------- .../airgap/IoTDBDataRegionAirGapConnector.java | 31 ---------- .../airgap/IoTDBSchemaRegionAirGapConnector.java | 5 -- .../async/IoTDBDataRegionAsyncConnector.java | 14 ----- .../async/handler/PipeTransferTsFileHandler.java | 6 -- .../thrift/sync/IoTDBDataRegionSyncConnector.java | 21 ------- .../thrift/sync/IoTDBSchemaRegionConnector.java | 5 -- .../dataregion/IoTDBDataRegionExtractor.java | 4 +- .../pipe/metric/overview/PipeResourceMetrics.java | 14 +---- .../load/limiter/LoadTsFileRateLimiter.java | 64 +++++++++++++++++--- .../apache/iotdb/commons/conf/CommonConfig.java | 16 ----- .../iotdb/commons/pipe/config/PipeConfig.java | 6 -- .../iotdb/commons/pipe/config/PipeDescriptor.java | 6 -- .../config/constant/PipeConnectorConstant.java | 5 -- .../connector/limiter/GlobalRPCRateLimiter.java | 33 ----------- .../pipe/connector/limiter/GlobalRateLimiter.java | 14 ++--- .../connector/limiter/TsFileSendRateLimiter.java | 47 --------------- .../connector/protocol/IoTDBAirGapConnector.java | 3 - .../pipe/connector/protocol/IoTDBConnector.java | 4 +- .../connector/protocol/IoTDBSslSyncConnector.java | 3 - .../iotdb/commons/service/metric/enums/Metric.java | 1 - 23 files changed, 91 insertions(+), 289 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java index 595182d47d5..c5fdc4765f9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java @@ -85,11 +85,6 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector { return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params); } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - // Do nothing - } - @Override protected boolean mayNeedHandshakeWhenFail() { return true; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index f04e18027b0..3761f696875 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -97,11 +97,6 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { return PipeTransferConfigSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad); } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - // Do nothing - } - @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { throw new UnsupportedOperationException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index 6ba1acae9bb..f84cb73fd69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -52,10 +52,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY; public class PipeDataNodeTaskBuilder { @@ -173,6 +170,10 @@ public class PipeDataNodeTaskBuilder { extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE) || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); + if (!insertionDeletionListeningOptionPair.right + && !shouldTerminatePipeOnAllHistoricalEventsConsumed) { + return; + } } catch (final IllegalPathException e) { LOGGER.warn( "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}", @@ -181,52 +182,29 @@ public class PipeDataNodeTaskBuilder { return; } - if (insertionDeletionListeningOptionPair.right - || shouldTerminatePipeOnAllHistoricalEventsConsumed) { - final Boolean isRealtime = - connectorParameters.getBooleanByKeys( - PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, - PipeConnectorConstant.SINK_REALTIME_FIRST_KEY); - if (isRealtime == null) { - connectorParameters.addAttribute( - PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, "false"); - if (insertionDeletionListeningOptionPair.right) { - LOGGER.info( - "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion."); - } else { - LOGGER.info( - "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion."); - } - } else if (isRealtime) { - if (insertionDeletionListeningOptionPair.right) { - LOGGER.warn( - "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion."); - } else { - LOGGER.warn( - "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion."); - } + final Boolean isRealtime = + connectorParameters.getBooleanByKeys( + PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, + PipeConnectorConstant.SINK_REALTIME_FIRST_KEY); + if (isRealtime == null) { + connectorParameters.addAttribute(PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, "false"); + if (insertionDeletionListeningOptionPair.right) { + LOGGER.info( + "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion."); + } else { + LOGGER.info( + "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion."); } + return; } - final boolean isRealtimeEnabled = - extractorParameters.getBooleanOrDefault( - Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), - EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE); - - if (isRealtimeEnabled && !shouldTerminatePipeOnAllHistoricalEventsConsumed) { - final Boolean enableSendTsFileLimit = - connectorParameters.getBooleanByKeys( - PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, - PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT); - - if (enableSendTsFileLimit == null) { - connectorParameters.addAttribute( - PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true"); - LOGGER.info( - "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending."); - } else if (!enableSendTsFileLimit) { + if (isRealtime) { + if (insertionDeletionListeningOptionPair.right) { + LOGGER.warn( + "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion."); + } else { LOGGER.warn( - "PipeDataNodeTaskBuilder: When the realtime sync is enabled, not enabling the rate limiter in sending tsfile may introduce delay for realtime sending."); + "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion."); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java index 57b92262abb..bc4805e8c8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.pipe.connector.protocol.airgap; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq; @@ -35,12 +34,9 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -53,32 +49,13 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Arrays; import java.util.Objects; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; - public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAirGapConnector.class); - private boolean enableSendTsFileLimit; - - @Override - public void customize( - final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) - throws Exception { - super.customize(parameters, configuration); - - enableSendTsFileLimit = - parameters.getBooleanOrDefault( - Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), - CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE); - } - @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent @@ -316,14 +293,6 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector } } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes); - if (enableSendTsFileLimit) { - TsFileSendRateLimiter.getInstance().acquire(requiredBytes); - } - } - @Override protected byte[] getTransferSingleFilePieceBytes( final String fileName, final long position, final byte[] payLoad) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java index 84cc085509c..342807a5dfb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java @@ -143,11 +143,6 @@ public class IoTDBSchemaRegionAirGapConnector extends IoTDBDataNodeAirGapConnect } } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - // Do nothing - } - @Override protected byte[] getTransferSingleFilePieceBytes( final String fileName, final long position, final byte[] payLoad) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index f78a52a8281..0dacf0cb912 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -78,11 +78,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY; @@ -118,8 +115,6 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers = new ConcurrentHashMap<>(); - private boolean enableSendTsFileLimit; - @Override public void validate(final PipeParameterValidator validator) throws Exception { super.validate(validator); @@ -175,11 +170,6 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); } - - enableSendTsFileLimit = - parameters.getBooleanOrDefault( - Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), - CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE); } @Override @@ -692,10 +682,6 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { events.forEach(this::addFailureEventToRetryQueue); } - public boolean isEnableSendTsFileLimit() { - return enableSendTsFileLimit; - } - //////////////////////////// Operations for close //////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index dbbf489b5a5..7353ea91e91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.RetryUtils; @@ -33,7 +32,6 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq; import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; @@ -166,10 +164,6 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { client.setShouldReturnSelf(false); client.setTimeoutDynamically(clientManager.getConnectionTimeout()); - PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize); - if (connector.isEnableSendTsFileLimit()) { - TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize); - } final int readLength = reader.read(readBuffer); if (readLength == -1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index e0fd0b11827..d996cac2820 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient; -import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.RetryUtils; @@ -44,7 +43,6 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; @@ -67,22 +65,16 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; - public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class); private PipeTransferBatchReqBuilder tabletBatchBuilder; - private boolean enableSendTsFileLimit; @Override public void customize( @@ -94,11 +86,6 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { if (isTabletBatchModeEnabled) { tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters); } - - enableSendTsFileLimit = - parameters.getBooleanOrDefault( - Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT), - CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE); } @Override @@ -113,14 +100,6 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, position, payLoad); } - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes); - if (enableSendTsFileLimit) { - TsFileSendRateLimiter.getInstance().acquire(requiredBytes); - } - } - @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java index f12a8125f91..f70e18c0651 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java @@ -163,9 +163,4 @@ public class IoTDBSchemaRegionConnector extends IoTDBDataNodeSyncConnector { final String fileName, final long position, final byte[] payLoad) throws IOException { return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad); } - - @Override - protected void mayLimitRateAndRecordIO(final long requiredBytes) { - // Do nothing - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index 7fd7e37139e..8158c22d1a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -200,13 +200,11 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { SOURCE_HISTORY_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY)) { LOGGER.warn( - "When {}, {}, {} or {} is specified, specifying {}, {}, {}, {}, {} and {} is invalid.", + "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.", SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY, SOURCE_END_TIME_KEY, EXTRACTOR_END_TIME_KEY, - SOURCE_HISTORY_ENABLE_KEY, - EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_START_TIME_KEY, EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java index 19a8060f89c..37f8eb5e26a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java @@ -26,9 +26,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.metrics.AbstractMetricService; -import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; -import org.apache.iotdb.metrics.type.Counter; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -42,8 +40,8 @@ public class PipeResourceMetrics implements IMetricSet { private static final String PIPE_TS_FILE_USED_MEMORY = "PipeTsFileUsedMemory"; private static final String PIPE_TOTAL_MEMORY = "PipeTotalMemory"; + private static final String PIPE_FLOATING_MEMORY = "PipeFloatingMemory"; - private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; //////////////////////////// bindTo & unbindFrom (metric framework) //////////////////////////// @@ -98,10 +96,6 @@ public class PipeResourceMetrics implements IMetricSet { MetricLevel.IMPORTANT, PipeDataNodeResourceManager.ref(), PipePhantomReferenceManager::getPhantomReferenceCount); - // tsFile send rate - diskIOCounter = - metricService.getOrCreateCounter( - Metric.PIPE_TSFILE_SEND_DISK_IO.toString(), MetricLevel.IMPORTANT); } @Override @@ -136,12 +130,6 @@ public class PipeResourceMetrics implements IMetricSet { metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_LINKED_TSFILE_SIZE.toString()); // phantom reference count metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString()); - - metricService.remove(MetricType.RATE, Metric.PIPE_TSFILE_SEND_DISK_IO.toString()); - } - - public void recordDiskIO(final long bytes) { - diskIOCounter.inc(bytes); } //////////////////////////// singleton //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java index 3be223287a2..eaaf376a819 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/limiter/LoadTsFileRateLimiter.java @@ -19,28 +19,78 @@ package org.apache.iotdb.db.storageengine.load.limiter; -import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet; -public class LoadTsFileRateLimiter extends GlobalRateLimiter { +import com.google.common.util.concurrent.AtomicDouble; +import com.google.common.util.concurrent.RateLimiter; + +import java.util.concurrent.TimeUnit; + +public class LoadTsFileRateLimiter { private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - @Override + private final AtomicDouble throughputBytesPerSecond = + new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond()); + private final RateLimiter loadWriteRateLimiter; + public void acquire(long bytes) { LoadTsFileCostMetricsSet.getInstance().recordDiskIO(bytes); - super.acquire(bytes); + + if (reloadParams()) { + return; + } + + while (bytes > 0) { + if (bytes > Integer.MAX_VALUE) { + tryAcquireWithRateCheck(Integer.MAX_VALUE); + bytes -= Integer.MAX_VALUE; + } else { + tryAcquireWithRateCheck((int) bytes); + return; + } + } } - @Override - protected double getThroughputBytesPerSecond() { - return CONFIG.getLoadWriteThroughputBytesPerSecond(); + private void tryAcquireWithRateCheck(final int bytes) { + while (!loadWriteRateLimiter.tryAcquire( + bytes, + PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(), + TimeUnit.MILLISECONDS)) { + if (reloadParams()) { + return; + } + } + } + + private boolean reloadParams() { + final double throughputBytesPerSecondLimit = CONFIG.getLoadWriteThroughputBytesPerSecond(); + + if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) { + throughputBytesPerSecond.set(throughputBytesPerSecondLimit); + loadWriteRateLimiter.setRate( + // if throughput <= 0, disable rate limiting + throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE : throughputBytesPerSecondLimit); + } + + // For performance, we don't need to acquire rate limiter if throughput <= 0 + return throughputBytesPerSecondLimit <= 0; } //////////////////////////// Singleton //////////////////////////// + private LoadTsFileRateLimiter() { + final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get(); + loadWriteRateLimiter = + // if throughput <= 0, disable rate limiting + throughputBytesPerSecondLimit <= 0 + ? RateLimiter.create(Double.MAX_VALUE) + : RateLimiter.create(throughputBytesPerSecondLimit); + } + private static class LoadTsFileRateLimiterHolder { private static final LoadTsFileRateLimiter INSTANCE = new LoadTsFileRateLimiter(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 9dfe5b68017..dfcbba78cff 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -277,7 +277,6 @@ public class CommonConfig { private int pipeAsyncConnectorMaxTsFileClientNumber = Math.max(16, Runtime.getRuntime().availableProcessors()); - private double pipeSendTsFileRateLimitBytesPerSecond = 32 * MB; private double pipeAllSinksRateLimitBytesPerSecond = -1; private int rateLimiterHotReloadCheckIntervalMs = 1000; @@ -1922,21 +1921,6 @@ public class CommonConfig { logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync); } - public double getPipeSendTsFileRateLimitBytesPerSecond() { - return pipeSendTsFileRateLimitBytesPerSecond; - } - - public void setPipeSendTsFileRateLimitBytesPerSecond( - double pipeSendTsFileRateLimitBytesPerSecond) { - if (this.pipeSendTsFileRateLimitBytesPerSecond == pipeSendTsFileRateLimitBytesPerSecond) { - return; - } - this.pipeSendTsFileRateLimitBytesPerSecond = pipeSendTsFileRateLimitBytesPerSecond; - logger.info( - "pipeSendTsFileRateLimitBytesPerSecond is set to {}", - pipeSendTsFileRateLimitBytesPerSecond); - } - public double getPipeAllSinksRateLimitBytesPerSecond() { return pipeAllSinksRateLimitBytesPerSecond; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 620666ded48..4ce2fd19093 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -217,10 +217,6 @@ public class PipeConfig { return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber(); } - public double getPipeSendTsFileRateLimitBytesPerSecond() { - return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond(); - } - public double getPipeAllConnectorsRateLimitBytesPerSecond() { return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond(); } @@ -541,8 +537,6 @@ public class PipeConfig { "PipeAsyncConnectorMaxTsFileClientNumber: {}", getPipeAsyncConnectorMaxTsFileClientNumber()); - LOGGER.info( - "PipeSendTsFileRateLimitBytesPerSecond: {}", getPipeSendTsFileRateLimitBytesPerSecond()); LOGGER.info( "PipeAllConnectorsRateLimitBytesPerSecond: {}", getPipeAllConnectorsRateLimitBytesPerSecond()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 7822934478e..d6bad9d18f6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -594,12 +594,6 @@ public class PipeDescriptor { config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value)); } - value = - parserPipeConfig(properties, "pipe_send_tsfile_rate_limit_bytes_per_second", isHotModify); - if (value != null) { - config.setPipeSendTsFileRateLimitBytesPerSecond(Double.parseDouble(value)); - } - value = parserPipeConfig(properties, "pipe_all_sinks_rate_limit_bytes_per_second", isHotModify); if (value != null) { config.setPipeAllSinksRateLimitBytesPerSecond(Double.parseDouble(value)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index ede91b27652..c6780934703 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -215,11 +215,6 @@ public class PipeConnectorConstant { public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE = Zstd.minCompressionLevel(); public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE = Zstd.maxCompressionLevel(); - public static final String CONNECTOR_ENABLE_SEND_TSFILE_LIMIT = - "connector.enable-send-tsfile-limit"; - public static final String SINK_ENABLE_SEND_TSFILE_LIMIT = "sink.enable-send-tsfile-limit"; - public static final boolean CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE = false; - public static final String CONNECTOR_RATE_LIMIT_KEY = "connector.rate-limit-bytes-per-second"; public static final String SINK_RATE_LIMIT_KEY = "sink.rate-limit-bytes-per-second"; public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRPCRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRPCRateLimiter.java deleted file mode 100644 index ee450d16330..00000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRPCRateLimiter.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.commons.pipe.connector.limiter; - -import org.apache.iotdb.commons.pipe.config.PipeConfig; - -/** This is a global rate limiter for all connectors. */ -public class GlobalRPCRateLimiter extends GlobalRateLimiter { - - private static final PipeConfig CONFIG = PipeConfig.getInstance(); - - @Override - protected double getThroughputBytesPerSecond() { - return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond(); - } -} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java index bb583901796..08190eff500 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java @@ -26,17 +26,18 @@ import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; -public abstract class GlobalRateLimiter { +/** This is a global rate limiter for all connectors. */ +public class GlobalRateLimiter { - private final AtomicDouble throughputBytesPerSecond = - new AtomicDouble(getThroughputBytesPerSecond()); + private static final PipeConfig CONFIG = PipeConfig.getInstance(); + private final AtomicDouble throughputBytesPerSecond = + new AtomicDouble(CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond()); private final RateLimiter rateLimiter; public GlobalRateLimiter() { final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get(); rateLimiter = - // if throughput <= 0, disable rate limiting throughputBytesPerSecondLimit <= 0 ? RateLimiter.create(Double.MAX_VALUE) : RateLimiter.create(throughputBytesPerSecondLimit); @@ -70,7 +71,8 @@ public abstract class GlobalRateLimiter { } private boolean reloadParams() { - final double throughputBytesPerSecondLimit = getThroughputBytesPerSecond(); + final double throughputBytesPerSecondLimit = + CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond(); if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) { throughputBytesPerSecond.set(throughputBytesPerSecondLimit); @@ -82,6 +84,4 @@ public abstract class GlobalRateLimiter { // For performance, we don't need to acquire rate limiter if throughput <= 0 return throughputBytesPerSecondLimit <= 0; } - - protected abstract double getThroughputBytesPerSecond(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/TsFileSendRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/TsFileSendRateLimiter.java deleted file mode 100644 index ff5e4e111b3..00000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/TsFileSendRateLimiter.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.commons.pipe.connector.limiter; - -import org.apache.iotdb.commons.pipe.config.PipeConfig; - -public class TsFileSendRateLimiter extends GlobalRateLimiter { - - private static final PipeConfig CONFIG = PipeConfig.getInstance(); - - @Override - protected double getThroughputBytesPerSecond() { - return CONFIG.getPipeSendTsFileRateLimitBytesPerSecond(); - } - - //////////////////////////// Singleton //////////////////////////// - - private static class TsFileSendRateLimiterHolder { - - private static final TsFileSendRateLimiter INSTANCE = new TsFileSendRateLimiter(); - - private TsFileSendRateLimiterHolder() { - // Prevent instantiation - } - } - - public static TsFileSendRateLimiter getInstance() { - return TsFileSendRateLimiter.TsFileSendRateLimiterHolder.INSTANCE; - } -} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java index 444ba634a81..01fcd45ffba 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java @@ -259,7 +259,6 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector { long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { while (true) { - mayLimitRateAndRecordIO(readFileBufferSize); final int readLength = reader.read(readBuffer); if (readLength == -1) { break; @@ -295,8 +294,6 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector { } } - protected abstract void mayLimitRateAndRecordIO(final long requiredBytes); - protected abstract boolean mayNeedHandshakeWhenFail(); protected abstract byte[] getTransferSingleFilePieceBytes( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index ab34a7e5463..a7dacb49553 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -25,7 +25,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeE import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor; import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorConfig; import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory; -import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRPCRateLimiter; +import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter; import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq; import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; @@ -165,7 +165,7 @@ public abstract class IoTDBConnector implements PipeConnector { private static final Map<Pair<String, Long>, PipeEndPointRateLimiter> PIPE_END_POINT_RATE_LIMITER_MAP = new ConcurrentHashMap<>(); private double endPointRateLimitBytesPerSecond = -1; - private static final GlobalRPCRateLimiter GLOBAL_RATE_LIMITER = new GlobalRPCRateLimiter(); + private static final GlobalRateLimiter GLOBAL_RATE_LIMITER = new GlobalRateLimiter(); protected boolean isTabletBatchModeEnabled = true; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index c0a94c5645d..51e532d737d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -173,7 +173,6 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { long position = 0; try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { while (true) { - mayLimitRateAndRecordIO(readFileBufferSize); final int readLength = reader.read(readBuffer); if (readLength == -1) { break; @@ -243,8 +242,6 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { protected abstract PipeTransferFilePieceReq getTransferMultiFilePieceReq( final String fileName, final long position, final byte[] payLoad) throws IOException; - protected abstract void mayLimitRateAndRecordIO(final long requiredBytes); - @Override public void close() { if (clientManager != null) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 698ff13abe6..cd91c788884 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -165,7 +165,6 @@ public enum Metric { PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"), PIPE_LINKED_TSFILE_SIZE("pipe_linked_tsfile_size"), PIPE_PHANTOM_REFERENCE_COUNT("pipe_phantom_reference_count"), - PIPE_TSFILE_SEND_DISK_IO("pipe_tsfile_send_disk_io"), PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE("pipe_async_connector_retry_event_queue_size"), PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"), PIPE_PROCEDURE("pipe_procedure"),
