This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b9f59fc9e8 [Pipe] Fix omitted legacy tablet compatibility follow-ups 
(#17929)
3b9f59fc9e8 is described below

commit 3b9f59fc9e8c7f95af18d639569b32e73fdfec03
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 09:37:27 2026 +0800

    [Pipe] Fix omitted legacy tablet compatibility follow-ups (#17929)
    
    * Fix 1.3.7 binaryBuffers
    
    * Update PipeDataNodeThriftRequestTest.java
    
    * Hotfix
    
    * spotless
---
 .../request/PipeTransferTabletBatchReq.java        |  98 ++++++++++++++--
 .../request/PipeTransferTabletRawReq.java          |  13 ++-
 .../pipe/sink/util/TabletStatementConverter.java   |  87 +++++++++++---
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   | 128 ++++++++++++++++++++-
 4 files changed, 294 insertions(+), 32 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index ede3370f5b0..340232c8b62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -45,6 +45,7 @@ import java.util.Objects;
 
 public class PipeTransferTabletBatchReq extends TPipeTransferReq {
 
+  private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new 
ArrayList<>();
   private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs 
= new ArrayList<>();
   private final transient List<PipeTransferTabletRawReq> tabletReqs = new 
ArrayList<>();
 
@@ -60,6 +61,26 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
     final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
     final List<InsertTabletStatement> insertTabletStatementList = new 
ArrayList<>();
 
+    for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
+      final InsertBaseStatement statement = binaryReq.constructStatement();
+      if (statement.isEmpty()) {
+        continue;
+      }
+      if (statement instanceof InsertRowStatement) {
+        insertRowStatementList.add((InsertRowStatement) statement);
+      } else if (statement instanceof InsertTabletStatement) {
+        insertTabletStatementList.add((InsertTabletStatement) statement);
+      } else if (statement instanceof InsertRowsStatement) {
+        insertRowStatementList.addAll(
+            ((InsertRowsStatement) statement).getInsertRowStatementList());
+      } else {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unknown InsertBaseStatement %s constructed from 
PipeTransferTabletBinaryReq.",
+                statement));
+      }
+    }
+
     for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) 
{
       final InsertBaseStatement statement = insertNodeReq.constructStatement();
       if (statement.isEmpty()) {
@@ -132,19 +153,52 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
     final PipeTransferTabletBatchReq batchReq = new 
PipeTransferTabletBatchReq();
     final TabletStringInternPool tabletStringInternPool = new 
TabletStringInternPool();
 
-    // Binary size, for rolling upgrade
-    ReadWriteIOUtils.readInt(transferReq.body);
-    int size = ReadWriteIOUtils.readInt(transferReq.body);
+    // Legacy 1.3.x batch bodies may carry WAL binary requests before insert 
nodes and tablets.
+    int size = readNonNegativeSize(transferReq.body, "binary request count");
     for (int i = 0; i < size; ++i) {
-      batchReq.insertNodeReqs.add(
-          PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
-              (InsertNode) PlanFragment.deserializeHelper(transferReq.body, 
null)));
+      final int length = readNonNegativeSize(transferReq.body, "binary request 
body length");
+      if (length > transferReq.body.remaining()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Invalid binary request body length %s, remaining body length 
%s.",
+                length, transferReq.body.remaining()));
+      }
+      final byte[] body = new byte[length];
+      transferReq.body.get(body);
+      batchReq.binaryReqs.add(
+          
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
     }
 
-    size = ReadWriteIOUtils.readInt(transferReq.body);
+    size = readNonNegativeSize(transferReq.body, "insert node count");
     for (int i = 0; i < size; ++i) {
-      batchReq.tabletReqs.add(
-          PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, 
tabletStringInternPool));
+      final int startPosition = transferReq.body.position();
+      try {
+        batchReq.insertNodeReqs.add(
+            PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
+                (InsertNode) PlanFragment.deserializeHelper(transferReq.body, 
null)));
+      } catch (final RuntimeException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Failed to deserialize insert node %s/%s in tablet batch at 
body position %s with remaining body length %s.",
+                i + 1, size, startPosition, transferReq.body.remaining()),
+            e);
+      }
+    }
+
+    size = readNonNegativeSize(transferReq.body, "raw tablet count");
+    for (int i = 0; i < size; ++i) {
+      final int startPosition = transferReq.body.position();
+      try {
+        batchReq.tabletReqs.add(
+            PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                transferReq.body, tabletStringInternPool));
+      } catch (final RuntimeException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Failed to deserialize raw tablet %s/%s in tablet batch at 
body position %s with remaining body length %s.",
+                i + 1, size, startPosition, transferReq.body.remaining()),
+            e);
+      }
     }
 
     batchReq.version = transferReq.version;
