This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch make-sure-no-frame-size-problem
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to 
refs/heads/make-sure-no-frame-size-problem by this push:
     new ec5381be5e8 4th
ec5381be5e8 is described below

commit ec5381be5e8b6f82ac18674e3f37b50c0cd2c2e7
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Aug 24 04:39:36 2024 +0800

    4th
---
 .../pipe/connector/client/IoTDBSyncClient.java      |  6 ++++++
 .../thrift/common/PipeTransferSliceReqHandler.java  | 21 ++++++++++++++++++---
 .../thrift/request/PipeTransferSliceReq.java        | 17 ++++++++++++++++-
 3 files changed, 40 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index 45743e37cc4..70b2931d38a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -39,11 +39,15 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class IoTDBSyncClient extends IClientRPCService.Client
     implements ThriftClient, AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncClient.class);
 
+  private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new 
AtomicInteger(0);
+
   private final String ipAddress;
   private final int port;
   private final TEndPoint endPoint;
@@ -104,6 +108,7 @@ public class IoTDBSyncClient extends 
IClientRPCService.Client
     }
 
     try {
+      final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
       // Slice the buffer to avoid the buffer being too large
       final int sliceCount =
           req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit 
== 0 ? 0 : 1);
@@ -113,6 +118,7 @@ public class IoTDBSyncClient extends 
IClientRPCService.Client
         final TPipeTransferResp sliceResp =
             super.pipeTransfer(
                 PipeTransferSliceReq.toTPipeTransferReq(
+                    sliceOrderId,
                     req.getType(),
                     i,
                     sliceCount,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
index 18c5be1ee53..2fa94423f13 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
@@ -35,6 +35,8 @@ public class PipeTransferSliceReqHandler {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferSliceReqHandler.class);
 
+  private int orderId = -1;
+
   private short originReqType = -1;
   private int originBodySize = -1;
 
@@ -42,17 +44,24 @@ public class PipeTransferSliceReqHandler {
   private final List<byte[]> sliceBodies = new ArrayList<>();
 
   public boolean receiveSlice(final PipeTransferSliceReq req) {
-    if (originReqType == -1 || originBodySize == -1 || sliceCount == -1 || 
sliceBodies.isEmpty()) {
-      if (originReqType == -1
+    if (orderId == -1
+        || originReqType == -1
+        || originBodySize == -1
+        || sliceCount == -1
+        || sliceBodies.isEmpty()) {
+      if (orderId == -1
+          && originReqType == -1
           && originBodySize == -1
           && sliceCount == -1
           && sliceBodies.isEmpty()) {
+        orderId = req.getOrderId();
         originReqType = req.getOriginReqType();
         originBodySize = req.getOriginBodySize();
         sliceCount = req.getSliceCount();
       } else {
         LOGGER.warn(
-            "Invalid state: originReqType={}, originBodySize={}, 
sliceCount={}, sliceBodies.size={}",
+            "Invalid state: orderId={}, originReqType={}, originBodySize={}, 
sliceCount={}, sliceBodies.size={}",
+            orderId,
             originReqType,
             originBodySize,
             sliceCount,
@@ -62,6 +71,11 @@ public class PipeTransferSliceReqHandler {
       }
     }
 
+    if (orderId != req.getOrderId()) {
+      LOGGER.warn("Order ID mismatch: expected {}, actual {}", orderId, 
req.getOrderId());
+      clear();
+      return false;
+    }
     if (originReqType != req.getOriginReqType()) {
       LOGGER.warn(
           "Origin request type mismatch: expected {}, actual {}",
@@ -115,6 +129,7 @@ public class PipeTransferSliceReqHandler {
   }
 
   public void clear() {
+    orderId = -1;
     originReqType = -1;
     originBodySize = -1;
     sliceCount = -1;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
index 78928578493..4df6008400d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
@@ -33,6 +33,8 @@ import java.util.Objects;
 
 public class PipeTransferSliceReq extends TPipeTransferReq {
 
+  private transient int orderId;
+
   private transient short originReqType;
   private transient int originBodySize;
 
@@ -41,6 +43,10 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
   private transient int sliceIndex;
   private transient int sliceCount;
 
+  public int getOrderId() {
+    return orderId;
+  }
+
   public short getOriginReqType() {
     return originReqType;
   }
@@ -64,6 +70,7 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferSliceReq toTPipeTransferReq(
+      final int orderId,
       final short originReqType,
       final int sliceIndex,
       final int sliceCount,
@@ -73,6 +80,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
       throws IOException {
     final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
 
+    sliceReq.orderId = orderId;
+
     sliceReq.originReqType = originReqType;
     sliceReq.originBodySize = duplicatedOriginBody.limit();
 
@@ -87,6 +96,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
     sliceReq.type = PipeRequestType.TRANSFER_SLICE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(sliceReq.orderId, outputStream);
+
       ReadWriteIOUtils.write(sliceReq.originReqType, outputStream);
       ReadWriteIOUtils.write(sliceReq.originBodySize, outputStream);
 
@@ -105,6 +116,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
   public static PipeTransferSliceReq fromTPipeTransferReq(final 
TPipeTransferReq transferReq) {
     final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
 
+    sliceReq.orderId = ReadWriteIOUtils.readInt(transferReq.body);
+
     sliceReq.originReqType = ReadWriteIOUtils.readShort(transferReq.body);
     sliceReq.originBodySize = ReadWriteIOUtils.readInt(transferReq.body);
 
@@ -131,7 +144,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
       return false;
     }
     final PipeTransferSliceReq that = (PipeTransferSliceReq) obj;
-    return Objects.equals(originReqType, that.originReqType)
+    return Objects.equals(orderId, that.orderId)
+        && Objects.equals(originReqType, that.originReqType)
         && Objects.equals(originBodySize, that.originBodySize)
         && Arrays.equals(sliceBody, that.sliceBody)
         && Objects.equals(sliceIndex, that.sliceIndex)
@@ -144,6 +158,7 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
   @Override
   public int hashCode() {
     return Objects.hash(
+        orderId,
         originReqType,
         originBodySize,
         Arrays.hashCode(sliceBody),

Reply via email to