This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch make-sure-no-frame-size-problem
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/make-sure-no-frame-size-problem by this push:
new ec5381be5e8 4th
ec5381be5e8 is described below
commit ec5381be5e8b6f82ac18674e3f37b50c0cd2c2e7
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Aug 24 04:39:36 2024 +0800
4th
---
.../pipe/connector/client/IoTDBSyncClient.java | 6 ++++++
.../thrift/common/PipeTransferSliceReqHandler.java | 21 ++++++++++++++++++---
.../thrift/request/PipeTransferSliceReq.java | 17 ++++++++++++++++-
3 files changed, 40 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index 45743e37cc4..70b2931d38a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -39,11 +39,15 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class IoTDBSyncClient extends IClientRPCService.Client
implements ThriftClient, AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSyncClient.class);
+ private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new
AtomicInteger(0);
+
private final String ipAddress;
private final int port;
private final TEndPoint endPoint;
@@ -104,6 +108,7 @@ public class IoTDBSyncClient extends
IClientRPCService.Client
}
try {
+ final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
// Slice the buffer to avoid the buffer being too large
final int sliceCount =
req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit
== 0 ? 0 : 1);
@@ -113,6 +118,7 @@ public class IoTDBSyncClient extends
IClientRPCService.Client
final TPipeTransferResp sliceResp =
super.pipeTransfer(
PipeTransferSliceReq.toTPipeTransferReq(
+ sliceOrderId,
req.getType(),
i,
sliceCount,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
index 18c5be1ee53..2fa94423f13 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferSliceReqHandler.java
@@ -35,6 +35,8 @@ public class PipeTransferSliceReqHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferSliceReqHandler.class);
+ private int orderId = -1;
+
private short originReqType = -1;
private int originBodySize = -1;
@@ -42,17 +44,24 @@ public class PipeTransferSliceReqHandler {
private final List<byte[]> sliceBodies = new ArrayList<>();
public boolean receiveSlice(final PipeTransferSliceReq req) {
- if (originReqType == -1 || originBodySize == -1 || sliceCount == -1 ||
sliceBodies.isEmpty()) {
- if (originReqType == -1
+ if (orderId == -1
+ || originReqType == -1
+ || originBodySize == -1
+ || sliceCount == -1
+ || sliceBodies.isEmpty()) {
+ if (orderId == -1
+ && originReqType == -1
&& originBodySize == -1
&& sliceCount == -1
&& sliceBodies.isEmpty()) {
+ orderId = req.getOrderId();
originReqType = req.getOriginReqType();
originBodySize = req.getOriginBodySize();
sliceCount = req.getSliceCount();
} else {
LOGGER.warn(
- "Invalid state: originReqType={}, originBodySize={},
sliceCount={}, sliceBodies.size={}",
+ "Invalid state: orderId={}, originReqType={}, originBodySize={},
sliceCount={}, sliceBodies.size={}",
+ orderId,
originReqType,
originBodySize,
sliceCount,
@@ -62,6 +71,11 @@ public class PipeTransferSliceReqHandler {
}
}
+ if (orderId != req.getOrderId()) {
+ LOGGER.warn("Order ID mismatch: expected {}, actual {}", orderId,
req.getOrderId());
+ clear();
+ return false;
+ }
if (originReqType != req.getOriginReqType()) {
LOGGER.warn(
"Origin request type mismatch: expected {}, actual {}",
@@ -115,6 +129,7 @@ public class PipeTransferSliceReqHandler {
}
public void clear() {
+ orderId = -1;
originReqType = -1;
originBodySize = -1;
sliceCount = -1;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
index 78928578493..4df6008400d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/request/PipeTransferSliceReq.java
@@ -33,6 +33,8 @@ import java.util.Objects;
public class PipeTransferSliceReq extends TPipeTransferReq {
+ private transient int orderId;
+
private transient short originReqType;
private transient int originBodySize;
@@ -41,6 +43,10 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
private transient int sliceIndex;
private transient int sliceCount;
+ public int getOrderId() {
+ return orderId;
+ }
+
public short getOriginReqType() {
return originReqType;
}
@@ -64,6 +70,7 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferSliceReq toTPipeTransferReq(
+ final int orderId,
final short originReqType,
final int sliceIndex,
final int sliceCount,
@@ -73,6 +80,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
throws IOException {
final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
+ sliceReq.orderId = orderId;
+
sliceReq.originReqType = originReqType;
sliceReq.originBodySize = duplicatedOriginBody.limit();
@@ -87,6 +96,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
sliceReq.type = PipeRequestType.TRANSFER_SLICE.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(sliceReq.orderId, outputStream);
+
ReadWriteIOUtils.write(sliceReq.originReqType, outputStream);
ReadWriteIOUtils.write(sliceReq.originBodySize, outputStream);
@@ -105,6 +116,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
public static PipeTransferSliceReq fromTPipeTransferReq(final
TPipeTransferReq transferReq) {
final PipeTransferSliceReq sliceReq = new PipeTransferSliceReq();
+ sliceReq.orderId = ReadWriteIOUtils.readInt(transferReq.body);
+
sliceReq.originReqType = ReadWriteIOUtils.readShort(transferReq.body);
sliceReq.originBodySize = ReadWriteIOUtils.readInt(transferReq.body);
@@ -131,7 +144,8 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
return false;
}
final PipeTransferSliceReq that = (PipeTransferSliceReq) obj;
- return Objects.equals(originReqType, that.originReqType)
+ return Objects.equals(orderId, that.orderId)
+ && Objects.equals(originReqType, that.originReqType)
&& Objects.equals(originBodySize, that.originBodySize)
&& Arrays.equals(sliceBody, that.sliceBody)
&& Objects.equals(sliceIndex, that.sliceIndex)
@@ -144,6 +158,7 @@ public class PipeTransferSliceReq extends TPipeTransferReq {
@Override
public int hashCode() {
return Objects.hash(
+ orderId,
originReqType,
originBodySize,
Arrays.hashCode(sliceBody),