This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch slice in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 20b870da86a6bd0d9a31d0dcd62704c9422e882c Author: Caideyipi <[email protected]> AuthorDate: Thu May 14 16:20:29 2026 +0800 sl --- .../PipeTransferTabletBatchEventHandler.java | 2 +- .../PipeTransferTabletInsertNodeEventHandler.java | 2 +- .../handler/PipeTransferTabletRawEventHandler.java | 2 +- .../handler/PipeTransferTrackableHandler.java | 141 +++++++++++++ .../async/handler/PipeTransferTsFileHandler.java | 2 +- .../handler/PipeTransferTrackableHandlerTest.java | 218 +++++++++++++++++++++ .../async/AsyncPipeDataTransferServiceClient.java | 4 + 7 files changed, 367 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java index 52c52b1038e..e6899dee3c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java @@ -129,7 +129,7 @@ public class PipeTransferTabletBatchEventHandler extends PipeTransferTrackableHa protected void doTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { - client.pipeTransfer(req, this); + transferWithOptionalRequestSlicing(client, req); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java index 912a1e724f7..56d1ce41b02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java @@ -41,7 +41,7 @@ public class PipeTransferTabletInsertNodeEventHandler protected void doTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { - client.pipeTransfer(req, this); + transferWithOptionalRequestSlicing(client, req); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java index b64e446827a..eb4677de358 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java @@ -40,7 +40,7 @@ public class PipeTransferTabletRawEventHandler extends PipeTransferTabletInserti protected void doTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { - client.pipeTransfer(req, this); + transferWithOptionalRequestSlicing(client, req); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index a8b4a3b7a79..51d6135fc49 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -21,7 +21,13 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; @@ -31,10 +37,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; public abstract class PipeTransferTrackableHandler implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileHandler.class); + private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new AtomicInteger(0); protected final IoTDBDataRegionAsyncSink sink; protected volatile AsyncPipeDataTransferServiceClient client; @@ -126,8 +134,141 @@ public abstract class PipeTransferTrackableHandler final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException; + protected final void transferWithOptionalRequestSlicing( + final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) + throws TException { + final int bodySizeLimit = PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes(); + if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion() + || req.body.limit() < bodySizeLimit) { + client.pipeTransfer(req, this); + return; + } + + LOGGER.warn( + "The body size of the request is too large. The request will be sliced. Origin req: {}-{}. " + + "Request body size: {}, threshold: {}", + req.getVersion(), + req.getType(), + req.body.limit(), + bodySizeLimit); + + final int sliceCount = + req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit == 0 ? 0 : 1); + final boolean shouldReturnSelf = client.shouldReturnSelf(); + try { + transferSlicedRequest( + client, + req, + shouldReturnSelf, + SLICE_ORDER_ID_GENERATOR.getAndIncrement(), + 0, + sliceCount, + bodySizeLimit); + } catch (final Exception e) { + fallbackToWholeRequest(client, req, shouldReturnSelf, e); + } + } + public abstract void clearEventsReferenceCount(); + private void transferSlicedRequest( + final AsyncPipeDataTransferServiceClient client, + final TPipeTransferReq originalReq, + final boolean shouldReturnSelf, + final int sliceOrderId, + final int sliceIndex, + final int sliceCount, + final int bodySizeLimit) + throws Exception { + final int startIndexInBody = sliceIndex * bodySizeLimit; + final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit, originalReq.body.limit()); + client.setShouldReturnSelf(shouldReturnSelf && sliceIndex == sliceCount - 1); + client.pipeTransfer( + PipeTransferSliceReq.toTPipeTransferReq( + sliceOrderId, + originalReq.getType(), + sliceIndex, + sliceCount, + originalReq.body.duplicate(), + startIndexInBody, + endIndexInBody), + new AsyncMethodCallback<TPipeTransferResp>() { + @Override + public void onComplete(final TPipeTransferResp response) { + if (sink.isClosed() || sliceIndex == sliceCount - 1) { + PipeTransferTrackableHandler.this.onComplete(response); + return; + } + + if (response == null) { + fallbackToWholeRequest( + client, + originalReq, + shouldReturnSelf, + new PipeException("TPipeTransferResp is null when transferring slice.")); + return; + } + + if (response.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + fallbackToWholeRequest( + client, + originalReq, + shouldReturnSelf, + new PipeConnectionException( + String.format( + "Failed to transfer slice. Origin req: %s-%s, slice index: %d, slice count: %d. Reason: %s", + originalReq.getVersion(), + originalReq.getType(), + sliceIndex, + sliceCount, + response.getStatus()))); + return; + } + + try { + transferSlicedRequest( + client, + originalReq, + shouldReturnSelf, + sliceOrderId, + sliceIndex + 1, + sliceCount, + bodySizeLimit); + } catch (final Exception e) { + fallbackToWholeRequest(client, originalReq, shouldReturnSelf, e); + } + } + + @Override + public void onError(final Exception exception) { + if (sink.isClosed() || sliceIndex == sliceCount - 1) { + PipeTransferTrackableHandler.this.onError(exception); + return; + } + fallbackToWholeRequest(client, originalReq, shouldReturnSelf, exception); + } + }); + } + + private void fallbackToWholeRequest( + final AsyncPipeDataTransferServiceClient client, + final TPipeTransferReq originalReq, + final boolean shouldReturnSelf, + final Exception exception) { + LOGGER.warn( + "Failed to transfer slice. Origin req: {}-{}. Retry the whole transfer.", + originalReq.getVersion(), + originalReq.getType(), + exception); + + try { + client.setShouldReturnSelf(shouldReturnSelf); + client.pipeTransfer(originalReq, this); + } catch (final Exception e) { + PipeTransferTrackableHandler.this.onError(e); + } + } + public void closeClient() { if (Objects.isNull(client)) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 35a28d1413a..b6d3785de7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -449,7 +449,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { return; } - client.pipeTransfer(req, this); + transferWithOptionalRequestSlicing(client, req); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java new file mode 100644 index 00000000000..7d62d22a17a --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; +import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; + +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class PipeTransferTrackableHandlerTest { + + private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + + private int originalRequestSliceThresholdBytes; + + @Before + public void setUp() { + originalRequestSliceThresholdBytes = commonConfig.getPipeSinkRequestSliceThresholdBytes(); + commonConfig.setPipeSinkRequestSliceThresholdBytes(4); + } + + @After + public void tearDown() { + commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes); + } + + @Test + public void testLargeRequestWillBeSlicedForAsyncTransfer() throws Exception { + final IoTDBDataRegionAsyncSink sink = Mockito.mock(IoTDBDataRegionAsyncSink.class); + final AsyncPipeDataTransferServiceClient client = + Mockito.mock(AsyncPipeDataTransferServiceClient.class); + Mockito.when(client.shouldReturnSelf()).thenReturn(true); + + final List<TPipeTransferReq> transferredRequests = new ArrayList<>(); + Mockito.doAnswer( + invocation -> { + final TPipeTransferReq req = invocation.getArgument(0); + final AsyncMethodCallback<TPipeTransferResp> callback = invocation.getArgument(1); + transferredRequests.add(req); + callback.onComplete(successResp()); + return null; + }) + .when(client) + .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any()); + + final TestPipeTransferTrackableHandler handler = new TestPipeTransferTrackableHandler(sink); + final TPipeTransferReq originalReq = createReq(10); + + handler.transfer(client, originalReq); + + Assert.assertEquals(3, transferredRequests.size()); + Assert.assertEquals(1, handler.completeCount); + Assert.assertEquals(0, handler.errorCount); + + final PipeTransferSliceReq firstSlice = + PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(0)); + final PipeTransferSliceReq secondSlice = + PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(1)); + final PipeTransferSliceReq thirdSlice = + PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(2)); + + Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType()); + Assert.assertEquals(firstSlice.getOrderId(), secondSlice.getOrderId()); + Assert.assertEquals(firstSlice.getOrderId(), thirdSlice.getOrderId()); + Assert.assertEquals(originalReq.getType(), firstSlice.getOriginReqType()); + Assert.assertEquals(10, firstSlice.getOriginBodySize()); + Assert.assertEquals(3, firstSlice.getSliceCount()); + Assert.assertEquals(0, firstSlice.getSliceIndex()); + Assert.assertEquals(1, secondSlice.getSliceIndex()); + Assert.assertEquals(2, thirdSlice.getSliceIndex()); + Assert.assertEquals(4, firstSlice.getSliceBody().length); + Assert.assertEquals(4, secondSlice.getSliceBody().length); + Assert.assertEquals(2, thirdSlice.getSliceBody().length); + + final ArgumentCaptor<Boolean> shouldReturnSelfCaptor = ArgumentCaptor.forClass(Boolean.class); + Mockito.verify(client, Mockito.times(3)).setShouldReturnSelf(shouldReturnSelfCaptor.capture()); + Assert.assertEquals(Arrays.asList(false, false, true), shouldReturnSelfCaptor.getAllValues()); + } + + @Test + public void testLargeRequestFallsBackToWholeRequestWhenSliceTransferFails() throws Exception { + final IoTDBDataRegionAsyncSink sink = Mockito.mock(IoTDBDataRegionAsyncSink.class); + final AsyncPipeDataTransferServiceClient client = + Mockito.mock(AsyncPipeDataTransferServiceClient.class); + Mockito.when(client.shouldReturnSelf()).thenReturn(true); + + final List<TPipeTransferReq> transferredRequests = new ArrayList<>(); + Mockito.doAnswer( + invocation -> { + final TPipeTransferReq req = invocation.getArgument(0); + final AsyncMethodCallback<TPipeTransferResp> callback = invocation.getArgument(1); + transferredRequests.add(req); + if (req.getType() == PipeRequestType.TRANSFER_SLICE.getType()) { + callback.onComplete(failedResp()); + } else { + callback.onComplete(successResp()); + } + return null; + }) + .when(client) + .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any()); + + final TestPipeTransferTrackableHandler handler = new TestPipeTransferTrackableHandler(sink); + final TPipeTransferReq originalReq = createReq(10); + + handler.transfer(client, originalReq); + + Assert.assertEquals(2, transferredRequests.size()); + Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), transferredRequests.get(0).getType()); + Assert.assertEquals(originalReq.getType(), transferredRequests.get(1).getType()); + Assert.assertEquals(originalReq.getVersion(), transferredRequests.get(1).getVersion()); + Assert.assertArrayEquals(originalReq.getBody(), transferredRequests.get(1).getBody()); + Assert.assertEquals(1, handler.completeCount); + Assert.assertEquals(0, handler.errorCount); + } + + private static TPipeTransferReq createReq(final int bodySize) { + final byte[] body = new byte[bodySize]; + for (int i = 0; i < body.length; ++i) { + body[i] = (byte) i; + } + + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = (short) 123; + req.body = ByteBuffer.wrap(body); + return req; + } + + private static TPipeTransferResp successResp() { + final TPipeTransferResp resp = new TPipeTransferResp(); + resp.setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + return resp; + } + + private static TPipeTransferResp failedResp() { + final TPipeTransferResp resp = new TPipeTransferResp(); + resp.setStatus( + new TSStatus().setCode(TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER.getStatusCode())); + return resp; + } + + private static class TestPipeTransferTrackableHandler extends PipeTransferTrackableHandler { + + private int completeCount; + private int errorCount; + + private TestPipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) { + super(sink); + } + + private void transfer(final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) + throws TException { + tryTransfer(client, req); + } + + @Override + protected boolean onCompleteInternal(final TPipeTransferResp response) { + completeCount++; + return true; + } + + @Override + protected void onErrorInternal(final Exception exception) { + errorCount++; + } + + @Override + protected void doTransfer( + final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) + throws TException { + transferWithOptionalRequestSlicing(client, req); + } + + @Override + public void clearEventsReferenceCount() { + // Do nothing + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 36295ec8500..b7edc0c1088 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -135,6 +135,10 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC this.shouldReturnSelf.set(shouldReturnSelf); } + public boolean shouldReturnSelf() { + return shouldReturnSelf.get(); + } + public void setTimeoutDynamically(final int timeout) { try { ((TNonblockingSocket) ___transport).setTimeout(timeout);
