This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch patch-2094 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 04b3dc5bc37ba7046df9b376ffd689257d83d364
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 11 14:51:06 2026 +0800

    Fix legacy raw tablet fallback
---
 .../request/PipeTransferTabletRawReq.java | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 4d701d0b6fb..5b0831903b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.record.Tablet;
@@ -156,6 +157,9 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq {
       // V1: no databaseName, readDatabaseName = false
       final InsertTabletStatement insertTabletStatement =
           TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false);
+      // Legacy tablets do not serialize column categories. Since hasSchema=1 can be
+      // misread as FIELD, the current reader may return a corrupt statement instead of failing.
+      ensureStatementDeserializedFromCurrentTabletFormat(insertTabletStatement);
       isAligned = insertTabletStatement.isAligned();
       // devicePath is already set in deserializeStatementFromTabletFormat for V1 format
       statement = insertTabletStatement;
@@ -178,6 +182,44 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq {
     isAligned = ReadWriteIOUtils.readBool(buffer);
   }
 
+  private static void ensureStatementDeserializedFromCurrentTabletFormat(
+      final InsertTabletStatement statement) {
+    final String[] measurements = statement.getMeasurements();
+    final TSDataType[] dataTypes = statement.getDataTypes();
+
+    if (Objects.isNull(measurements)
+        || Objects.isNull(dataTypes)
+        || measurements.length != dataTypes.length) {
+      throw new IllegalArgumentException(
+          "Incomplete schema in current tablet format deserialization.");
+    }
+
+    final Object[] columns = statement.getColumns();
+    if (Objects.nonNull(columns) && columns.length != measurements.length) {
+      throw new IllegalArgumentException(
+          "Column count is inconsistent with schema count in current tablet format deserialization.");
+    }
+
+    for (int i = 0; i < measurements.length; ++i) {
+      if (Objects.isNull(measurements[i]) || Objects.isNull(dataTypes[i])) {
+        throw new IllegalArgumentException(
+            "Incomplete measurement schema in current tablet format deserialization.");
+      }
+      if (statement.getRowCount() > 0 && (Objects.isNull(columns) || Objects.isNull(columns[i]))) {
+        throw new IllegalArgumentException(
+            "Incomplete column values in current tablet format deserialization.");
+      }
+    }
+
+    final long[] times = statement.getTimes();
+    if (statement.getRowCount() > 0
+        && measurements.length > 0
+        && (Objects.isNull(times) || times.length < statement.getRowCount())) {
+      throw new IllegalArgumentException(
+          "Incomplete timestamps in current tablet format deserialization.");
+    }
+  }
+
   /////////////////////////////// Air Gap ///////////////////////////////
 
   /**