@@ -153,8 +207,29 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
     return batchReq;
   }
 
+  private static int readNonNegativeSize(final ByteBuffer buffer, final String 
fieldName) {
+    if (buffer.remaining() < Integer.BYTES) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Insufficient bytes to read %s in tablet batch, remaining body 
length %s.",
+              fieldName, buffer.remaining()));
+    }
+
+    final int size = ReadWriteIOUtils.readInt(buffer);
+    if (size < 0) {
+      throw new IllegalArgumentException(
+          String.format("Invalid negative %s %s in tablet batch.", fieldName, 
size));
+    }
+    return size;
+  }
+
   /////////////////////////////// TestOnly ///////////////////////////////
 
+  @TestOnly
+  public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
+    return binaryReqs;
+  }
+
   @TestOnly
   public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
     return insertNodeReqs;
@@ -176,7 +251,8 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
       return false;
     }
     final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
-    return insertNodeReqs.equals(that.insertNodeReqs)
+    return binaryReqs.equals(that.binaryReqs)
+        && insertNodeReqs.equals(that.insertNodeReqs)
         && tabletReqs.equals(that.tabletReqs)
         && version == that.version
         && type == that.type
@@ -185,6 +261,6 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
 
   @Override
   public int hashCode() {
-    return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
+    return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, 
body);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 63db24a37ba..e80b10c95e2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -188,8 +188,17 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
       buffer.position(startPosition);
     }
 
