This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch make-sure-no-frame-size-problem
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to 
refs/heads/make-sure-no-frame-size-problem by this push:
     new 28576967de1 3rd commit
28576967de1 is described below

commit 28576967de1e02b265b194601e8db6b24ade7057
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Aug 24 04:27:51 2024 +0800

    3rd commit
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  28 ++++-
 .../pipe/connector/client/IoTDBSyncClient.java     |   7 +-
 .../thrift/common/PipeTransferSliceReqHandler.java | 123 +++++++++++++++++++++
 .../thrift/request/PipeTransferSliceReq.java       |  29 +++--
 5 files changed, 168 insertions(+), 20 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 53aa7125775..c16f2878850 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -246,6 +246,7 @@ public enum TSStatusCode {
   PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION(1809),
   PIPE_RECEIVER_USER_CONFLICT_EXCEPTION(1810),
   PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED(1811),
+  PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
 
   // Subscription
   SUBSCRIPTION_VERSION_ERROR(1900),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 8f816df2e26..5c3910a77ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferSliceReqHandler;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
@@ -134,6 +135,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new 
AtomicLong(0);
   protected final AtomicReference<String> configReceiverId = new 
AtomicReference<>();
 
+  private final PipeTransferSliceReqHandler sliceReqHandler = new 
PipeTransferSliceReqHandler();
+
   static {
     try {
       folderManager =
@@ -152,7 +155,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       final long startTime = System.nanoTime();
       final short rawRequestType = req.getType();
       if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
-        switch (PipeRequestType.valueOf(rawRequestType)) {
+        final PipeRequestType requestType = 
PipeRequestType.valueOf(rawRequestType);
+        if (requestType != PipeRequestType.TRANSFER_SLICE) {
+          sliceReqHandler.clear();
+        }
+        switch (requestType) {
           case HANDSHAKE_DATANODE_V1:
             {
               try {
@@ -513,8 +520,23 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return configReceiverId.get();
   }
 
-  private TPipeTransferResp handleTransferSlice(PipeTransferSliceReq 
pipeTransferSliceReq) {
-    return null;
+  /**
+   * Handles one slice of a request that the sender split into several TRANSFER_SLICE requests.
+   *
+   * <p>The slice is handed to {@code sliceReqHandler}. {@code receiveSlice} returns {@code true}
+   * when the slice was accepted and {@code false} when it was rejected (metadata or slice index
+   * mismatch); on rejection the handler has already dropped everything it had buffered.
+   *
+   * @param pipeTransferSliceReq the slice to buffer
+   * @return a {@code PIPE_TRANSFER_SLICE_OUT_OF_ORDER} status if the slice was rejected, a success
+   *     status while further slices are still expected, or the response produced by {@code
+   *     receive} for the reassembled request once the final slice has arrived
+   */
+  private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq pipeTransferSliceReq) {
+    // receiveSlice reports acceptance, so the out-of-order reply belongs to the false case.
+    final boolean isInOrder = sliceReqHandler.receiveSlice(pipeTransferSliceReq);
+    if (!isInOrder) {
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER,
+              "Slice request is out of order, please check the request sequence."));
+    }
+    final Optional<TPipeTransferReq> req = sliceReqHandler.makeReqIfComplete();
+    if (!req.isPresent()) {
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.SUCCESS_STATUS,
+              "Slice received, waiting for more slices to complete the request."));
+    }
+    // sliceReqHandler will be cleared in the receive(req) method, which clears it for every
+    // request type other than TRANSFER_SLICE
+    return receive(req.get());
   }
 
   /**
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index 4561a5b4bd1..45743e37cc4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -113,7 +113,12 @@ public class IoTDBSyncClient extends 
IClientRPCService.Client
         final TPipeTransferResp sliceResp =
             super.pipeTransfer(
                 PipeTransferSliceReq.toTPipeTransferReq(
-                    i, sliceCount, req.body.duplicate(), startIndexInBody, 
endIndexInBody));
+                    req.getType(),
+                    i,
+                    sliceCount,
+                    req.body.duplicate(),
+                    startIndexInBody,
+                    endIndexInBody));
 
         if (i == sliceCount - 1) {
           return sliceResp;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
new file mode 100644
index 00000000000..18c5be1ee53
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.thrift.common;
+
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Buffers the slices of one request that was transferred as a sequence of {@link
+ * PipeTransferSliceReq}s and rebuilds the original {@link TPipeTransferReq} once every slice has
+ * been received.
+ *
+ * <p>The value {@code -1} in {@code originReqType}, {@code originBodySize} and {@code sliceCount}
+ * marks "no request in progress"; {@link #clear()} restores that state.
+ */
+public class PipeTransferSliceReqHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferSliceReqHandler.class);
+
+  // Metadata of the request being reassembled, taken from the first accepted slice.
+  private short originReqType = -1;
+  private int originBodySize = -1;
+
+  private int sliceCount = -1;
+  // Slice payloads in arrival order; position i holds the slice with index i.
+  private final List<byte[]> sliceBodies = new ArrayList<>();
+
+  /**
+   * Buffers one slice.
+   *
+   * <p>If nothing is in progress, the slice's metadata becomes the reference for the slices that
+   * follow. Otherwise the slice must carry the same origin request type, origin body size and
+   * slice count, and its index must equal the number of slices buffered so far. Any violation is
+   * logged, the handler is cleared, and the slice is rejected.
+   *
+   * @param req the slice to buffer
+   * @return {@code true} if the slice was accepted, {@code false} if it was rejected
+   */
+  public boolean receiveSlice(final PipeTransferSliceReq req) {
+    final boolean typeUnset = originReqType == -1;
+    final boolean bodySizeUnset = originBodySize == -1;
+    final boolean countUnset = sliceCount == -1;
+    final boolean nothingBuffered = sliceBodies.isEmpty();
+
+    if (typeUnset && bodySizeUnset && countUnset && nothingBuffered) {
+      // Nothing in progress: adopt this slice's metadata as the reference.
+      originReqType = req.getOriginReqType();
+      originBodySize = req.getOriginBodySize();
+      sliceCount = req.getSliceCount();
+    } else if (typeUnset || bodySizeUnset || countUnset || nothingBuffered) {
+      // Only part of the state is set, which a consistent handler never exhibits.
+      LOGGER.warn(
+          "Invalid state: originReqType={}, originBodySize={}, sliceCount={}, sliceBodies.size={}",
+          originReqType,
+          originBodySize,
+          sliceCount,
+          sliceBodies.size());
+      clear();
+      return false;
+    }
+
+    final short incomingReqType = req.getOriginReqType();
+    if (incomingReqType != originReqType) {
+      LOGGER.warn(
+          "Origin request type mismatch: expected {}, actual {}", originReqType, incomingReqType);
+      clear();
+      return false;
+    }
+
+    final int incomingBodySize = req.getOriginBodySize();
+    if (incomingBodySize != originBodySize) {
+      LOGGER.warn(
+          "Origin body size mismatch: expected {}, actual {}", originBodySize, incomingBodySize);
+      clear();
+      return false;
+    }
+
+    final int incomingSliceCount = req.getSliceCount();
+    if (incomingSliceCount != sliceCount) {
+      LOGGER.warn("Slice count mismatch: expected {}, actual {}", sliceCount, incomingSliceCount);
+      clear();
+      return false;
+    }
+
+    // Append first, then verify the slice landed at the position its index announces.
+    sliceBodies.add(req.getSliceBody());
+    final int expectedIndex = sliceBodies.size() - 1;
+    final int actualIndex = req.getSliceIndex();
+    if (actualIndex != expectedIndex) {
+      LOGGER.warn("Invalid slice index: expected {}, actual {}", expectedIndex, actualIndex);
+      clear();
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Rebuilds the original request when the number of buffered slices equals the slice count.
+   *
+   * <p>The buffered state is left untouched; callers reset it through {@link #clear()}.
+   *
+   * @return the reassembled request, or {@link Optional#empty()} while slices are still missing
+   */
+  public Optional<TPipeTransferReq> makeReqIfComplete() {
+    if (sliceBodies.size() != sliceCount) {
+      return Optional.empty();
+    }
+
+    // Concatenate the slices in arrival order into a buffer of the announced original size.
+    final ByteBuffer mergedBody = ByteBuffer.allocate(originBodySize);
+    for (final byte[] sliceBody : sliceBodies) {
+      mergedBody.put(sliceBody);
+    }
+    mergedBody.flip();
+
+    final TPipeTransferReq mergedReq = new TPipeTransferReq();
+    mergedReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    mergedReq.type = originReqType;
+    mergedReq.body = mergedBody;
+    return Optional.of(mergedReq);
+  }
+
+  /** Drops every buffered slice and resets the metadata to the "nothing in progress" state. */
+  public void clear() {
+    sliceBodies.clear();
+    sliceCount = -1;
+    originBodySize = -1;
+    originReqType = -1;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
index f26e2e7337c..78928578493 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -35,22 +33,20 @@ import java.util.Objects;
 
 public class PipeTransferSliceReq extends TPipeTransferReq {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferSliceReq.class);
-
+  private transient short originReqType;
   private transient int originBodySize;
 
-  private transient int sliceSize;
   private transient byte[] sliceBody;
 
   private transient int sliceIndex;
   private transient int sliceCount;
 
-  public int getOriginBodySize() {
-    return originBodySize;
+  public short getOriginReqType() {
+    return originReqType;
   }
 
-  public int getSliceSize() {
-    return sliceSize;
+  public int getOriginBodySize() {
+    return originBodySize;
   }
 
   public byte[] getSliceBody() {
@@ -68,6 +64,7 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferSliceReq toTPipeTransferReq(
+      final short originReqType,
       final int sliceIndex,
       final int sliceCount,
       final ByteBuffer duplicatedOriginBody,
@@ -76,10 +73,10 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
       throws IOException {
     final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
 
+    sliceReq.originReqType = originReqType;
     sliceReq.originBodySize = duplicatedOriginBody.limit();
 
-    sliceReq.sliceSize = endIndexInBody - startIndexInBody;
-    sliceReq.sliceBody = new byte[sliceReq.sliceSize];
+    sliceReq.sliceBody = new byte[endIndexInBody - startIndexInBody];
     duplicatedOriginBody.position(startIndexInBody);
     duplicatedOriginBody.get(sliceReq.sliceBody);
 
@@ -90,9 +87,9 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
     sliceReq.type = PipeRequestType.TRANSFER_SLICE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(sliceReq.originReqType, outputStream);
       ReadWriteIOUtils.write(sliceReq.originBodySize, outputStream);
 
-      ReadWriteIOUtils.write(sliceReq.sliceSize, outputStream);
       ReadWriteIOUtils.write(new Binary(sliceReq.sliceBody), outputStream);
 
       ReadWriteIOUtils.write(sliceReq.sliceIndex, outputStream);
@@ -108,9 +105,9 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
   public static PipeTransferSliceReq fromTPipeTransferReq(final 
TPipeTransferReq transferReq) {
     final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
 
+    sliceReq.originReqType = ReadWriteIOUtils.readShort(transferReq.body);
     sliceReq.originBodySize = ReadWriteIOUtils.readInt(transferReq.body);
 
-    sliceReq.sliceSize = ReadWriteIOUtils.readInt(transferReq.body);
     sliceReq.sliceBody = 
ReadWriteIOUtils.readBinary(transferReq.body).getValues();
 
     sliceReq.sliceIndex = ReadWriteIOUtils.readInt(transferReq.body);
@@ -134,8 +131,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
       return false;
     }
     final PipeTransferSliceReq that = (PipeTransferSliceReq) obj;
-    return Objects.equals(originBodySize, that.originBodySize)
-        && Objects.equals(sliceSize, that.sliceSize)
+    return Objects.equals(originReqType, that.originReqType)
+        && Objects.equals(originBodySize, that.originBodySize)
         && Arrays.equals(sliceBody, that.sliceBody)
         && Objects.equals(sliceIndex, that.sliceIndex)
         && Objects.equals(sliceCount, that.sliceCount)
@@ -147,8 +144,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
   @Override
   public int hashCode() {
     return Objects.hash(
+        originReqType,
         originBodySize,
-        sliceSize,
         Arrays.hashCode(sliceBody),
         sliceIndex,
         sliceCount,

Reply via email to