This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch slice
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/slice by this push:
new 97a405d1df7 chew
97a405d1df7 is described below
commit 97a405d1df7421437a6e2d98070f5dad1a3fdcbf
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 17:01:04 2026 +0800
chew
---
.../handler/PipeTransferTrackableHandler.java | 28 ++----
.../handler/PipeTransferTrackableHandlerTest.java | 9 +-
.../commons/pipe/sink/client/IoTDBSyncClient.java | 31 ++----
.../thrift/common/PipeTransferSliceReqBuilder.java | 73 ++++++++++++++
.../common/PipeTransferSliceReqBuilderTest.java | 106 +++++++++++++++++++++
5 files changed, 199 insertions(+), 48 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 51d6135fc49..a0e6ad73fe7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -21,9 +21,7 @@ package
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
-import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -37,12 +35,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
public abstract class PipeTransferTrackableHandler
implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
- private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new
AtomicInteger(0);
protected final IoTDBDataRegionAsyncSink sink;
protected volatile AsyncPipeDataTransferServiceClient client;
@@ -137,9 +133,8 @@ public abstract class PipeTransferTrackableHandler
protected final void transferWithOptionalRequestSlicing(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
- final int bodySizeLimit =
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
- if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
- || req.body.limit() < bodySizeLimit) {
+ final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+ if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
client.pipeTransfer(req, this);
return;
}
@@ -152,15 +147,14 @@ public abstract class PipeTransferTrackableHandler
req.body.limit(),
bodySizeLimit);
- final int sliceCount =
- req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit
== 0 ? 0 : 1);
+ final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req,
bodySizeLimit);
final boolean shouldReturnSelf = client.shouldReturnSelf();
try {
transferSlicedRequest(
client,
req,
shouldReturnSelf,
- SLICE_ORDER_ID_GENERATOR.getAndIncrement(),
+ PipeTransferSliceReqBuilder.nextSliceOrderId(),
0,
sliceCount,
bodySizeLimit);
@@ -180,18 +174,10 @@ public abstract class PipeTransferTrackableHandler
final int sliceCount,
final int bodySizeLimit)
throws Exception {
- final int startIndexInBody = sliceIndex * bodySizeLimit;
- final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit,
originalReq.body.limit());
client.setShouldReturnSelf(shouldReturnSelf && sliceIndex == sliceCount -
1);
client.pipeTransfer(
- PipeTransferSliceReq.toTPipeTransferReq(
- sliceOrderId,
- originalReq.getType(),
- sliceIndex,
- sliceCount,
- originalReq.body.duplicate(),
- startIndexInBody,
- endIndexInBody),
+ PipeTransferSliceReqBuilder.buildSliceReq(
+ originalReq, sliceOrderId, sliceIndex, sliceCount, bodySizeLimit),
new AsyncMethodCallback<TPipeTransferResp>() {
@Override
public void onComplete(final TPipeTransferResp response) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
index 7d62d22a17a..60b69235085 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
@@ -97,7 +97,8 @@ public class PipeTransferTrackableHandlerTest {
final PipeTransferSliceReq thirdSlice =
PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(2));
- Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(),
transferredRequests.get(0).getType());
+ Assert.assertEquals(
+ PipeRequestType.TRANSFER_SLICE.getType(),
transferredRequests.get(0).getType());
Assert.assertEquals(firstSlice.getOrderId(), secondSlice.getOrderId());
Assert.assertEquals(firstSlice.getOrderId(), thirdSlice.getOrderId());
Assert.assertEquals(originalReq.getType(), firstSlice.getOriginReqType());
@@ -144,7 +145,8 @@ public class PipeTransferTrackableHandlerTest {
handler.transfer(client, originalReq);
Assert.assertEquals(2, transferredRequests.size());
- Assert.assertEquals(PipeRequestType.TRANSFER_SLICE.getType(),
transferredRequests.get(0).getType());
+ Assert.assertEquals(
+ PipeRequestType.TRANSFER_SLICE.getType(),
transferredRequests.get(0).getType());
Assert.assertEquals(originalReq.getType(),
transferredRequests.get(1).getType());
Assert.assertEquals(originalReq.getVersion(),
transferredRequests.get(1).getVersion());
Assert.assertArrayEquals(originalReq.getBody(),
transferredRequests.get(1).getBody());
@@ -187,7 +189,8 @@ public class PipeTransferTrackableHandlerTest {
super(sink);
}
- private void transfer(final AsyncPipeDataTransferServiceClient client,
final TPipeTransferReq req)
+ private void transfer(
+ final AsyncPipeDataTransferServiceClient client, final
TPipeTransferReq req)
throws TException {
tryTransfer(client, req);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index b7f42295e6c..1ad5d0a855f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -22,9 +22,7 @@ package org.apache.iotdb.commons.pipe.sink.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
-import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -39,15 +37,11 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class IoTDBSyncClient extends IClientRPCService.Client
implements ThriftClient, AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSyncClient.class);
- private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new
AtomicInteger(0);
-
private final String ipAddress;
private final int port;
private final TEndPoint endPoint;
@@ -100,9 +94,8 @@ public class IoTDBSyncClient extends IClientRPCService.Client
@Override
public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws
TException {
- final int bodySizeLimit =
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
- if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
- || req.body.limit() < bodySizeLimit) {
+ final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+ if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
return super.pipeTransfer(req);
}
@@ -115,23 +108,13 @@ public class IoTDBSyncClient extends
IClientRPCService.Client
bodySizeLimit);
try {
- final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
- // Slice the buffer to avoid the buffer being too large
- final int sliceCount =
- req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit
== 0 ? 0 : 1);
+ final int sliceOrderId = PipeTransferSliceReqBuilder.nextSliceOrderId();
+ final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req,
bodySizeLimit);
for (int i = 0; i < sliceCount; ++i) {
- final int startIndexInBody = i * bodySizeLimit;
- final int endIndexInBody = Math.min((i + 1) * bodySizeLimit,
req.body.limit());
final TPipeTransferResp sliceResp =
super.pipeTransfer(
- PipeTransferSliceReq.toTPipeTransferReq(
- sliceOrderId,
- req.getType(),
- i,
- sliceCount,
- req.body.duplicate(),
- startIndexInBody,
- endIndexInBody));
+ PipeTransferSliceReqBuilder.buildSliceReq(
+ req, sliceOrderId, i, sliceCount, bodySizeLimit));
if (i == sliceCount - 1) {
return sliceResp;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
new file mode 100644
index 00000000000..b108d6f1d3a
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class PipeTransferSliceReqBuilder {
+
+ private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new
AtomicInteger(0);
+
+ private PipeTransferSliceReqBuilder() {
+ // Utility class
+ }
+
+ public static int getBodySizeLimit() {
+ return PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
+ }
+
+ public static boolean shouldSlice(final TPipeTransferReq req, final int
bodySizeLimit) {
+ return req.getVersion() == IoTDBSinkRequestVersion.VERSION_1.getVersion()
+ && req.body.limit() >= bodySizeLimit;
+ }
+
+ public static int nextSliceOrderId() {
+ return SLICE_ORDER_ID_GENERATOR.getAndIncrement();
+ }
+
+ public static int getSliceCount(final TPipeTransferReq req, final int
bodySizeLimit) {
+ return req.body.limit() / bodySizeLimit + (req.body.limit() %
bodySizeLimit == 0 ? 0 : 1);
+ }
+
+ public static PipeTransferSliceReq buildSliceReq(
+ final TPipeTransferReq originalReq,
+ final int sliceOrderId,
+ final int sliceIndex,
+ final int sliceCount,
+ final int bodySizeLimit)
+ throws IOException {
+ final int startIndexInBody = sliceIndex * bodySizeLimit;
+ final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit,
originalReq.body.limit());
+ return PipeTransferSliceReq.toTPipeTransferReq(
+ sliceOrderId,
+ originalReq.getType(),
+ sliceIndex,
+ sliceCount,
+ originalReq.body.duplicate(),
+ startIndexInBody,
+ endIndexInBody);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
new file mode 100644
index 00000000000..290ce397980
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class PipeTransferSliceReqBuilderTest {
+
+ private final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
+
+ private int originalRequestSliceThresholdBytes;
+
+ @Before
+ public void setUp() {
+ originalRequestSliceThresholdBytes =
commonConfig.getPipeSinkRequestSliceThresholdBytes();
+ commonConfig.setPipeSinkRequestSliceThresholdBytes(4);
+ }
+
+ @After
+ public void tearDown() {
+
commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
+ }
+
+ @Test
+ public void testBuildSliceReq() throws Exception {
+ final TPipeTransferReq req =
createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 10);
+ final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+
+ Assert.assertTrue(PipeTransferSliceReqBuilder.shouldSlice(req,
bodySizeLimit));
+ Assert.assertEquals(3, PipeTransferSliceReqBuilder.getSliceCount(req,
bodySizeLimit));
+
+ final PipeTransferSliceReq firstSlice =
+ PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 0, 3,
bodySizeLimit);
+ final PipeTransferSliceReq secondSlice =
+ PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 1, 3,
bodySizeLimit);
+ final PipeTransferSliceReq thirdSlice =
+ PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 2, 3,
bodySizeLimit);
+
+ Assert.assertArrayEquals(new byte[] {0, 1, 2, 3},
firstSlice.getSliceBody());
+ Assert.assertArrayEquals(new byte[] {4, 5, 6, 7},
secondSlice.getSliceBody());
+ Assert.assertArrayEquals(new byte[] {8, 9}, thirdSlice.getSliceBody());
+ Assert.assertEquals(0, firstSlice.getSliceIndex());
+ Assert.assertEquals(1, secondSlice.getSliceIndex());
+ Assert.assertEquals(2, thirdSlice.getSliceIndex());
+ Assert.assertEquals(3, firstSlice.getSliceCount());
+ Assert.assertEquals(req.getType(), firstSlice.getOriginReqType());
+ Assert.assertEquals(10, firstSlice.getOriginBodySize());
+ }
+
+ @Test
+ public void testShouldSliceOnlyForVersion1RequestsAboveThreshold() {
+ final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+
+ Assert.assertFalse(
+ PipeTransferSliceReqBuilder.shouldSlice(
+ createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 3),
bodySizeLimit));
+ Assert.assertFalse(
+ PipeTransferSliceReqBuilder.shouldSlice(
+ createReq((byte) (IoTDBSinkRequestVersion.VERSION_1.getVersion() +
1), 10),
+ bodySizeLimit));
+ Assert.assertTrue(
+ PipeTransferSliceReqBuilder.shouldSlice(
+ createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 4),
bodySizeLimit));
+ }
+
+ private static TPipeTransferReq createReq(final byte version, final int
bodySize) {
+ final byte[] body = new byte[bodySize];
+ for (int i = 0; i < body.length; ++i) {
+ body[i] = (byte) i;
+ }
+
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = version;
+ req.type = (short) 123;
+ req.body = ByteBuffer.wrap(body);
+ return req;
+ }
+}