This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fix-legacy-compat-followup in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cec0069e9cdb46f789107776af68e896d00dc9a5 Author: Caideyipi <[email protected]> AuthorDate: Fri Jun 12 14:19:52 2026 +0800 Hotfix --- .../request/PipeTransferTabletBatchReq.java | 55 +++++++++++--- .../request/PipeTransferTabletRawReq.java | 13 +++- .../pipe/sink/util/TabletStatementConverter.java | 87 ++++++++++++++++++---- .../pipe/sink/PipeDataNodeThriftRequestTest.java | 77 ++++++++++++++++++- 4 files changed, 203 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java index 797ea509602..340232c8b62 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -154,10 +154,10 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); // Legacy 1.3.x batch bodies may carry WAL binary requests before insert nodes and tablets. - int size = ReadWriteIOUtils.readInt(transferReq.body); + int size = readNonNegativeSize(transferReq.body, "binary request count"); for (int i = 0; i < size; ++i) { - final int length = ReadWriteIOUtils.readInt(transferReq.body); - if (length < 0 || length > transferReq.body.remaining()) { + final int length = readNonNegativeSize(transferReq.body, "binary request body length"); + if (length > transferReq.body.remaining()) { throw new IllegalArgumentException( String.format( "Invalid binary request body length %s, remaining body length %s.", @@ -169,17 +169,36 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body))); } - size = ReadWriteIOUtils.readInt(transferReq.body); + size = readNonNegativeSize(transferReq.body, "insert node count"); for (int i = 0; i < size; ++i) { - batchReq.insertNodeReqs.add( - PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq( - (InsertNode) PlanFragment.deserializeHelper(transferReq.body, null))); + final int startPosition = transferReq.body.position(); + try { + batchReq.insertNodeReqs.add( + PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq( + (InsertNode) PlanFragment.deserializeHelper(transferReq.body, null))); + } catch (final RuntimeException e) { + throw new IllegalArgumentException( + String.format( + "Failed to deserialize insert node %s/%s in tablet batch at body position %s with remaining body length %s.", + i + 1, size, startPosition, transferReq.body.remaining()), + e); + } } - size = ReadWriteIOUtils.readInt(transferReq.body); + size = readNonNegativeSize(transferReq.body, "raw tablet count"); for (int i = 0; i < size; ++i) { - batchReq.tabletReqs.add( - PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, tabletStringInternPool)); + final int startPosition = transferReq.body.position(); + try { + batchReq.tabletReqs.add( + PipeTransferTabletRawReq.toTPipeTransferRawReq( + transferReq.body, tabletStringInternPool)); + } catch (final RuntimeException e) { + throw new IllegalArgumentException( + String.format( + "Failed to deserialize raw tablet %s/%s in tablet batch at body position %s with remaining body length %s.", + i + 1, size, startPosition, transferReq.body.remaining()), + e); + } } batchReq.version = transferReq.version; @@ -188,6 +207,22 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { return batchReq; } + private static int readNonNegativeSize(final ByteBuffer buffer, final String fieldName) { + if (buffer.remaining() < Integer.BYTES) { + throw new IllegalArgumentException( + String.format( + "Insufficient bytes to read %s in tablet batch, remaining body length %s.", + fieldName, buffer.remaining())); + } + + final int size = ReadWriteIOUtils.readInt(buffer); + if (size < 0) { + throw new IllegalArgumentException( + String.format("Invalid negative %s %s in tablet batch.", fieldName, size)); + } + return size; + } + /////////////////////////////// TestOnly /////////////////////////////// @TestOnly diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java index 63db24a37ba..e80b10c95e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -188,8 +188,17 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq { buffer.position(startPosition); } - tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); - isAligned = ReadWriteIOUtils.readBool(buffer); + try { + tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); + isAligned = ReadWriteIOUtils.readBool(buffer); + } catch (final RuntimeException e) { + buffer.position(startPosition); + throw new IllegalArgumentException( + String.format( + "Failed to deserialize raw tablet request at body position %s with remaining body length %s.", + startPosition, buffer.remaining()), + e); + } } private static void ensureStatementDeserializedFromCurrentTabletFormat( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index 405859dd982..27032f4cfe3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -32,7 +32,6 @@ import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.UnSupportedDataTypeException; @@ -116,12 +115,19 @@ public class TabletStatementConverter { intern(ReadWriteIOUtils.readString(byteBuffer), tabletStringInternPool); final int rowSize = ReadWriteIOUtils.readInt(byteBuffer); + if (rowSize < 0) { + throw new IllegalArgumentException( + String.format("Invalid row size %s in tablet format deserialization.", rowSize)); + } // deserialize schemas final int schemaSize = - BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)) - ? ReadWriteIOUtils.readInt(byteBuffer) - : 0; + readBooleanByte(byteBuffer, "schema existence") ? ReadWriteIOUtils.readInt(byteBuffer) : 0; + if (schemaSize < 0) { + throw new IllegalArgumentException( + String.format("Invalid schema size %s in tablet format deserialization.", schemaSize)); + } + ensureRemaining(byteBuffer, schemaSize, "measurement schema existence flags"); final String[] measurement = new String[schemaSize]; final TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[schemaSize]; final TSDataType[] dataTypes = new TSDataType[schemaSize]; @@ -148,15 +154,26 @@ public class TabletStatementConverter { // Deserialize and calculate memory in the same loop for (int i = 0; i < schemaSize; i++) { - final boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean hasSchema = readBooleanByte(byteBuffer, "measurement schema existence"); if (hasSchema) { final Pair<String, TSDataType> pair = readMeasurement(byteBuffer, tabletStringInternPool); measurement[i] = pair.getLeft(); dataTypes[i] = pair.getRight(); if (readColumnCategory) { + if (!byteBuffer.hasRemaining()) { + throw new IllegalArgumentException( + "Missing column category in current tablet format deserialization."); + } + final byte columnCategory = byteBuffer.get(); + if (columnCategory < 0 || columnCategory >= ColumnCategory.values().length) { + throw new IllegalArgumentException( + String.format( + "Invalid column category %s in current tablet format deserialization.", + columnCategory)); + } columnCategories[i] = TsTableColumnCategory.fromTsFileColumnCategory( - ColumnCategory.values()[byteBuffer.get()]); + ColumnCategory.values()[columnCategory]); } // Calculate memory for each measurement string @@ -178,6 +195,15 @@ public class TabletStatementConverter { memorySize += measurementMemorySize; memorySize += dataTypesMemorySize; + final boolean isTimesNotNull = readBooleanByte(byteBuffer, "timestamp column existence"); + if (rowSize > 0 && !isTimesNotNull) { + throw new IllegalArgumentException( + "Missing timestamps in tablet format deserialization with non-empty rows."); + } + if (isTimesNotNull) { + ensureRemaining(byteBuffer, (long) Long.BYTES * rowSize, "timestamps"); + } + // deserialize times and calculate memory during deserialization final long[] times = new long[rowSize]; // Calculate memory: array header + long size * rowSize @@ -185,7 +211,6 @@ public class TabletStatementConverter { org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize); - final boolean isTimesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); if (isTimesNotNull) { for (int i = 0; i < rowSize; i++) { times[i] = ReadWriteIOUtils.readLong(byteBuffer); @@ -199,7 +224,7 @@ public class TabletStatementConverter { final BitMap[] bitMaps; final long bitMapsMemorySize; - final boolean isBitMapsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean isBitMapsNotNull = readBooleanByte(byteBuffer, "bitmap column existence"); if (isBitMapsNotNull) { // Use the method that returns both BitMap array and memory size final Pair<BitMap[], Long> bitMapsAndMemory = @@ -218,7 +243,11 @@ public class TabletStatementConverter { final Object[] values; final long valuesMemorySize; - final boolean isValuesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean isValuesNotNull = readBooleanByte(byteBuffer, "value column existence"); + if (rowSize > 0 && schemaSize > 0 && !isValuesNotNull) { + throw new IllegalArgumentException( + "Missing values in tablet format deserialization with non-empty rows."); + } if (isValuesNotNull) { // Use the method that returns both values array and memory size final Pair<Object[], Long> valuesAndMemory = @@ -236,7 +265,7 @@ public class TabletStatementConverter { // Add values memory to total memorySize += valuesMemorySize; - final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer); + final boolean isAligned = readBooleanByte(byteBuffer, "alignment"); statement.setMeasurements(measurement); statement.setTimes(times); @@ -321,6 +350,31 @@ public class TabletStatementConverter { } } + private static boolean readBooleanByte(final ByteBuffer buffer, final String fieldName) { + if (!buffer.hasRemaining()) { + throw new IllegalArgumentException( + String.format("Missing %s flag in tablet format deserialization.", fieldName)); + } + + final byte value = ReadWriteIOUtils.readByte(buffer); + if (value != 0 && value != 1) { + throw new IllegalArgumentException( + String.format( + "Invalid %s flag %s in tablet format deserialization.", fieldName, value)); + } + return value == 1; + } + + private static void ensureRemaining( + final ByteBuffer buffer, final long expectedSize, final String fieldName) { + if (expectedSize > buffer.remaining()) { + throw new IllegalArgumentException( + String.format( + "Insufficient bytes for %s in tablet format deserialization, expected %s, remaining %s.", + fieldName, expectedSize, buffer.remaining())); + } + } + /** * Read measurement name and data type from buffer, skipping other measurement schema fields * (encoding, compression, and tags/attributes) that are not needed for InsertTabletStatement. @@ -364,9 +418,13 @@ public class TabletStatementConverter { boolean hasMarkedBitMap = false; for (int i = 0; i < columns; i++) { - final boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean hasBitMap = readBooleanByte(byteBuffer, "bitmap existence"); if (hasBitMap) { final int size = ReadWriteIOUtils.readInt(byteBuffer); + if (size < 0) { + throw new IllegalArgumentException( + String.format("Invalid bitmap size %s in tablet format deserialization.", size)); + } final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer); final byte[] byteArray = valueBinary.getValues(); final BitMap bitMap = new BitMap(size, byteArray); @@ -417,7 +475,7 @@ public class TabletStatementConverter { for (int i = 0; i < columns; i++) { final boolean isValueColumnsNotNull = - BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + readBooleanByte(byteBuffer, "value column existence"); if (types[i] == null) { continue; } @@ -427,7 +485,7 @@ public class TabletStatementConverter { final boolean[] boolValues = new boolean[rowSize]; if (isValueColumnsNotNull) { for (int index = 0; index < rowSize; index++) { - boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + boolValues[index] = readBooleanByte(byteBuffer, "boolean value"); } } values[i] = boolValues; @@ -503,8 +561,7 @@ public class TabletStatementConverter { if (isValueColumnsNotNull) { for (int index = 0; index < rowSize; index++) { - final boolean isNotNull = - BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean isNotNull = readBooleanByte(byteBuffer, "binary value existence"); if (isNotNull) { binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer); // Calculate memory for each Binary object during deserialization diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 69c0c117d5e..b95852aa936 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -755,6 +755,52 @@ public class PipeDataNodeThriftRequestTest { assertLegacyTabletStatement(deserializedReq.constructStatement()); } + @Test + public void testPipeTransferTabletRawReqWithSingleColumnLegacyTabletFormat() + throws IOException { + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType(); + req.body = serializeSingleColumnLegacyTabletRawBuffer(false); + + final PipeTransferTabletRawReq deserializedReq = + PipeTransferTabletRawReq.fromTPipeTransferReq(req); + + Assert.assertFalse(deserializedReq.getIsAligned()); + final InsertTabletStatement statement = deserializedReq.constructStatement(); + Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath()); + Assert.assertArrayEquals(new String[] {"s1"}, statement.getMeasurements()); + Assert.assertArrayEquals(new TSDataType[] {TSDataType.INT32}, statement.getDataTypes()); + Assert.assertEquals(2, statement.getRowCount()); + Assert.assertArrayEquals(new long[] {1700000000000L, 1700000000001L}, statement.getTimes()); + Assert.assertArrayEquals(new int[] {2, 1}, (int[]) statement.getColumns()[0]); + } + + @Test + public void testPipeTransferTabletBatchReqRejectsTruncatedRawTablet() throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(0, outputStream); + ReadWriteIOUtils.write(0, outputStream); + ReadWriteIOUtils.write(1, outputStream); + outputStream.write(new byte[] {1, 0, 0, 0, 0, 0}); + + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TABLET_BATCH, + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); + + try { + PipeTransferTabletBatchReq.fromTPipeTransferReq(req); + Assert.fail("Expected IllegalArgumentException"); + } catch (final IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("Failed to deserialize raw tablet")); + Assert.assertTrue( + e.getCause().getMessage().contains("Failed to deserialize raw tablet request")); + } + } + } + @Test public void testPipeTransferTabletBatchReqV2() throws IOException { final List<ByteBuffer> insertNodeBuffers = new ArrayList<>(); @@ -1285,8 +1331,8 @@ public class PipeDataNodeThriftRequestTest { writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT); ReadWriteIOUtils.write((byte) 1, outputStream); - ReadWriteIOUtils.write(2L, outputStream); - ReadWriteIOUtils.write(1L, outputStream); + ReadWriteIOUtils.write(1700000000000L, outputStream); + ReadWriteIOUtils.write(1700000000001L, outputStream); ReadWriteIOUtils.write((byte) 0, outputStream); @@ -1305,6 +1351,33 @@ public class PipeDataNodeThriftRequestTest { } } + private static ByteBuffer serializeSingleColumnLegacyTabletRawBuffer(final boolean isAligned) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write("root.sg.d", outputStream); + ReadWriteIOUtils.write(2, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(1, outputStream); + writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(1700000000000L, outputStream); + ReadWriteIOUtils.write(1700000000001L, outputStream); + + ReadWriteIOUtils.write((byte) 0, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2, outputStream); + ReadWriteIOUtils.write(1, outputStream); + + ReadWriteIOUtils.write(isAligned, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + private static ByteBuffer serializeLegacyTabletBatchBody( final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> tabletBuffers) throws IOException {