-    tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), 
tabletStringInternPool);
-    isAligned = ReadWriteIOUtils.readBool(buffer);
+    try {
+      tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), 
tabletStringInternPool);
+      isAligned = ReadWriteIOUtils.readBool(buffer);
+    } catch (final RuntimeException e) {
+      buffer.position(startPosition);
+      throw new IllegalArgumentException(
+          String.format(
+              "Failed to deserialize raw tablet request at body position %s 
with remaining body length %s.",
+              startPosition, buffer.remaining()),
+          e);
+    }
   }
 
   private static void ensureStatementDeserializedFromCurrentTabletFormat(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
index 405859dd982..5afe02658d9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -32,7 +32,6 @@ import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
-import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -116,12 +115,19 @@ public class TabletStatementConverter {
         intern(ReadWriteIOUtils.readString(byteBuffer), 
tabletStringInternPool);
 
     final int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
+    if (rowSize < 0) {
+      throw new IllegalArgumentException(
+          String.format("Invalid row size %s in tablet format 
deserialization.", rowSize));
+    }
 
     // deserialize schemas
     final int schemaSize =
-        BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))
-            ? ReadWriteIOUtils.readInt(byteBuffer)
-            : 0;
+        readBooleanByte(byteBuffer, "schema existence") ? 
ReadWriteIOUtils.readInt(byteBuffer) : 0;
+    if (schemaSize < 0) {
+      throw new IllegalArgumentException(
+          String.format("Invalid schema size %s in tablet format 
deserialization.", schemaSize));
+    }
+    ensureRemaining(byteBuffer, schemaSize, "measurement schema existence 
flags");
     final String[] measurement = new String[schemaSize];
     final TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[schemaSize];
     final TSDataType[] dataTypes = new TSDataType[schemaSize];
@@ -148,15 +154,26 @@ public class TabletStatementConverter {
 
     // Deserialize and calculate memory in the same loop
     for (int i = 0; i < schemaSize; i++) {
-      final boolean hasSchema = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      final boolean hasSchema = readBooleanByte(byteBuffer, "measurement 
schema existence");
       if (hasSchema) {
         final Pair<String, TSDataType> pair = readMeasurement(byteBuffer, 
tabletStringInternPool);
         measurement[i] = pair.getLeft();
         dataTypes[i] = pair.getRight();
         if (readColumnCategory) {
+          if (!byteBuffer.hasRemaining()) {
+            throw new IllegalArgumentException(
+                "Missing column category in current tablet format 
deserialization.");
+          }
+          final byte columnCategory = byteBuffer.get();
+          if (columnCategory < 0 || columnCategory >= 
ColumnCategory.values().length) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Invalid column category %s in current tablet format 
deserialization.",
+                    columnCategory));
+          }
           columnCategories[i] =
               TsTableColumnCategory.fromTsFileColumnCategory(
-                  ColumnCategory.values()[byteBuffer.get()]);
+                  ColumnCategory.values()[columnCategory]);
         }
 
         // Calculate memory for each measurement string
@@ -178,6 +195,15 @@ public class TabletStatementConverter {
     memorySize += measurementMemorySize;
     memorySize += dataTypesMemorySize;
 
+    final boolean isTimesNotNull = readBooleanByte(byteBuffer, "timestamp 
column existence");
+    if (rowSize > 0 && !isTimesNotNull) {
+      throw new IllegalArgumentException(
+          "Missing timestamps in tablet format deserialization with non-empty 
rows.");
+    }
+    if (isTimesNotNull) {
+      ensureRemaining(byteBuffer, (long) Long.BYTES * rowSize, "timestamps");
+    }
+
     // deserialize times and calculate memory during deserialization
     final long[] times = new long[rowSize];
     // Calculate memory: array header + long size * rowSize
@@ -185,7 +211,6 @@ public class TabletStatementConverter {
         org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
             NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize);
 
-    final boolean isTimesNotNull = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
     if (isTimesNotNull) {
       for (int i = 0; i < rowSize; i++) {
         times[i] = ReadWriteIOUtils.readLong(byteBuffer);
@@ -199,7 +224,7 @@ public class TabletStatementConverter {
     final BitMap[] bitMaps;
     final long bitMapsMemorySize;
 
-    final boolean isBitMapsNotNull = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+    final boolean isBitMapsNotNull = readBooleanByte(byteBuffer, "bitmap 
column existence");
     if (isBitMapsNotNull) {
       // Use the method that returns both BitMap array and memory size
       final Pair<BitMap[], Long> bitMapsAndMemory =
@@ -218,7 +243,11 @@ public class TabletStatementConverter {
     final Object[] values;
     final long valuesMemorySize;
 
-    final boolean isValuesNotNull = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+    final boolean isValuesNotNull = readBooleanByte(byteBuffer, "value column 
existence");
+    if (rowSize > 0 && schemaSize > 0 && !isValuesNotNull) {
+      throw new IllegalArgumentException(
+          "Missing values in tablet format deserialization with non-empty 
rows.");
+    }
     if (isValuesNotNull) {
       // Use the method that returns both values array and memory size
       final Pair<Object[], Long> valuesAndMemory =
@@ -236,7 +265,7 @@ public class TabletStatementConverter {
     // Add values memory to total
     memorySize += valuesMemorySize;
 
-    final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer);
+    final boolean isAligned = readBooleanByte(byteBuffer, "alignment");
 
     statement.setMeasurements(measurement);
     statement.setTimes(times);
@@ -321,6 +350,30 @@ public class TabletStatementConverter {
     }
   }
 
+  private static boolean readBooleanByte(final ByteBuffer buffer, final String 
fieldName) {
+    if (!buffer.hasRemaining()) {
+      throw new IllegalArgumentException(
+          String.format("Missing %s flag in tablet format deserialization.", 
fieldName));
+    }
+
+    final byte value = ReadWriteIOUtils.readByte(buffer);
+    if (value != 0 && value != 1) {
+      throw new IllegalArgumentException(
+          String.format("Invalid %s flag %s in tablet format 
deserialization.", fieldName, value));
+    }
+    return value == 1;
+  }
+
+  private static void ensureRemaining(
+      final ByteBuffer buffer, final long expectedSize, final String 
fieldName) {
+    if (expectedSize > buffer.remaining()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Insufficient bytes for %s in tablet format deserialization, 
expected %s, remaining %s.",
+              fieldName, expectedSize, buffer.remaining()));
+    }
+  }
+
   /**
    * Read measurement name and data type from buffer, skipping other 
measurement schema fields
    * (encoding, compression, and tags/attributes) that are not needed for 
InsertTabletStatement.
@@ -364,9 +417,13 @@ public class TabletStatementConverter {
     boolean hasMarkedBitMap = false;
 
     for (int i = 0; i < columns; i++) {
-      final boolean hasBitMap = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      final boolean hasBitMap = readBooleanByte(byteBuffer, "bitmap 
existence");
       if (hasBitMap) {
         final int size = ReadWriteIOUtils.readInt(byteBuffer);
+        if (size < 0) {
+          throw new IllegalArgumentException(
+              String.format("Invalid bitmap size %s in tablet format 
deserialization.", size));
+        }
         final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
         final byte[] byteArray = valueBinary.getValues();
         final BitMap bitMap = new BitMap(size, byteArray);
@@ -416,8 +473,7 @@ public class TabletStatementConverter {
             NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns);
 
     for (int i = 0; i < columns; i++) {
-      final boolean isValueColumnsNotNull =
-          BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      final boolean isValueColumnsNotNull = readBooleanByte(byteBuffer, "value 
column existence");
       if (types[i] == null) {
         continue;
       }
@@ -427,7 +483,7 @@ public class TabletStatementConverter {
           final boolean[] boolValues = new boolean[rowSize];
           if (isValueColumnsNotNull) {
             for (int index = 0; index < rowSize; index++) {
-              boolValues[index] = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+              boolValues[index] = readBooleanByte(byteBuffer, "boolean value");
             }
           }
           values[i] = boolValues;
@@ -503,8 +559,7 @@ public class TabletStatementConverter {
 
           if (isValueColumnsNotNull) {
             for (int index = 0; index < rowSize; index++) {
-              final boolean isNotNull =
-                  BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+              final boolean isNotNull = readBooleanByte(byteBuffer, "binary 
value existence");
               if (isNotNull) {
                 binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer);
                 // Calculate memory for each Binary object during 
deserialization
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 88f7353f519..7641070800f 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -702,6 +702,43 @@ public class PipeDataNodeThriftRequestTest {
     
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
   }
 
+  @Test
+  public void testPipeTransferTabletBatchReqFromLegacyV13BodyWithBinaryReqs() 
throws IOException {
+    final InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(new String[] {"root", "sg", "d"}),
+            false,
+            new String[] {"s"},
+            new TSDataType[] {TSDataType.INT32},
+            1,
+            new Object[] {1},
+            false);
+    final ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] {'a', 'b'});
+
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_TABLET_BATCH,
+            serializeLegacyTabletBatchBody(
+                Collections.singletonList(binaryBuffer),
+                Collections.singletonList(node.serializeToByteBuffer()),
+                
Collections.singletonList(serializeLegacyTabletRawBuffer(false))));
+
+    final PipeTransferTabletBatchReq deserializedReq =
+        PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializedReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializedReq.getType());
+    Assert.assertEquals(1, deserializedReq.getBinaryReqs().size());
+    Assert.assertArrayEquals(
+        new byte[] {'a', 'b'},
+        
byteBufferToByteArray(deserializedReq.getBinaryReqs().get(0).getByteBuffer()));
+    Assert.assertEquals(1, deserializedReq.getInsertNodeReqs().size());
+    Assert.assertEquals(1, deserializedReq.getTabletReqs().size());
+    Assert.assertEquals(node, 
deserializedReq.getInsertNodeReqs().get(0).getInsertNode());
+    
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
+  }
+
   @Test
   public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws 
IOException {
     final TPipeTransferReq req = new TPipeTransferReq();
@@ -718,6 +755,51 @@ public class PipeDataNodeThriftRequestTest {
     assertLegacyTabletStatement(deserializedReq.constructStatement());
   }
 
+  @Test
+  public void testPipeTransferTabletRawReqWithSingleColumnLegacyTabletFormat() 
throws IOException {
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType();
+    req.body = serializeSingleColumnLegacyTabletRawBuffer(false);
+
+    final PipeTransferTabletRawReq deserializedReq =
+        PipeTransferTabletRawReq.fromTPipeTransferReq(req);
+
+    Assert.assertFalse(deserializedReq.getIsAligned());
+    final InsertTabletStatement statement = 
deserializedReq.constructStatement();
+    Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath());
+    Assert.assertArrayEquals(new String[] {"s1"}, statement.getMeasurements());
+    Assert.assertArrayEquals(new TSDataType[] {TSDataType.INT32}, 
statement.getDataTypes());
+    Assert.assertEquals(2, statement.getRowCount());
+    Assert.assertArrayEquals(new long[] {1700000000000L, 1700000000001L}, 
statement.getTimes());
+    Assert.assertArrayEquals(new int[] {2, 1}, (int[]) 
statement.getColumns()[0]);
+  }
+
+  @Test
+  public void testPipeTransferTabletBatchReqRejectsTruncatedRawTablet() throws 
IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(0, outputStream);
+      ReadWriteIOUtils.write(0, outputStream);
+      ReadWriteIOUtils.write(1, outputStream);
+      outputStream.write(new byte[] {1, 0, 0, 0, 0, 0});
+
+      final TPipeTransferReq req =
+          legacyTransferReq(
+              PipeRequestType.TRANSFER_TABLET_BATCH,
+              ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
+
+      try {
+        PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+        Assert.fail("Expected IllegalArgumentException");
+      } catch (final IllegalArgumentException e) {
+        Assert.assertTrue(e.getMessage().contains("Failed to deserialize raw 
tablet"));
+        Assert.assertTrue(
+            e.getCause().getMessage().contains("Failed to deserialize raw 
tablet request"));
+      }
+    }
+  }
+
   @Test
   public void testPipeTransferTabletBatchReqV2() throws IOException {
     final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
@@ -1248,8 +1330,8 @@ public class PipeDataNodeThriftRequestTest {
       writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT);
 
       ReadWriteIOUtils.write((byte) 1, outputStream);
-      ReadWriteIOUtils.write(2L, outputStream);
-      ReadWriteIOUtils.write(1L, outputStream);
+      ReadWriteIOUtils.write(1700000000000L, outputStream);
+      ReadWriteIOUtils.write(1700000000001L, outputStream);
 
       ReadWriteIOUtils.write((byte) 0, outputStream);
 
@@ -1268,12 +1350,52 @@ public class PipeDataNodeThriftRequestTest {
     }
   }
 
+  private static ByteBuffer serializeSingleColumnLegacyTabletRawBuffer(final 
boolean isAligned)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write("root.sg.d", outputStream);
+      ReadWriteIOUtils.write(2, outputStream);
+
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(1, outputStream);
+      writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32);
+
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(1700000000000L, outputStream);
+      ReadWriteIOUtils.write(1700000000001L, outputStream);
+
+      ReadWriteIOUtils.write((byte) 0, outputStream);
+
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(2, outputStream);
+      ReadWriteIOUtils.write(1, outputStream);
+
+      ReadWriteIOUtils.write(isAligned, outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
   private static ByteBuffer serializeLegacyTabletBatchBody(
       final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> 
tabletBuffers)
       throws IOException {
+    return serializeLegacyTabletBatchBody(
+        Collections.emptyList(), insertNodeBuffers, tabletBuffers);
+  }
+
+  private static ByteBuffer serializeLegacyTabletBatchBody(
+      final List<ByteBuffer> binaryBuffers,
+      final List<ByteBuffer> insertNodeBuffers,
+      final List<ByteBuffer> tabletBuffers)
+      throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      ReadWriteIOUtils.write(0, outputStream);
+      ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
+      for (final ByteBuffer binaryBuffer : binaryBuffers) {
+        ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
+        writeByteBuffer(outputStream, binaryBuffer);
+      }
 
       ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
       for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {

Reply via email to