This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new e05d0bdaf0d Pipe: Implemented slice logic for async sink (#17668) (#17679)
e05d0bdaf0d is described below

commit e05d0bdaf0d61e165a925ca11797c9276cf760e3
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 15 13:18:47 2026 +0800

    Pipe: Implemented slice logic for async sink (#17668) (#17679)
    
    * sl
    
    * chew
---
 .../PipeTransferTabletBatchEventHandler.java       |   2 +-
 .../PipeTransferTabletInsertNodeEventHandler.java  |   2 +-
 .../handler/PipeTransferTabletRawEventHandler.java |   2 +-
 .../handler/PipeTransferTrackableHandler.java      | 127 ++++++++++++
 .../async/handler/PipeTransferTsFileHandler.java   |   2 +-
 .../handler/PipeTransferTrackableHandlerTest.java  | 221 +++++++++++++++++++++
 .../async/AsyncPipeDataTransferServiceClient.java  |   4 +
 .../commons/pipe/sink/client/IoTDBSyncClient.java  |  31 +--
 .../thrift/common/PipeTransferSliceReqBuilder.java |  73 +++++++
 .../common/PipeTransferSliceReqBuilderTest.java    | 106 ++++++++++
 10 files changed, 542 insertions(+), 28 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 8bcb9d47009..d5ee15c6367 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -129,7 +129,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
   protected void doTransfer(
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
-    client.pipeTransfer(req, this);
+    transferWithOptionalRequestSlicing(client, req);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 912a1e724f7..56d1ce41b02 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -41,7 +41,7 @@ public class PipeTransferTabletInsertNodeEventHandler
   protected void doTransfer(
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
-    client.pipeTransfer(req, this);
+    transferWithOptionalRequestSlicing(client, req);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index b64e446827a..eb4677de358 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -40,7 +40,7 @@ public class PipeTransferTabletRawEventHandler extends 
PipeTransferTabletInserti
   protected void doTransfer(
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
-    client.pipeTransfer(req, this);
+    transferWithOptionalRequestSlicing(client, req);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index a8b4a3b7a79..a0e6ad73fe7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -21,7 +21,11 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
 
 import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
@@ -126,8 +130,131 @@ public abstract class PipeTransferTrackableHandler
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException;
 
+  protected final void transferWithOptionalRequestSlicing(
+      final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
+      throws TException {
+    final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+    if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
+      client.pipeTransfer(req, this);
+      return;
+    }
+
+    LOGGER.warn(
+        "The body size of the request is too large. The request will be 
sliced. Origin req: {}-{}. "
+            + "Request body size: {}, threshold: {}",
+        req.getVersion(),
+        req.getType(),
+        req.body.limit(),
+        bodySizeLimit);
+
+    final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req, 
bodySizeLimit);
+    final boolean shouldReturnSelf = client.shouldReturnSelf();
+    try {
+      transferSlicedRequest(
+          client,
+          req,
+          shouldReturnSelf,
+          PipeTransferSliceReqBuilder.nextSliceOrderId(),
+          0,
+          sliceCount,
+          bodySizeLimit);
+    } catch (final Exception e) {
+      fallbackToWholeRequest(client, req, shouldReturnSelf, e);
+    }
+  }
+
   public abstract void clearEventsReferenceCount();
 
+  private void transferSlicedRequest(
+      final AsyncPipeDataTransferServiceClient client,
+      final TPipeTransferReq originalReq,
+      final boolean shouldReturnSelf,
+      final int sliceOrderId,
+      final int sliceIndex,
+      final int sliceCount,
+      final int bodySizeLimit)
+      throws Exception {
+    client.setShouldReturnSelf(shouldReturnSelf && sliceIndex == sliceCount - 
1);
+    client.pipeTransfer(
+        PipeTransferSliceReqBuilder.buildSliceReq(
+            originalReq, sliceOrderId, sliceIndex, sliceCount, bodySizeLimit),
+        new AsyncMethodCallback<TPipeTransferResp>() {
+          @Override
+          public void onComplete(final TPipeTransferResp response) {
+            if (sink.isClosed() || sliceIndex == sliceCount - 1) {
+              PipeTransferTrackableHandler.this.onComplete(response);
+              return;
+            }
+
+            if (response == null) {
+              fallbackToWholeRequest(
+                  client,
+                  originalReq,
+                  shouldReturnSelf,
+                  new PipeException("TPipeTransferResp is null when 
transferring slice."));
+              return;
+            }
+
+            if (response.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              fallbackToWholeRequest(
+                  client,
+                  originalReq,
+                  shouldReturnSelf,
+                  new PipeConnectionException(
+                      String.format(
+                          "Failed to transfer slice. Origin req: %s-%s, slice 
index: %d, slice count: %d. Reason: %s",
+                          originalReq.getVersion(),
+                          originalReq.getType(),
+                          sliceIndex,
+                          sliceCount,
+                          response.getStatus())));
+              return;
+            }
+
+            try {
+              transferSlicedRequest(
+                  client,
+                  originalReq,
+                  shouldReturnSelf,
+                  sliceOrderId,
+                  sliceIndex + 1,
+                  sliceCount,
+                  bodySizeLimit);
+            } catch (final Exception e) {
+              fallbackToWholeRequest(client, originalReq, shouldReturnSelf, e);
+            }
+          }
+
+          @Override
+          public void onError(final Exception exception) {
+            if (sink.isClosed() || sliceIndex == sliceCount - 1) {
+              PipeTransferTrackableHandler.this.onError(exception);
+              return;
+            }
+            fallbackToWholeRequest(client, originalReq, shouldReturnSelf, 
exception);
+          }
+        });
+  }
+
+  private void fallbackToWholeRequest(
+      final AsyncPipeDataTransferServiceClient client,
+      final TPipeTransferReq originalReq,
+      final boolean shouldReturnSelf,
+      final Exception exception) {
+    LOGGER.warn(
+        "Failed to transfer slice. Origin req: {}-{}. Retry the whole 
transfer.",
+        originalReq.getVersion(),
+        originalReq.getType(),
+        exception);
+
+    try {
+      client.setShouldReturnSelf(shouldReturnSelf);
+      client.pipeTransfer(originalReq, this);
+    } catch (final Exception e) {
+      PipeTransferTrackableHandler.this.onError(e);
+    }
+  }
+
   public void closeClient() {
     if (Objects.isNull(client)) {
       return;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 6742199ac10..c79a09ec239 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -441,7 +441,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       return;
     }
 
-    client.pipeTransfer(req, this);
+    transferWithOptionalRequestSlicing(client, req);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
new file mode 100644
index 00000000000..60b69235085
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class PipeTransferTrackableHandlerTest {
+
+  private final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
+
+  private int originalRequestSliceThresholdBytes;
+
+  @Before
+  public void setUp() {
+    originalRequestSliceThresholdBytes = 
commonConfig.getPipeSinkRequestSliceThresholdBytes();
+    commonConfig.setPipeSinkRequestSliceThresholdBytes(4);
+  }
+
+  @After
+  public void tearDown() {
+    
commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
+  }
+
+  @Test
+  public void testLargeRequestWillBeSlicedForAsyncTransfer() throws Exception {
+    final IoTDBDataRegionAsyncSink sink = 
Mockito.mock(IoTDBDataRegionAsyncSink.class);
+    final AsyncPipeDataTransferServiceClient client =
+        Mockito.mock(AsyncPipeDataTransferServiceClient.class);
+    Mockito.when(client.shouldReturnSelf()).thenReturn(true);
+
+    final List<TPipeTransferReq> transferredRequests = new ArrayList<>();
+    Mockito.doAnswer(
+            invocation -> {
+              final TPipeTransferReq req = invocation.getArgument(0);
+              final AsyncMethodCallback<TPipeTransferResp> callback = 
invocation.getArgument(1);
+              transferredRequests.add(req);
+              callback.onComplete(successResp());
+              return null;
+            })
+        .when(client)
+        .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any());
+
+    final TestPipeTransferTrackableHandler handler = new 
TestPipeTransferTrackableHandler(sink);
+    final TPipeTransferReq originalReq = createReq(10);
+
+    handler.transfer(client, originalReq);
+
+    Assert.assertEquals(3, transferredRequests.size());
+    Assert.assertEquals(1, handler.completeCount);
+    Assert.assertEquals(0, handler.errorCount);
+
+    final PipeTransferSliceReq firstSlice =
+        PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(0));
+    final PipeTransferSliceReq secondSlice =
+        PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(1));
+    final PipeTransferSliceReq thirdSlice =
+        PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(2));
+
+    Assert.assertEquals(
+        PipeRequestType.TRANSFER_SLICE.getType(), 
transferredRequests.get(0).getType());
+    Assert.assertEquals(firstSlice.getOrderId(), secondSlice.getOrderId());
+    Assert.assertEquals(firstSlice.getOrderId(), thirdSlice.getOrderId());
+    Assert.assertEquals(originalReq.getType(), firstSlice.getOriginReqType());
+    Assert.assertEquals(10, firstSlice.getOriginBodySize());
+    Assert.assertEquals(3, firstSlice.getSliceCount());
+    Assert.assertEquals(0, firstSlice.getSliceIndex());
+    Assert.assertEquals(1, secondSlice.getSliceIndex());
+    Assert.assertEquals(2, thirdSlice.getSliceIndex());
+    Assert.assertEquals(4, firstSlice.getSliceBody().length);
+    Assert.assertEquals(4, secondSlice.getSliceBody().length);
+    Assert.assertEquals(2, thirdSlice.getSliceBody().length);
+
+    final ArgumentCaptor<Boolean> shouldReturnSelfCaptor = 
ArgumentCaptor.forClass(Boolean.class);
+    Mockito.verify(client, 
Mockito.times(3)).setShouldReturnSelf(shouldReturnSelfCaptor.capture());
+    Assert.assertEquals(Arrays.asList(false, false, true), 
shouldReturnSelfCaptor.getAllValues());
+  }
+
+  @Test
+  public void testLargeRequestFallsBackToWholeRequestWhenSliceTransferFails() 
throws Exception {
+    final IoTDBDataRegionAsyncSink sink = 
Mockito.mock(IoTDBDataRegionAsyncSink.class);
+    final AsyncPipeDataTransferServiceClient client =
+        Mockito.mock(AsyncPipeDataTransferServiceClient.class);
+    Mockito.when(client.shouldReturnSelf()).thenReturn(true);
+
+    final List<TPipeTransferReq> transferredRequests = new ArrayList<>();
+    Mockito.doAnswer(
+            invocation -> {
+              final TPipeTransferReq req = invocation.getArgument(0);
+              final AsyncMethodCallback<TPipeTransferResp> callback = 
invocation.getArgument(1);
+              transferredRequests.add(req);
+              if (req.getType() == PipeRequestType.TRANSFER_SLICE.getType()) {
+                callback.onComplete(failedResp());
+              } else {
+                callback.onComplete(successResp());
+              }
+              return null;
+            })
+        .when(client)
+        .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any());
+
+    final TestPipeTransferTrackableHandler handler = new 
TestPipeTransferTrackableHandler(sink);
+    final TPipeTransferReq originalReq = createReq(10);
+
+    handler.transfer(client, originalReq);
+
+    Assert.assertEquals(2, transferredRequests.size());
+    Assert.assertEquals(
+        PipeRequestType.TRANSFER_SLICE.getType(), 
transferredRequests.get(0).getType());
+    Assert.assertEquals(originalReq.getType(), 
transferredRequests.get(1).getType());
+    Assert.assertEquals(originalReq.getVersion(), 
transferredRequests.get(1).getVersion());
+    Assert.assertArrayEquals(originalReq.getBody(), 
transferredRequests.get(1).getBody());
+    Assert.assertEquals(1, handler.completeCount);
+    Assert.assertEquals(0, handler.errorCount);
+  }
+
+  private static TPipeTransferReq createReq(final int bodySize) {
+    final byte[] body = new byte[bodySize];
+    for (int i = 0; i < body.length; ++i) {
+      body[i] = (byte) i;
+    }
+
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    req.type = (short) 123;
+    req.body = ByteBuffer.wrap(body);
+    return req;
+  }
+
+  private static TPipeTransferResp successResp() {
+    final TPipeTransferResp resp = new TPipeTransferResp();
+    resp.setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    return resp;
+  }
+
+  private static TPipeTransferResp failedResp() {
+    final TPipeTransferResp resp = new TPipeTransferResp();
+    resp.setStatus(
+        new 
TSStatus().setCode(TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER.getStatusCode()));
+    return resp;
+  }
+
+  private static class TestPipeTransferTrackableHandler extends 
PipeTransferTrackableHandler {
+
+    private int completeCount;
+    private int errorCount;
+
+    private TestPipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink 
sink) {
+      super(sink);
+    }
+
+    private void transfer(
+        final AsyncPipeDataTransferServiceClient client, final 
TPipeTransferReq req)
+        throws TException {
+      tryTransfer(client, req);
+    }
+
+    @Override
+    protected boolean onCompleteInternal(final TPipeTransferResp response) {
+      completeCount++;
+      return true;
+    }
+
+    @Override
+    protected void onErrorInternal(final Exception exception) {
+      errorCount++;
+    }
+
+    @Override
+    protected void doTransfer(
+        final AsyncPipeDataTransferServiceClient client, final 
TPipeTransferReq req)
+        throws TException {
+      transferWithOptionalRequestSlicing(client, req);
+    }
+
+    @Override
+    public void clearEventsReferenceCount() {
+      // Do nothing
+    }
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 41e8c59511a..2cdfbd865c8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -135,6 +135,10 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
     this.shouldReturnSelf.set(shouldReturnSelf);
   }
 
+  public boolean shouldReturnSelf() {
+    return shouldReturnSelf.get();
+  }
+
   public void setTimeoutDynamically(final int timeout) {
     try {
       ((TNonblockingSocket) ___transport).setTimeout(timeout);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index b7f42295e6c..1ad5d0a855f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -22,9 +22,7 @@ package org.apache.iotdb.commons.pipe.sink.client;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
-import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -39,15 +37,11 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class IoTDBSyncClient extends IClientRPCService.Client
     implements ThriftClient, AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncClient.class);
 
-  private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new 
AtomicInteger(0);
-
   private final String ipAddress;
   private final int port;
   private final TEndPoint endPoint;
@@ -100,9 +94,8 @@ public class IoTDBSyncClient extends IClientRPCService.Client
 
   @Override
   public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws 
TException {
-    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
-    if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
-        || req.body.limit() < bodySizeLimit) {
+    final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+    if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
       return super.pipeTransfer(req);
     }
 
@@ -115,23 +108,13 @@ public class IoTDBSyncClient extends 
IClientRPCService.Client
         bodySizeLimit);
 
     try {
-      final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
-      // Slice the buffer to avoid the buffer being too large
-      final int sliceCount =
-          req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit 
== 0 ? 0 : 1);
+      final int sliceOrderId = PipeTransferSliceReqBuilder.nextSliceOrderId();
+      final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req, 
bodySizeLimit);
       for (int i = 0; i < sliceCount; ++i) {
-        final int startIndexInBody = i * bodySizeLimit;
-        final int endIndexInBody = Math.min((i + 1) * bodySizeLimit, 
req.body.limit());
         final TPipeTransferResp sliceResp =
             super.pipeTransfer(
-                PipeTransferSliceReq.toTPipeTransferReq(
-                    sliceOrderId,
-                    req.getType(),
-                    i,
-                    sliceCount,
-                    req.body.duplicate(),
-                    startIndexInBody,
-                    endIndexInBody));
+                PipeTransferSliceReqBuilder.buildSliceReq(
+                    req, sliceOrderId, i, sliceCount, bodySizeLimit));
 
         if (i == sliceCount - 1) {
           return sliceResp;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
new file mode 100644
index 00000000000..b108d6f1d3a
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class PipeTransferSliceReqBuilder {
+
+  private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new 
AtomicInteger(0);
+
+  private PipeTransferSliceReqBuilder() {
+    // Utility class
+  }
+
+  public static int getBodySizeLimit() {
+    return PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
+  }
+
+  public static boolean shouldSlice(final TPipeTransferReq req, final int 
bodySizeLimit) {
+    return req.getVersion() == IoTDBSinkRequestVersion.VERSION_1.getVersion()
+        && req.body.limit() >= bodySizeLimit;
+  }
+
+  public static int nextSliceOrderId() {
+    return SLICE_ORDER_ID_GENERATOR.getAndIncrement();
+  }
+
+  public static int getSliceCount(final TPipeTransferReq req, final int 
bodySizeLimit) {
+    return req.body.limit() / bodySizeLimit + (req.body.limit() % 
bodySizeLimit == 0 ? 0 : 1);
+  }
+
+  public static PipeTransferSliceReq buildSliceReq(
+      final TPipeTransferReq originalReq,
+      final int sliceOrderId,
+      final int sliceIndex,
+      final int sliceCount,
+      final int bodySizeLimit)
+      throws IOException {
+    final int startIndexInBody = sliceIndex * bodySizeLimit;
+    final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit, 
originalReq.body.limit());
+    return PipeTransferSliceReq.toTPipeTransferReq(
+        sliceOrderId,
+        originalReq.getType(),
+        sliceIndex,
+        sliceCount,
+        originalReq.body.duplicate(),
+        startIndexInBody,
+        endIndexInBody);
+  }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
new file mode 100644
index 00000000000..290ce397980
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class PipeTransferSliceReqBuilderTest {
+
+  private final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
+
+  private int originalRequestSliceThresholdBytes;
+
+  @Before
+  public void setUp() {
+    originalRequestSliceThresholdBytes = 
commonConfig.getPipeSinkRequestSliceThresholdBytes();
+    commonConfig.setPipeSinkRequestSliceThresholdBytes(4);
+  }
+
+  @After
+  public void tearDown() {
+    
commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
+  }
+
+  @Test
+  public void testBuildSliceReq() throws Exception {
+    final TPipeTransferReq req = 
createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 10);
+    final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+
+    Assert.assertTrue(PipeTransferSliceReqBuilder.shouldSlice(req, 
bodySizeLimit));
+    Assert.assertEquals(3, PipeTransferSliceReqBuilder.getSliceCount(req, 
bodySizeLimit));
+
+    final PipeTransferSliceReq firstSlice =
+        PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 0, 3, 
bodySizeLimit);
+    final PipeTransferSliceReq secondSlice =
+        PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 1, 3, 
bodySizeLimit);
+    final PipeTransferSliceReq thirdSlice =
+        PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 2, 3, 
bodySizeLimit);
+
+    Assert.assertArrayEquals(new byte[] {0, 1, 2, 3}, 
firstSlice.getSliceBody());
+    Assert.assertArrayEquals(new byte[] {4, 5, 6, 7}, 
secondSlice.getSliceBody());
+    Assert.assertArrayEquals(new byte[] {8, 9}, thirdSlice.getSliceBody());
+    Assert.assertEquals(0, firstSlice.getSliceIndex());
+    Assert.assertEquals(1, secondSlice.getSliceIndex());
+    Assert.assertEquals(2, thirdSlice.getSliceIndex());
+    Assert.assertEquals(3, firstSlice.getSliceCount());
+    Assert.assertEquals(req.getType(), firstSlice.getOriginReqType());
+    Assert.assertEquals(10, firstSlice.getOriginBodySize());
+  }
+
+  @Test
+  public void testShouldSliceOnlyForVersion1RequestsAboveThreshold() {
+    final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+
+    Assert.assertFalse(
+        PipeTransferSliceReqBuilder.shouldSlice(
+            createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 3), 
bodySizeLimit));
+    Assert.assertFalse(
+        PipeTransferSliceReqBuilder.shouldSlice(
+            createReq((byte) (IoTDBSinkRequestVersion.VERSION_1.getVersion() + 
1), 10),
+            bodySizeLimit));
+    Assert.assertTrue(
+        PipeTransferSliceReqBuilder.shouldSlice(
+            createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 4), 
bodySizeLimit));
+  }
+
+  private static TPipeTransferReq createReq(final byte version, final int 
bodySize) {
+    final byte[] body = new byte[bodySize];
+    for (int i = 0; i < body.length; ++i) {
+      body[i] = (byte) i;
+    }
+
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = version;
+    req.type = (short) 123;
+    req.body = ByteBuffer.wrap(body);
+    return req;
+  }
+}

Reply via email to