This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix-legacy-compat-followup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cec0069e9cdb46f789107776af68e896d00dc9a5
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 12 14:19:52 2026 +0800

    Hotfix
---
 .../request/PipeTransferTabletBatchReq.java        | 55 +++++++++++---
 .../request/PipeTransferTabletRawReq.java          | 13 +++-
 .../pipe/sink/util/TabletStatementConverter.java   | 87 ++++++++++++++++++----
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   | 77 ++++++++++++++++++-
 4 files changed, 203 insertions(+), 29 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 797ea509602..340232c8b62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -154,10 +154,10 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
     final TabletStringInternPool tabletStringInternPool = new 
TabletStringInternPool();
 
     // Legacy 1.3.x batch bodies may carry WAL binary requests before insert 
nodes and tablets.
-    int size = ReadWriteIOUtils.readInt(transferReq.body);
+    int size = readNonNegativeSize(transferReq.body, "binary request count");
     for (int i = 0; i < size; ++i) {
-      final int length = ReadWriteIOUtils.readInt(transferReq.body);
-      if (length < 0 || length > transferReq.body.remaining()) {
+      final int length = readNonNegativeSize(transferReq.body, "binary request 
body length");
+      if (length > transferReq.body.remaining()) {
         throw new IllegalArgumentException(
             String.format(
                 "Invalid binary request body length %s, remaining body length 
%s.",
@@ -169,17 +169,36 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
           
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
     }
 
-    size = ReadWriteIOUtils.readInt(transferReq.body);
+    size = readNonNegativeSize(transferReq.body, "insert node count");
     for (int i = 0; i < size; ++i) {
-      batchReq.insertNodeReqs.add(
-          PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
-              (InsertNode) PlanFragment.deserializeHelper(transferReq.body, 
null)));
+      final int startPosition = transferReq.body.position();
+      try {
+        batchReq.insertNodeReqs.add(
+            PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
+                (InsertNode) PlanFragment.deserializeHelper(transferReq.body, 
null)));
+      } catch (final RuntimeException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Failed to deserialize insert node %s/%s in tablet batch at 
body position %s with remaining body length %s.",
+                i + 1, size, startPosition, transferReq.body.remaining()),
+            e);
+      }
     }
 
-    size = ReadWriteIOUtils.readInt(transferReq.body);
+    size = readNonNegativeSize(transferReq.body, "raw tablet count");
     for (int i = 0; i < size; ++i) {
-      batchReq.tabletReqs.add(
-          PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, 
tabletStringInternPool));
+      final int startPosition = transferReq.body.position();
+      try {
+        batchReq.tabletReqs.add(
+            PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                transferReq.body, tabletStringInternPool));
+      } catch (final RuntimeException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Failed to deserialize raw tablet %s/%s in tablet batch at 
body position %s with remaining body length %s.",
+                i + 1, size, startPosition, transferReq.body.remaining()),
+            e);
+      }
     }
 
     batchReq.version = transferReq.version;
@@ -188,6 +207,22 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
     return batchReq;
   }
 
+  private static int readNonNegativeSize(final ByteBuffer buffer, final String 
fieldName) {
+    if (buffer.remaining() < Integer.BYTES) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Insufficient bytes to read %s in tablet batch, remaining body 
length %s.",
+              fieldName, buffer.remaining()));
+    }
+
+    final int size = ReadWriteIOUtils.readInt(buffer);
+    if (size < 0) {
+      throw new IllegalArgumentException(
+          String.format("Invalid negative %s %s in tablet batch.", fieldName, 
size));
+    }
+    return size;
+  }
+
   /////////////////////////////// TestOnly ///////////////////////////////
 
   @TestOnly
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 63db24a37ba..e80b10c95e2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -188,8 +188,17 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
       buffer.position(startPosition);
     }
 
