This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch slice
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/slice by this push:
     new 97a405d1df7 chew
97a405d1df7 is described below

commit 97a405d1df7421437a6e2d98070f5dad1a3fdcbf
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 17:01:04 2026 +0800

    chew
---
 .../handler/PipeTransferTrackableHandler.java      |  28 ++----
 .../handler/PipeTransferTrackableHandlerTest.java  |   9 +-
 .../commons/pipe/sink/client/IoTDBSyncClient.java  |  31 ++----
 .../thrift/common/PipeTransferSliceReqBuilder.java |  73 ++++++++++++++
 .../common/PipeTransferSliceReqBuilderTest.java    | 106 +++++++++++++++++++++
 5 files changed, 199 insertions(+), 48 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 51d6135fc49..a0e6ad73fe7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -21,9 +21,7 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
 
 import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
-import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -37,12 +35,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class PipeTransferTrackableHandler
     implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
-  private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new 
AtomicInteger(0);
 
   protected final IoTDBDataRegionAsyncSink sink;
   protected volatile AsyncPipeDataTransferServiceClient client;
@@ -137,9 +133,8 @@ public abstract class PipeTransferTrackableHandler
   protected final void transferWithOptionalRequestSlicing(
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
-    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
-    if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
-        || req.body.limit() < bodySizeLimit) {
+    final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+    if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
       client.pipeTransfer(req, this);
       return;
     }
@@ -152,15 +147,14 @@ public abstract class PipeTransferTrackableHandler
         req.body.limit(),
         bodySizeLimit);
 
-    final int sliceCount =
-        req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit 
== 0 ? 0 : 1);
+    final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req, 
bodySizeLimit);
     final boolean shouldReturnSelf = client.shouldReturnSelf();
     try {
       transferSlicedRequest(
           client,
           req,
           shouldReturnSelf,
-          SLICE_ORDER_ID_GENERATOR.getAndIncrement(),
+          PipeTransferSliceReqBuilder.nextSliceOrderId(),
           0,
           sliceCount,
           bodySizeLimit);
@@ -180,18 +174,10 @@ public abstract class PipeTransferTrackableHandler
       final int sliceCount,
       final int bodySizeLimit)
       throws Exception {
-    final int startIndexInBody = sliceIndex * bodySizeLimit;
-    final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit, 
originalReq.body.limit());
     client.setShouldReturnSelf(shouldReturnSelf && sliceIndex == sliceCount - 
1);
     client.pipeTransfer(
-        PipeTransferSliceReq.toTPipeTransferReq(
-            sliceOrderId,
-            originalReq.getType(),
-            sliceIndex,
-            sliceCount,
-            originalReq.body.duplicate(),
-            startIndexInBody,
-            endIndexInBody),
+        PipeTransferSliceReqBuilder.buildSliceReq(
+            originalReq, sliceOrderId, sliceIndex, sliceCount, bodySizeLimit),
         new AsyncMethodCallback<TPipeTransferResp>() {
           @Override
           public void onComplete(final TPipeTransferResp response) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
index 7d62d22a17a..60b69235085 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
@@ -97,7 +97,8 @@ public class PipeTransferTrackableHandlerTest {
     final PipeTransferSliceReq thirdSlice =
         PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(2));
 
-    Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), 
transferredRequests.get(0).getType());
+    Assert.assertEquals(
+        PipeRequestType.TRANSFER_SLICE.getType(), 
transferredRequests.get(0).getType());
     Assert.assertEquals(firstSlice.getOrderId(), secondSlice.getOrderId());
     Assert.assertEquals(firstSlice.getOrderId(), thirdSlice.getOrderId());
     Assert.assertEquals(originalReq.getType(), firstSlice.getOriginReqType());
@@ -144,7 +145,8 @@ public class PipeTransferTrackableHandlerTest {
     handler.transfer(client, originalReq);
 
     Assert.assertEquals(2, transferredRequests.size());
-    Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(), 
transferredRequests.get(0).getType());
+    Assert.assertEquals(
+        PipeRequestType.TRANSFER_SLICE.getType(), 
transferredRequests.get(0).getType());
     Assert.assertEquals(originalReq.getType(), 
transferredRequests.get(1).getType());
     Assert.assertEquals(originalReq.getVersion(), 
transferredRequests.get(1).getVersion());
     Assert.assertArrayEquals(originalReq.getBody(), 
transferredRequests.get(1).getBody());
@@ -187,7 +189,8 @@ public class PipeTransferTrackableHandlerTest {
       super(sink);
     }
 
