This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix-legacy-compat-followup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7598fc41b366ff1256c6dcbc54309e108f9194ad
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 12 10:49:04 2026 +0800

    Fix 1.3.7 binaryBuffers
---
 .../request/PipeTransferTabletBatchReq.java        | 49 +++++++++++++++++++--
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   | 51 +++++++++++++++++++++-
 2 files changed, 95 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index ede3370f5b0..797ea509602 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -45,6 +45,7 @@ import java.util.Objects;
 
 public class PipeTransferTabletBatchReq extends TPipeTransferReq {
 
+  private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
   private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
   private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();
 
@@ -60,6 +61,26 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
     final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
     final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
 
+    for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
+      final InsertBaseStatement statement = binaryReq.constructStatement();
+      if (statement.isEmpty()) {
+        continue;
+      }
+      if (statement instanceof InsertRowStatement) {
+        insertRowStatementList.add((InsertRowStatement) statement);
+      } else if (statement instanceof InsertTabletStatement) {
+        insertTabletStatementList.add((InsertTabletStatement) statement);
+      } else if (statement instanceof InsertRowsStatement) {
+        insertRowStatementList.addAll(
+            ((InsertRowsStatement) statement).getInsertRowStatementList());
+      } else {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
+                statement));
+      }
+    }
+
     for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
       final InsertBaseStatement statement = insertNodeReq.constructStatement();
       if (statement.isEmpty()) {
@@ -132,9 +153,23 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
     final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
     final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool();
 
-    // Binary size, for rolling upgrade
-    ReadWriteIOUtils.readInt(transferReq.body);
+    // Legacy 1.3.x batch bodies may carry WAL binary requests before insert nodes and tablets.
     int size = ReadWriteIOUtils.readInt(transferReq.body);
+    for (int i = 0; i < size; ++i) {
+      final int length = ReadWriteIOUtils.readInt(transferReq.body);
+      if (length < 0 || length > transferReq.body.remaining()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Invalid binary request body length %s, remaining body length %s.",
+                length, transferReq.body.remaining()));
+      }
+      final byte[] body = new byte[length];
+      transferReq.body.get(body);
+      batchReq.binaryReqs.add(
+          PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
+    }
+
+    size = ReadWriteIOUtils.readInt(transferReq.body);
     for (int i = 0; i < size; ++i) {
       batchReq.insertNodeReqs.add(
           PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
@@ -155,6 +190,11 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
 
   /////////////////////////////// TestOnly ///////////////////////////////
 
+  @TestOnly
+  public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
+    return binaryReqs;
+  }
+
   @TestOnly
   public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
     return insertNodeReqs;
@@ -176,7 +216,8 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
       return false;
     }
     final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
-    return insertNodeReqs.equals(that.insertNodeReqs)
+    return binaryReqs.equals(that.binaryReqs)
+        && insertNodeReqs.equals(that.insertNodeReqs)
         && tabletReqs.equals(that.tabletReqs)
         && version == that.version
         && type == that.type
@@ -185,6 +226,6 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq {
 
   @Override
   public int hashCode() {
-    return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
+    return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
   }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 88f7353f519..5e7c6f279bf 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -702,6 +702,43 @@ public class PipeDataNodeThriftRequestTest {
     assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
   }
 
+  @Test
+  public void testPipeTransferTabletBatchReqFromLegacyV13BodyWithBinaryReqs() throws IOException {
+    final InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(new String[] {"root", "sg", "d"}),
+            false,
+            new String[] {"s"},
+            new TSDataType[] {TSDataType.INT32},
+            1,
+            new Object[] {1},
+            false);
+    final ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] {'a', 'b'});
+
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_TABLET_BATCH,
+            serializeLegacyTabletBatchBody(
+                Collections.singletonList(binaryBuffer),
+                Collections.singletonList(node.serializeToByteBuffer()),
+                Collections.singletonList(serializeLegacyTabletRawBuffer(false))));
+
+    final PipeTransferTabletBatchReq deserializedReq =
+        PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializedReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializedReq.getType());
+    Assert.assertEquals(1, deserializedReq.getBinaryReqs().size());
+    Assert.assertArrayEquals(
+        new byte[] {'a', 'b'},
+        byteBufferToByteArray(deserializedReq.getBinaryReqs().get(0).getByteBuffer()));
+    Assert.assertEquals(1, deserializedReq.getInsertNodeReqs().size());
+    Assert.assertEquals(1, deserializedReq.getTabletReqs().size());
+    Assert.assertEquals(node, deserializedReq.getInsertNodeReqs().get(0).getInsertNode());
+    assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
+  }
+
   @Test
   public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws IOException {
     final TPipeTransferReq req = new TPipeTransferReq();
@@ -1271,9 +1308,21 @@ public class PipeDataNodeThriftRequestTest {
   private static ByteBuffer serializeLegacyTabletBatchBody(
       final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> tabletBuffers)
       throws IOException {
+    return serializeLegacyTabletBatchBody(Collections.emptyList(), insertNodeBuffers, tabletBuffers);
+  }
+
+  private static ByteBuffer serializeLegacyTabletBatchBody(
+      final List<ByteBuffer> binaryBuffers,
+      final List<ByteBuffer> insertNodeBuffers,
+      final List<ByteBuffer> tabletBuffers)
+      throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
-      ReadWriteIOUtils.write(0, outputStream);
+      ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
+      for (final ByteBuffer binaryBuffer : binaryBuffers) {
+        ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
+        writeByteBuffer(outputStream, binaryBuffer);
+      }
 
       ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
       for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {

Reply via email to