-    tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), 
tabletStringInternPool);
-    isAligned = ReadWriteIOUtils.readBool(buffer);
+    try {
+      tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), 
tabletStringInternPool);
+      isAligned = ReadWriteIOUtils.readBool(buffer);
+    } catch (final RuntimeException e) {
+      buffer.position(startPosition);
+      throw new IllegalArgumentException(
+          String.format(
+              "Failed to deserialize raw tablet request at body position %s 
with remaining body length %s.",
+              startPosition, buffer.remaining()),
+          e);
+    }
   }
 
   private static void ensureStatementDeserializedFromCurrentTabletFormat(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
index 405859dd982..27032f4cfe3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -32,7 +32,6 @@ import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
-import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -116,12 +115,19 @@ public class TabletStatementConverter {
         intern(ReadWriteIOUtils.readString(byteBuffer), 
tabletStringInternPool);
 
     final int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
+    if (rowSize < 0) {
+      throw new IllegalArgumentException(
+          String.format("Invalid row size %s in tablet format 
deserialization.", rowSize));
+    }
 
     // deserialize schemas
     final int schemaSize =
-        BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))
-            ? ReadWriteIOUtils.readInt(byteBuffer)
-            : 0;
+        readBooleanByte(byteBuffer, "schema existence") ? 
ReadWriteIOUtils.readInt(byteBuffer) : 0;
+    if (schemaSize < 0) {
+      throw new IllegalArgumentException(
+          String.format("Invalid schema size %s in tablet format 
deserialization.", schemaSize));
+    }
+    ensureRemaining(byteBuffer, schemaSize, "measurement schema existence 
flags");
     final String[] measurement = new String[schemaSize];
     final TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[schemaSize];
     final TSDataType[] dataTypes = new TSDataType[schemaSize];
@@ -148,15 +154,26 @@ public class TabletStatementConverter {
 
     // Deserialize and calculate memory in the same loop
     for (int i = 0; i < schemaSize; i++) {
-      final boolean hasSchema = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      final boolean hasSchema = readBooleanByte(byteBuffer, "measurement 
schema existence");
       if (hasSchema) {
         final Pair<String, TSDataType> pair = readMeasurement(byteBuffer, 
tabletStringInternPool);
         measurement[i] = pair.getLeft();
         dataTypes[i] = pair.getRight();
         if (readColumnCategory) {
+          if (!byteBuffer.hasRemaining()) {
+            throw new IllegalArgumentException(
+                "Missing column category in current tablet format 
deserialization.");
+          }
+          final byte columnCategory = byteBuffer.get();
+          if (columnCategory < 0 || columnCategory >= 
ColumnCategory.values().length) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Invalid column category %s in current tablet format 
deserialization.",
+                    columnCategory));
+          }
           columnCategories[i] =
               TsTableColumnCategory.fromTsFileColumnCategory(
-                  ColumnCategory.values()[byteBuffer.get()]);
+                  ColumnCategory.values()[columnCategory]);
         }
 
         // Calculate memory for each measurement string
@@ -178,6 +195,15 @@ public class TabletStatementConverter {
     memorySize += measurementMemorySize;
     memorySize += dataTypesMemorySize;
 
+    final boolean isTimesNotNull = readBooleanByte(byteBuffer, "timestamp 
column existence");
+    if (rowSize > 0 && !isTimesNotNull) {
+      throw new IllegalArgumentException(
+          "Missing timestamps in tablet format deserialization with non-empty 
rows.");
+    }
+    if (isTimesNotNull) {
+      ensureRemaining(byteBuffer, (long) Long.BYTES * rowSize, "timestamps");
+    }
+
     // deserialize times and calculate memory during deserialization
     final long[] times = new long[rowSize];
     // Calculate memory: array header + long size * rowSize
@@ -185,7 +211,6 @@ public class TabletStatementConverter {
         org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
             NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize);
 
-    final boolean isTimesNotNull = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
     if (isTimesNotNull) {
       for (int i = 0; i < rowSize; i++) {
         times[i] = ReadWriteIOUtils.readLong(byteBuffer);
@@ -199,7 +224,7 @@ public class TabletStatementConverter {
     final BitMap[] bitMaps;
     final long bitMapsMemorySize;
 
-    final boolean isBitMapsNotNull = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+    final boolean isBitMapsNotNull = readBooleanByte(byteBuffer, "bitmap 
column existence");
     if (isBitMapsNotNull) {
       // Use the method that returns both BitMap array and memory size
       final Pair<BitMap[], Long> bitMapsAndMemory =
@@ -218,7 +243,11 @@ public class TabletStatementConverter {
     final Object[] values;
     final long valuesMemorySize;
 
-    final boolean isValuesNotNull = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+    final boolean isValuesNotNull = readBooleanByte(byteBuffer, "value column 
existence");
+    if (rowSize > 0 && schemaSize > 0 && !isValuesNotNull) {
+      throw new IllegalArgumentException(
+          "Missing values in tablet format deserialization with non-empty 
rows.");
+    }
     if (isValuesNotNull) {
       // Use the method that returns both values array and memory size
       final Pair<Object[], Long> valuesAndMemory =
@@ -236,7 +265,7 @@ public class TabletStatementConverter {
     // Add values memory to total
     memorySize += valuesMemorySize;
 
-    final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer);
+    final boolean isAligned = readBooleanByte(byteBuffer, "alignment");
 
     statement.setMeasurements(measurement);
     statement.setTimes(times);
@@ -321,6 +350,31 @@ public class TabletStatementConverter {
     }
   }
 
+  private static boolean readBooleanByte(final ByteBuffer buffer, final String 
fieldName) {
+    if (!buffer.hasRemaining()) {
+      throw new IllegalArgumentException(
+          String.format("Missing %s flag in tablet format deserialization.", 
fieldName));
+    }
+
+    final byte value = ReadWriteIOUtils.readByte(buffer);
+    if (value != 0 && value != 1) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Invalid %s flag %s in tablet format deserialization.", 
fieldName, value));
+    }
+    return value == 1;
+  }
+
+  private static void ensureRemaining(
+      final ByteBuffer buffer, final long expectedSize, final String 
fieldName) {
+    if (expectedSize > buffer.remaining()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Insufficient bytes for %s in tablet format deserialization, 
expected %s, remaining %s.",
+              fieldName, expectedSize, buffer.remaining()));
+    }
+  }
+
   /**
    * Read measurement name and data type from buffer, skipping other 
measurement schema fields
    * (encoding, compression, and tags/attributes) that are not needed for 
InsertTabletStatement.
@@ -364,9 +418,13 @@ public class TabletStatementConverter {
     boolean hasMarkedBitMap = false;
 
     for (int i = 0; i < columns; i++) {
-      final boolean hasBitMap = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      final boolean hasBitMap = readBooleanByte(byteBuffer, "bitmap 
existence");
       if (hasBitMap) {
         final int size = ReadWriteIOUtils.readInt(byteBuffer);
+        if (size < 0) {
+          throw new IllegalArgumentException(
+              String.format("Invalid bitmap size %s in tablet format 
deserialization.", size));
+        }
         final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
         final byte[] byteArray = valueBinary.getValues();
         final BitMap bitMap = new BitMap(size, byteArray);
@@ -417,7 +475,7 @@ public class TabletStatementConverter {
 
     for (int i = 0; i < columns; i++) {
       final boolean isValueColumnsNotNull =
-          BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+          readBooleanByte(byteBuffer, "value column existence");
       if (types[i] == null) {
         continue;
       }
@@ -427,7 +485,7 @@ public class TabletStatementConverter {
           final boolean[] boolValues = new boolean[rowSize];
           if (isValueColumnsNotNull) {
             for (int index = 0; index < rowSize; index++) {
-              boolValues[index] = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+              boolValues[index] = readBooleanByte(byteBuffer, "boolean value");
             }
           }
           values[i] = boolValues;
@@ -503,8 +561,7 @@ public class TabletStatementConverter {
 
           if (isValueColumnsNotNull) {
             for (int index = 0; index < rowSize; index++) {
-              final boolean isNotNull =
-                  BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+              final boolean isNotNull = readBooleanByte(byteBuffer, "binary 
value existence");
               if (isNotNull) {
                 binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer);
                 // Calculate memory for each Binary object during 
deserialization
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 69c0c117d5e..b95852aa936 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -755,6 +755,52 @@ public class PipeDataNodeThriftRequestTest {
     assertLegacyTabletStatement(deserializedReq.constructStatement());
   }
 
+  @Test
+  public void testPipeTransferTabletRawReqWithSingleColumnLegacyTabletFormat()
+      throws IOException {
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType();
+    req.body = serializeSingleColumnLegacyTabletRawBuffer(false);
+
+    final PipeTransferTabletRawReq deserializedReq =
+        PipeTransferTabletRawReq.fromTPipeTransferReq(req);
+
+    Assert.assertFalse(deserializedReq.getIsAligned());
+    final InsertTabletStatement statement = 
deserializedReq.constructStatement();
+    Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath());
+    Assert.assertArrayEquals(new String[] {"s1"}, statement.getMeasurements());
+    Assert.assertArrayEquals(new TSDataType[] {TSDataType.INT32}, 
statement.getDataTypes());
+    Assert.assertEquals(2, statement.getRowCount());
+    Assert.assertArrayEquals(new long[] {1700000000000L, 1700000000001L}, 
statement.getTimes());
+    Assert.assertArrayEquals(new int[] {2, 1}, (int[]) 
statement.getColumns()[0]);
+  }
+
+  @Test
+  public void testPipeTransferTabletBatchReqRejectsTruncatedRawTablet() throws 
IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(0, outputStream);
+      ReadWriteIOUtils.write(0, outputStream);
+      ReadWriteIOUtils.write(1, outputStream);
+      outputStream.write(new byte[] {1, 0, 0, 0, 0, 0});
+
+      final TPipeTransferReq req =
+          legacyTransferReq(
+              PipeRequestType.TRANSFER_TABLET_BATCH,
+              ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
+
+      try {
+        PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+        Assert.fail("Expected IllegalArgumentException");
+      } catch (final IllegalArgumentException e) {
+        Assert.assertTrue(e.getMessage().contains("Failed to deserialize raw 
tablet"));
+        Assert.assertTrue(
+            e.getCause().getMessage().contains("Failed to deserialize raw 
tablet request"));
+      }
+    }
+  }
+
   @Test
   public void testPipeTransferTabletBatchReqV2() throws IOException {
     final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
@@ -1285,8 +1331,8 @@ public class PipeDataNodeThriftRequestTest {
       writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT);
 
       ReadWriteIOUtils.write((byte) 1, outputStream);
-      ReadWriteIOUtils.write(2L, outputStream);
-      ReadWriteIOUtils.write(1L, outputStream);
+      ReadWriteIOUtils.write(1700000000000L, outputStream);
+      ReadWriteIOUtils.write(1700000000001L, outputStream);
 
       ReadWriteIOUtils.write((byte) 0, outputStream);
 
@@ -1305,6 +1351,33 @@ public class PipeDataNodeThriftRequestTest {
     }
   }
 
+  private static ByteBuffer serializeSingleColumnLegacyTabletRawBuffer(final 
boolean isAligned)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write("root.sg.d", outputStream);
+      ReadWriteIOUtils.write(2, outputStream);
+
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(1, outputStream);
+      writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32);
+
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(1700000000000L, outputStream);
+      ReadWriteIOUtils.write(1700000000001L, outputStream);
+
+      ReadWriteIOUtils.write((byte) 0, outputStream);
+
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(2, outputStream);
+      ReadWriteIOUtils.write(1, outputStream);
+
+      ReadWriteIOUtils.write(isAligned, outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
   private static ByteBuffer serializeLegacyTabletBatchBody(
       final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> 
tabletBuffers)
       throws IOException {

Reply via email to