-    private void transfer(final AsyncPipeDataTransferServiceClient client, 
final TPipeTransferReq req)
+    private void transfer(
+        final AsyncPipeDataTransferServiceClient client, final 
TPipeTransferReq req)
         throws TException {
       tryTransfer(client, req);
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index b7f42295e6c..1ad5d0a855f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -22,9 +22,7 @@ package org.apache.iotdb.commons.pipe.sink.client;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
-import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -39,15 +37,11 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class IoTDBSyncClient extends IClientRPCService.Client
     implements ThriftClient, AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncClient.class);
 
-  private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new 
AtomicInteger(0);
-
   private final String ipAddress;
   private final int port;
   private final TEndPoint endPoint;
@@ -100,9 +94,8 @@ public class IoTDBSyncClient extends IClientRPCService.Client
 
   @Override
   public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws 
TException {
-    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
-    if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
-        || req.body.limit() < bodySizeLimit) {
+    final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+    if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
       return super.pipeTransfer(req);
     }
 
@@ -115,23 +108,13 @@ public class IoTDBSyncClient extends 
IClientRPCService.Client
         bodySizeLimit);
 
     try {
-      final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
-      // Slice the buffer to avoid the buffer being too large
-      final int sliceCount =
-          req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit 
== 0 ? 0 : 1);
+      final int sliceOrderId = PipeTransferSliceReqBuilder.nextSliceOrderId();
+      final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req, 
bodySizeLimit);
       for (int i = 0; i < sliceCount; ++i) {
-        final int startIndexInBody = i * bodySizeLimit;
-        final int endIndexInBody = Math.min((i + 1) * bodySizeLimit, 
req.body.limit());
         final TPipeTransferResp sliceResp =
             super.pipeTransfer(
-                PipeTransferSliceReq.toTPipeTransferReq(
-                    sliceOrderId,
-                    req.getType(),
-                    i,
-                    sliceCount,
-                    req.body.duplicate(),
-                    startIndexInBody,
-                    endIndexInBody));
+                PipeTransferSliceReqBuilder.buildSliceReq(
+                    req, sliceOrderId, i, sliceCount, bodySizeLimit));
 
         if (i == sliceCount - 1) {
           return sliceResp;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
new file mode 100644
index 00000000000..b108d6f1d3a
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class PipeTransferSliceReqBuilder { // Static helpers for deciding whether, and how, a TPipeTransferReq body is split into slice requests.
+
+  private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new 
AtomicInteger(0); // counter starting at 0; read in this class only through nextSliceOrderId()
+
+  private PipeTransferSliceReqBuilder() {
+    // Utility class: only static members, never instantiated
+  }
+
+  public static int getBodySizeLimit() { // Returns the slice threshold, in bytes, as currently reported by PipeConfig.
+    return PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
+  }
+
+  public static boolean shouldSlice(final TPipeTransferReq req, final int 
bodySizeLimit) { // True only when the request is VERSION_1 and its body limit is at least bodySizeLimit.
+    return req.getVersion() == IoTDBSinkRequestVersion.VERSION_1.getVersion()
+        && req.body.limit() >= bodySizeLimit; // inclusive: a body exactly at the limit is sliced
+  }
+
+  public static int nextSliceOrderId() { // Returns the counter's current value and atomically advances it by one.
+    return SLICE_ORDER_ID_GENERATOR.getAndIncrement();
+  }
+
+  public static int getSliceCount(final TPipeTransferReq req, final int 
bodySizeLimit) { // Number of slices needed to cover the body at bodySizeLimit bytes per slice.
+    return req.body.limit() / bodySizeLimit + (req.body.limit() % 
bodySizeLimit == 0 ? 0 : 1); // ceiling division; NOTE(review): bodySizeLimit == 0 would throw ArithmeticException - confirm the configured threshold is always positive
+  }
+
+  public static PipeTransferSliceReq buildSliceReq( // Builds the slice request for one sliceIndex of originalReq.
+      final TPipeTransferReq originalReq, // request whose body is being sliced
+      final int sliceOrderId, // passed through unchanged to the slice request
+      final int sliceIndex, // zero-based position of this slice; used to compute the byte range below
+      final int sliceCount, // passed through unchanged to the slice request
+      final int bodySizeLimit) // slice width in bytes
+      throws IOException {
+    final int startIndexInBody = sliceIndex * bodySizeLimit; // NOTE(review): int multiplication - confirm bodies stay small enough that this cannot overflow
+    final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit, 
originalReq.body.limit()); // the last slice is clamped to the end of the body
+    return PipeTransferSliceReq.toTPipeTransferReq(
+        sliceOrderId,
+        originalReq.getType(),
+        sliceIndex,
+        sliceCount,
+        originalReq.body.duplicate(), // duplicate() shares the bytes but has its own position/limit
+        startIndexInBody,
+        endIndexInBody);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
new file mode 100644
index 00000000000..290ce397980
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class PipeTransferSliceReqBuilderTest { // Tests the slice decision, slice count and slice construction helpers of PipeTransferSliceReqBuilder.
+
+  private final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig(); // config object whose slice threshold is overridden around each test
+
+  private int originalRequestSliceThresholdBytes; // value captured in setUp() so tearDown() can put it back
+
+  @Before
+  public void setUp() {
+    originalRequestSliceThresholdBytes = 
commonConfig.getPipeSinkRequestSliceThresholdBytes();
+    commonConfig.setPipeSinkRequestSliceThresholdBytes(4); // 4-byte threshold so very small bodies exercise slicing
+  }
+
+  @After
+  public void tearDown() {
+    
commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes); // restore the value saved in setUp()
+  }
+
+  @Test
+  public void testBuildSliceReq() throws Exception {
+    final TPipeTransferReq req = 
createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 10); // 10-byte body holding the values 0..9
+    final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit(); // presumably 4, assuming PipeConfig reflects the value set in setUp() - TODO confirm
+
+    Assert.assertTrue(PipeTransferSliceReqBuilder.shouldSlice(req, 
bodySizeLimit));
+    Assert.assertEquals(3, PipeTransferSliceReqBuilder.getSliceCount(req, 
bodySizeLimit)); // 10 bytes at 4 per slice -> 4 + 4 + 2
+
+    final PipeTransferSliceReq firstSlice =
+        PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 0, 3, 
bodySizeLimit);
+    final PipeTransferSliceReq secondSlice =
+        PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 1, 3, 
bodySizeLimit);
+    final PipeTransferSliceReq thirdSlice =
+        PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 2, 3, 
bodySizeLimit);
+
+    Assert.assertArrayEquals(new byte[] {0, 1, 2, 3}, 
firstSlice.getSliceBody());
+    Assert.assertArrayEquals(new byte[] {4, 5, 6, 7}, 
secondSlice.getSliceBody());
+    Assert.assertArrayEquals(new byte[] {8, 9}, thirdSlice.getSliceBody()); // last slice is shorter than the limit
+    Assert.assertEquals(0, firstSlice.getSliceIndex());
+    Assert.assertEquals(1, secondSlice.getSliceIndex());
+    Assert.assertEquals(2, thirdSlice.getSliceIndex());
+    Assert.assertEquals(3, firstSlice.getSliceCount());
+    Assert.assertEquals(req.getType(), firstSlice.getOriginReqType()); // slice is expected to carry the original request's type
+    Assert.assertEquals(10, firstSlice.getOriginBodySize()); // and the size of the whole original body, not of the slice
+  }
+
+  @Test
+  public void testShouldSliceOnlyForVersion1RequestsAboveThreshold() {
+    final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+
+    Assert.assertFalse( // 3-byte VERSION_1 body: expected not to be sliced
+        PipeTransferSliceReqBuilder.shouldSlice(
+            createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 3), 
bodySizeLimit));
+    Assert.assertFalse( // version other than VERSION_1: expected not to be sliced even with a 10-byte body
+        PipeTransferSliceReqBuilder.shouldSlice(
+            createReq((byte) (IoTDBSinkRequestVersion.VERSION_1.getVersion() + 
1), 10),
+            bodySizeLimit));
+    Assert.assertTrue( // 4-byte VERSION_1 body against the 4-byte threshold from setUp(): expected to be sliced
+        PipeTransferSliceReqBuilder.shouldSlice(
+            createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 4), 
bodySizeLimit));
+  }
+
+  private static TPipeTransferReq createReq(final byte version, final int 
bodySize) { // Builds a request with the given version and a bodySize-byte body.
+    final byte[] body = new byte[bodySize];
+    for (int i = 0; i < body.length; ++i) {
+      body[i] = (byte) i; // body[i] == i, so slices can be verified by content
+    }
+
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = version;
+    req.type = (short) 123; // type value that testBuildSliceReq compares against getOriginReqType()
+    req.body = ByteBuffer.wrap(body);
+    return req;
+  }
+}

Reply via email to