This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch make-sure-no-frame-size-problem
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c88e1571f2ad681b7b33a2448ddf9f0c207b864f
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Aug 23 20:52:09 2024 +0800

    1st commit
---
 .../pipe/connector/client/IoTDBSyncClient.java     |  65 +++++++++
 .../payload/thrift/request/PipeRequestType.java    |   3 +
 .../thrift/request/PipeTransferSliceReq.java       | 159 +++++++++++++++++++++
 3 files changed, 227 insertions(+)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index f15934afd5e..70a6f16a9d9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -22,16 +22,28 @@ package org.apache.iotdb.commons.pipe.connector.client;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
+import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IoTDBSyncClient extends IClientRPCService.Client
     implements ThriftClient, AutoCloseable {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSyncClient.class);
+
   private final String ipAddress;
   private final int port;
   private final TEndPoint endPoint;
@@ -82,6 +94,59 @@ public class IoTDBSyncClient extends IClientRPCService.Client
     ((TimeoutChangeableTransport) 
(getInputProtocol().getTransport())).setTimeout(timeout);
   }
 
+  @Override
+  public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws 
TException {
+    final int bodySizeLimit = (int) (RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY * 
0.8);
+
+    if (req.getVersion() != IoTDBConnectorRequestVersion.VERSION_1.getVersion()
+        || req.body.limit() < bodySizeLimit) {
+      return super.pipeTransfer(req);
+    }
+
+    try {
+      // Slice the buffer to avoid the buffer being too large
+      final int sliceCount =
+          req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit 
== 0 ? 0 : 1);
+      for (int i = 0; i < sliceCount; ++i) {
+        final int startIndexInBody = i * bodySizeLimit;
+        final int endIndexInBody = Math.min((i + 1) * bodySizeLimit, 
req.body.limit());
+        final TPipeTransferResp sliceResp =
+            super.pipeTransfer(
+                PipeTransferSliceReq.toTPipeTransferReq(
+                    i, sliceCount, req.body.duplicate(), startIndexInBody, 
endIndexInBody));
+
+        if (i == sliceCount - 1) {
+          return sliceResp;
+        }
+
+        if (sliceResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          LOGGER.warn(
+              "Failed to transfer slice. Origin req: {}-{}, slice index: {}, 
slice count: {}. Reason: {}. Retry the whole transfer.",
+              req.getVersion(),
+              req.getType(),
+              i,
+              sliceCount,
+              sliceResp.getStatus());
+          throw new PipeConnectionException(
+              String.format(
+                  "Failed to transfer slice. Origin req: %s-%s, slice index: 
%d, slice count: %d. Reason: %s",
+                  req.getVersion(), req.getType(), i, sliceCount, 
sliceResp.getStatus()));
+        }
+      }
+
+      // Should not reach here
+      return super.pipeTransfer(req);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to transfer slice. Origin req: {}-{}. Retry the whole 
transfer.",
+          req.getVersion(),
+          req.getType(),
+          e);
+      // Fall back to the original behavior
+      return super.pipeTransfer(req);
+    }
+  }
+
   @Override
   public void close() throws Exception {
     invalidate();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
index 003c8b9afb3..bccf6787f4a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeRequestType.java
@@ -53,6 +53,9 @@ public enum PipeRequestType {
 
   // RPC Compression
   TRANSFER_COMPRESSED((short) 300),
+
+  // Fallback Handling
+  FALLBACK_SLICE((short) 400),
   ;
 
   private final short type;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
new file mode 100644
index 00000000000..ee5662f6170
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;
+
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class PipeTransferSliceReq extends TPipeTransferReq {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferSliceReq.class);
+
+  private transient int originBodySize;
+
+  private transient int sliceSize;
+  private transient byte[] sliceBody;
+
+  private transient int sliceIndex;
+  private transient int sliceCount;
+
+  public int getOriginBodySize() {
+    return originBodySize;
+  }
+
+  public int getSliceSize() {
+    return sliceSize;
+  }
+
+  public byte[] getSliceBody() {
+    return sliceBody;
+  }
+
+  public int getSliceIndex() {
+    return sliceIndex;
+  }
+
+  public int getSliceCount() {
+    return sliceCount;
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  public static PipeTransferSliceReq toTPipeTransferReq(
+      final int sliceIndex,
+      final int sliceCount,
+      final ByteBuffer duplicatedOriginBody,
+      final int startIndexInBody,
+      final int endIndexInBody)
+      throws IOException {
+    final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
+
+    sliceReq.originBodySize = duplicatedOriginBody.limit();
+
+    sliceReq.sliceSize = endIndexInBody - startIndexInBody;
+    sliceReq.sliceBody = new byte[sliceReq.sliceSize];
+    duplicatedOriginBody.position(startIndexInBody);
+    duplicatedOriginBody.get(sliceReq.sliceBody);
+
+    sliceReq.sliceIndex = sliceIndex;
+    sliceReq.sliceCount = sliceCount;
+
+    sliceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    sliceReq.type = PipeRequestType.FALLBACK_SLICE.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(sliceReq.originBodySize, outputStream);
+
+      ReadWriteIOUtils.write(sliceReq.sliceSize, outputStream);
+      ReadWriteIOUtils.write(new Binary(sliceReq.sliceBody), outputStream);
+
+      ReadWriteIOUtils.write(sliceReq.sliceIndex, outputStream);
+      ReadWriteIOUtils.write(sliceReq.sliceCount, outputStream);
+
+      sliceReq.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    return sliceReq;
+  }
+
+  public static PipeTransferSliceReq fromTPipeTransferReq(final 
TPipeTransferReq transferReq) {
+    final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
+
+    sliceReq.originBodySize = ReadWriteIOUtils.readInt(transferReq.body);
+
+    sliceReq.sliceSize = ReadWriteIOUtils.readInt(transferReq.body);
+    sliceReq.sliceBody = 
ReadWriteIOUtils.readBinary(transferReq.body).getValues();
+
+    sliceReq.sliceIndex = ReadWriteIOUtils.readInt(transferReq.body);
+    sliceReq.sliceCount = ReadWriteIOUtils.readInt(transferReq.body);
+
+    sliceReq.version = transferReq.version;
+    sliceReq.type = transferReq.type;
+    sliceReq.body = transferReq.body;
+
+    return sliceReq;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final PipeTransferSliceReq that = (PipeTransferSliceReq) obj;
+    return Objects.equals(originBodySize, that.originBodySize)
+        && Objects.equals(sliceSize, that.sliceSize)
+        && Arrays.equals(sliceBody, that.sliceBody)
+        && Objects.equals(sliceIndex, that.sliceIndex)
+        && Objects.equals(sliceCount, that.sliceCount)
+        && Objects.equals(version, that.version)
+        && Objects.equals(type, that.type)
+        && Objects.equals(body, that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        originBodySize,
+        sliceSize,
+        Arrays.hashCode(sliceBody),
+        sliceIndex,
+        sliceCount,
+        version,
+        type,
+        body);
+  }
+}

Reply via email to