This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch 13290-1.3.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ad197c121779f590dad790db309970e36d4b2cc1 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Aug 26 17:29:43 2024 +0800 Pipe: Avoid req size exceeding the thrift max frame size by slicing the original req automatically (#13290) (cherry picked from commit 41ee831b7e24a46549cd6946e5b677f5dd39edf2) --- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../pipe/metric/PipeDataNodeReceiverMetrics.java | 63 ++++++ .../protocol/thrift/IoTDBDataNodeReceiver.java | 237 +++++++++++++++------ .../apache/iotdb/commons/conf/CommonConfig.java | 13 ++ .../iotdb/commons/conf/CommonDescriptor.java | 6 + .../iotdb/commons/pipe/config/PipeConfig.java | 8 + .../pipe/connector/client/IoTDBSyncClient.java | 76 +++++++ .../thrift/common/PipeTransferSliceReqHandler.java | 135 ++++++++++++ .../payload/thrift/request/PipeRequestType.java | 3 + .../thrift/request/PipeTransferSliceReq.java | 171 +++++++++++++++ 10 files changed, 643 insertions(+), 70 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 7ce87a64886..b095e81c71e 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -231,6 +231,7 @@ public enum TSStatusCode { PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION(1809), PIPE_RECEIVER_USER_CONFLICT_EXCEPTION(1810), PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED(1811), + PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812), // Subscription SUBSCRIPTION_VERSION_ERROR(1900), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java index eddad988041..8405e48085a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeReceiverMetrics.java @@ -45,6 +45,9 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { private Timer transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private static final String RECEIVER = "pipeDataNodeReceiver"; @@ -102,6 +105,18 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { transferSchemaSnapshotSealTimer.updateNanos(costTimeInNanos); } + public void recordTransferConfigPlanTimer(long costTimeInNanos) { + transferConfigPlanTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferCompressedTimer(long costTimeInNanos) { + transferCompressedTimer.updateNanos(costTimeInNanos); + } + + public void recordTransferSliceTimer(long costTimeInNanos) { + transferSliceTimer.updateNanos(costTimeInNanos); + } + @Override public void bindTo(AbstractMetricService metricService) { bindToTimer(metricService); @@ -212,6 +227,30 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { RECEIVER, Tag.TYPE.toString(), "transferSchemaSnapshotSeal"); + transferConfigPlanTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + transferCompressedTimer = + metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + transferSliceTimer = + 
metricService.getOrCreateTimer( + Metric.PIPE_DATANODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSlice"); } @Override @@ -233,6 +272,9 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { transferSchemaPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferSchemaSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferSchemaSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; metricService.remove( MetricType.TIMER, @@ -325,6 +367,27 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { RECEIVER, Tag.TYPE.toString(), "transferSchemaSnapshotSeal"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_DATANODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSlice"); } public static PipeDataNodeReceiverMetrics getInstance() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index b70f833a7c5..20a34319d35 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -24,10 +24,12 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler; import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest; +import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferSliceReqHandler; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2; +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq; import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern; import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver; import org.apache.iotdb.db.auth.AuthorityChecker; @@ -130,6 +132,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new AtomicLong(0); protected final AtomicReference<String> configReceiverId = new AtomicReference<>(); + private final PipeTransferSliceReqHandler sliceReqHandler = new PipeTransferSliceReqHandler(); + static { try { folderManager = @@ -145,111 +149,185 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { @Override public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { - long startTime = System.nanoTime(); + final long startTime = System.nanoTime(); final short rawRequestType = req.getType(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { - TPipeTransferResp resp; - switch (PipeRequestType.valueOf(rawRequestType)) { + final PipeRequestType requestType = PipeRequestType.valueOf(rawRequestType); + if (requestType != PipeRequestType.TRANSFER_SLICE) { + 
sliceReqHandler.clear(); + } + switch (requestType) { case HANDSHAKE_DATANODE_V1: - resp = - handleTransferHandshakeV1( + { + try { + return handleTransferHandshakeV1( PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); - return resp; + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); + } + } case HANDSHAKE_DATANODE_V2: - resp = - handleTransferHandshakeV2( + { + try { + return handleTransferHandshakeV2( PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); - return resp; + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); + } + } case TRANSFER_TABLET_INSERT_NODE: - resp = - handleTransferTabletInsertNode( + { + try { + return handleTransferTabletInsertNode( PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); - return resp; + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TABLET_RAW: - resp = handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletRawTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletRawTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TABLET_BINARY: - resp = - handleTransferTabletBinary(PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); - 
PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBinaryTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferTabletBinary( + PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletBinaryTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TABLET_BATCH: - resp = handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBatchTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferTabletBatch( + PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTabletBatchTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TS_FILE_PIECE: - resp = - handleTransferFilePiece( + { + try { + return handleTransferFilePiece( PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, true); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); - return resp; + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TS_FILE_SEAL: - resp = handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferFileSealV1( + PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFileSealTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TS_FILE_PIECE_WITH_MOD: - resp = - handleTransferFilePiece( + { + try { + return handleTransferFilePiece( PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), req instanceof 
AirGapPseudoTPipeTransferRequest, false); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); - return resp; + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); + } + } case TRANSFER_TS_FILE_SEAL_WITH_MOD: - resp = - handleTransferFileSealV2( + { + try { + return handleTransferFileSealV2( PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); - return resp; + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); + } + } case TRANSFER_SCHEMA_PLAN: - resp = handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); + } + } case TRANSFER_SCHEMA_SNAPSHOT_PIECE: - resp = - handleTransferFilePiece( + { + try { + return handleTransferFilePiece( PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false); - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); - return resp; + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); + } + } case TRANSFER_SCHEMA_SNAPSHOT_SEAL: - resp = - handleTransferFileSealV2( + { + try { + return handleTransferFileSealV2( PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); - PipeDataNodeReceiverMetrics.getInstance() - 
.recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); - return resp; + + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); + } + } case HANDSHAKE_CONFIGNODE_V1: case HANDSHAKE_CONFIGNODE_V2: case TRANSFER_CONFIG_PLAN: case TRANSFER_CONFIG_SNAPSHOT_PIECE: case TRANSFER_CONFIG_SNAPSHOT_SEAL: - // Config requests will first be received by the DataNode receiver, - // then transferred to ConfigNode receiver to execute. - resp = handleTransferConfigPlan(req); - return resp; + { + try { + // Config requests will first be received by the DataNode receiver, + // then transferred to ConfigNode receiver to execute. + return handleTransferConfigPlan(req); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferConfigPlanTimer(System.nanoTime() - startTime); + } + } + case TRANSFER_SLICE: + { + try { + return handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferSliceTimer(System.nanoTime() - startTime); + } + } case TRANSFER_COMPRESSED: - resp = receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); - return resp; + { + try { + return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + } finally { + PipeDataNodeReceiverMetrics.getInstance() + .recordTransferCompressedTimer(System.nanoTime() - startTime); + } + } default: break; } @@ -418,6 +496,25 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { return configReceiverId.get(); } + private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq pipeTransferSliceReq) { + final boolean isInorder = sliceReqHandler.receiveSlice(pipeTransferSliceReq); + if (!isInorder) { + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER, + "Slice request is out of order, please check the request sequence.")); + } + final Optional<TPipeTransferReq> req = 
sliceReqHandler.makeReqIfComplete(); + if (!req.isPresent()) { + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.SUCCESS_STATUS, + "Slice received, waiting for more slices to complete the request.")); + } + // sliceReqHandler will be cleared in the receive(req) method + return receive(req.get()); + } + /** * For {@link InsertRowsStatement} and {@link InsertMultiTabletsStatement}, the returned {@link * TSStatus} will use sub-status to record the endpoint for redirection. Each sub-status records diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 245614d2c09..e897061f424 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy; import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.KillPoint.KillPoint; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.tsfile.fileSystem.FSType; import org.slf4j.Logger; @@ -214,6 +215,9 @@ public class CommonConfig { private double pipeAllSinksRateLimitBytesPerSecond = -1; private int rateLimiterHotReloadCheckIntervalMs = 1000; + private int pipeConnectorRequestSliceThresholdBytes = + (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8); + private boolean isSeperatedPipeHeartbeatEnabled = true; private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100; private long pipeMetaSyncerInitialSyncDelayMinutes = 3; @@ -1106,6 +1110,15 @@ public class CommonConfig { this.rateLimiterHotReloadCheckIntervalMs = rateLimiterHotReloadCheckIntervalMs; } + public int getPipeConnectorRequestSliceThresholdBytes() { + return pipeConnectorRequestSliceThresholdBytes; + } + + 
public void setPipeConnectorRequestSliceThresholdBytes( + int pipeConnectorRequestSliceThresholdBytes) { + this.pipeConnectorRequestSliceThresholdBytes = pipeConnectorRequestSliceThresholdBytes; + } + public long getTwoStageAggregateMaxCombinerLiveTimeInMs() { return twoStageAggregateMaxCombinerLiveTimeInMs; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 36003ec30e1..c41c932c395 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -409,6 +409,12 @@ public class CommonDescriptor { "rate_limiter_hot_reload_check_interval_ms", String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs())))); + config.setPipeConnectorRequestSliceThresholdBytes( + Integer.parseInt( + properties.getProperty( + "pipe_connector_request_slice_threshold_bytes", + String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes())))); + config.setSeperatedPipeHeartbeatEnabled( Boolean.parseBoolean( properties.getProperty( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index b215c7a3200..fc9fb0911a0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -144,6 +144,10 @@ public class PipeConfig { return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs(); } + public int getPipeConnectorRequestSliceThresholdBytes() { + return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes(); + } + public float getPipeLeaderCacheMemoryUsagePercentage() { return 
COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage(); } @@ -375,6 +379,10 @@ public class PipeConfig { LOGGER.info( "RateLimiterHotReloadCheckIntervalMs: {}", getRateLimiterHotReloadCheckIntervalMs()); + LOGGER.info( + "PipeConnectorRequestSliceThresholdBytes: {}", + getPipeConnectorRequestSliceThresholdBytes()); + LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", isSeperatedPipeHeartbeatEnabled()); LOGGER.info( "PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}", diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java index f15934afd5e..d79c430474c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java @@ -22,16 +22,32 @@ package org.apache.iotdb.commons.pipe.connector.client; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.TimeoutChangeableTransport; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; +import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import 
org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; public class IoTDBSyncClient extends IClientRPCService.Client implements ThriftClient, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSyncClient.class); + + private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new AtomicInteger(0); + private final String ipAddress; private final int port; private final TEndPoint endPoint; @@ -82,6 +98,66 @@ public class IoTDBSyncClient extends IClientRPCService.Client ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout); } + @Override + public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws TException { + final int bodySizeLimit = PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes(); + if (req.getVersion() != IoTDBConnectorRequestVersion.VERSION_1.getVersion() + || req.body.limit() < bodySizeLimit) { + return super.pipeTransfer(req); + } + + LOGGER.warn( + "The body size of the request is too large. The request will be sliced. Origin req: {}-{}. " + + "Request body size: {}, threshold: {}", + req.getVersion(), + req.getType(), + req.body.limit(), + bodySizeLimit); + + try { + final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement(); + // Slice the buffer to avoid the buffer being too large + final int sliceCount = + req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 
0 : 1); + for (int i = 0; i < sliceCount; ++i) { + final int startIndexInBody = i * bodySizeLimit; + final int endIndexInBody = Math.min((i + 1) * bodySizeLimit, req.body.limit()); + final TPipeTransferResp sliceResp = + super.pipeTransfer( + PipeTransferSliceReq.toTPipeTransferReq( + sliceOrderId, + req.getType(), + i, + sliceCount, + req.body.duplicate(), + startIndexInBody, + endIndexInBody)); + + if (i == sliceCount - 1) { + return sliceResp; + } + + if (sliceResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeConnectionException( + String.format( + "Failed to transfer slice. Origin req: %s-%s, slice index: %d, slice count: %d. Reason: %s", + req.getVersion(), req.getType(), i, sliceCount, sliceResp.getStatus())); + } + } + + // Should not reach here + return super.pipeTransfer(req); + } catch (final Exception e) { + LOGGER.warn( + "Failed to transfer slice. Origin req: {}-{}. Retry the whole transfer.", + req.getVersion(), + req.getType(), + e); + // Fall back to the original behavior + return super.pipeTransfer(req); + } + } + @Override public void close() throws Exception { invalidate(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java new file mode 100644 index 00000000000..b3bd16acc7d --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.connector.payload.thrift.common; + +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion; +import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class PipeTransferSliceReqHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferSliceReqHandler.class); + + private int orderId = -1; + + private short originReqType = -1; + private int originBodySize = -1; + + private int sliceCount = -1; + private final List<byte[]> sliceBodies = new ArrayList<>(); + + public boolean receiveSlice(final PipeTransferSliceReq req) { + if (orderId == -1 + || originReqType == -1 + || originBodySize == -1 + || sliceCount == -1 + || sliceBodies.isEmpty()) { + if (orderId == -1 + && originReqType == -1 + && originBodySize == -1 + && sliceCount == -1 + && sliceBodies.isEmpty()) { + orderId = req.getOrderId(); + originReqType = req.getOriginReqType(); + originBodySize = req.getOriginBodySize(); + sliceCount = req.getSliceCount(); + } else { + LOGGER.warn( + "Invalid state: orderId={}, originReqType={}, 
originBodySize={}, sliceCount={}, sliceBodies.size={}", + orderId, + originReqType, + originBodySize, + sliceCount, + sliceBodies.size()); + clear(); + return false; + } + } + + if (orderId != req.getOrderId()) { + LOGGER.warn("Order ID mismatch: expected {}, actual {}", orderId, req.getOrderId()); + clear(); + return false; + } + if (originReqType != req.getOriginReqType()) { + LOGGER.warn( + "Origin request type mismatch: expected {}, actual {}", + originReqType, + req.getOriginReqType()); + clear(); + return false; + } + if (originBodySize != req.getOriginBodySize()) { + LOGGER.warn( + "Origin body size mismatch: expected {}, actual {}", + originBodySize, + req.getOriginBodySize()); + clear(); + return false; + } + if (sliceCount != req.getSliceCount()) { + LOGGER.warn("Slice count mismatch: expected {}, actual {}", sliceCount, req.getSliceCount()); + clear(); + return false; + } + if (sliceBodies.size() != req.getSliceIndex()) { + LOGGER.warn( + "Invalid slice index: expected {}, actual {}", sliceBodies.size(), req.getSliceIndex()); + clear(); + return false; + } + + sliceBodies.add(req.getSliceBody()); + return true; + } + + public Optional<TPipeTransferReq> makeReqIfComplete() { + if (sliceBodies.size() != sliceCount) { + return Optional.empty(); + } + + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion(); + req.type = originReqType; + + final ByteBuffer body = ByteBuffer.allocate(originBodySize); + sliceBodies.forEach(body::put); + body.flip(); + req.body = body; + + return Optional.of(req); + } + + public void clear() { + orderId = -1; + originReqType = -1; + originBodySize = -1; + sliceCount = -1; + sliceBodies.clear(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java index 
003c8b9afb3..0f9b6f4dafb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java @@ -53,6 +53,9 @@ public enum PipeRequestType { // RPC Compression TRANSFER_COMPRESSED((short) 300), + + // Fallback Handling + TRANSFER_SLICE((short) 400), ; private final short type; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java new file mode 100644 index 00000000000..4df6008400d --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.pipe.connector.payload.thrift.request; + +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; + +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Objects; + +public class PipeTransferSliceReq extends TPipeTransferReq { + + private transient int orderId; + + private transient short originReqType; + private transient int originBodySize; + + private transient byte[] sliceBody; + + private transient int sliceIndex; + private transient int sliceCount; + + public int getOrderId() { + return orderId; + } + + public short getOriginReqType() { + return originReqType; + } + + public int getOriginBodySize() { + return originBodySize; + } + + public byte[] getSliceBody() { + return sliceBody; + } + + public int getSliceIndex() { + return sliceIndex; + } + + public int getSliceCount() { + return sliceCount; + } + + /////////////////////////////// Thrift /////////////////////////////// + + public static PipeTransferSliceReq toTPipeTransferReq( + final int orderId, + final short originReqType, + final int sliceIndex, + final int sliceCount, + final ByteBuffer duplicatedOriginBody, + final int startIndexInBody, + final int endIndexInBody) + throws IOException { + final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq(); + + sliceReq.orderId = orderId; + + sliceReq.originReqType = originReqType; + sliceReq.originBodySize = duplicatedOriginBody.limit(); + + sliceReq.sliceBody = new byte[endIndexInBody - startIndexInBody]; + duplicatedOriginBody.position(startIndexInBody); + duplicatedOriginBody.get(sliceReq.sliceBody); + + sliceReq.sliceIndex = sliceIndex; + sliceReq.sliceCount = sliceCount; + + sliceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion(); + sliceReq.type = 
PipeRequestType.TRANSFER_SLICE.getType(); + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(sliceReq.orderId, outputStream); + + ReadWriteIOUtils.write(sliceReq.originReqType, outputStream); + ReadWriteIOUtils.write(sliceReq.originBodySize, outputStream); + + ReadWriteIOUtils.write(new Binary(sliceReq.sliceBody), outputStream); + + ReadWriteIOUtils.write(sliceReq.sliceIndex, outputStream); + ReadWriteIOUtils.write(sliceReq.sliceCount, outputStream); + + sliceReq.body = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + + return sliceReq; + } + + public static PipeTransferSliceReq fromTPipeTransferReq(final TPipeTransferReq transferReq) { + final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq(); + + sliceReq.orderId = ReadWriteIOUtils.readInt(transferReq.body); + + sliceReq.originReqType = ReadWriteIOUtils.readShort(transferReq.body); + sliceReq.originBodySize = ReadWriteIOUtils.readInt(transferReq.body); + + sliceReq.sliceBody = ReadWriteIOUtils.readBinary(transferReq.body).getValues(); + + sliceReq.sliceIndex = ReadWriteIOUtils.readInt(transferReq.body); + sliceReq.sliceCount = ReadWriteIOUtils.readInt(transferReq.body); + + sliceReq.version = transferReq.version; + sliceReq.type = transferReq.type; + sliceReq.body = transferReq.body; + + return sliceReq; + } + + /////////////////////////////// Object /////////////////////////////// + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final PipeTransferSliceReq that = (PipeTransferSliceReq) obj; + return Objects.equals(orderId, that.orderId) + && Objects.equals(originReqType, that.originReqType) + && Objects.equals(originBodySize, that.originBodySize) + && Arrays.equals(sliceBody, that.sliceBody) + && 
Objects.equals(sliceIndex, that.sliceIndex) + && Objects.equals(sliceCount, that.sliceCount) + && Objects.equals(version, that.version) + && Objects.equals(type, that.type) + && Objects.equals(body, that.body); + } + + @Override + public int hashCode() { + return Objects.hash( + orderId, + originReqType, + originBodySize, + Arrays.hashCode(sliceBody), + sliceIndex, + sliceCount, + version, + type, + body); + } +}
