This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new e480abc469d Pipe: Fix the inconsistency between schema and values columns in the process of building tsfile (#15625) (#15778)
e480abc469d is described below

commit e480abc469d6ce75abfdc9e4b94300cf4d9765ce
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jun 18 18:42:20 2025 +0800

    Pipe: Fix the inconsistency between schema and values columns in the process of building tsfile (#15625) (#15778)
---
 .../batch/PipeTabletEventTsFileBatch.java          | 41 ++++++++++++++++------
 1 file changed, 31 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index a03f2bd970a..fd4ce71b6f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -47,6 +47,7 @@ import org.apache.tsfile.utils.DateUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.TsFileWriter;
 import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
@@ -367,6 +368,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
     final Set<MeasurementSchema> seen = new HashSet<>();
     final List<Integer> distinctIndices =
         IntStream.range(0, aggregatedSchemas.size())
+            .filter(i -> Objects.nonNull(aggregatedSchemas.get(i)))
             .filter(i -> seen.add(aggregatedSchemas.get(i))) // Only keep the 
first occurrence index
             .boxed()
             .collect(Collectors.toList());
@@ -548,14 +550,23 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
         final IMemTable memTable, final RestorableTsFileIOWriter writer) 
throws Exception {
       for (int i = 0, size = tabletList.size(); i < size; ++i) {
         final Tablet tablet = tabletList.get(i);
+        MeasurementSchema[] measurementSchemas =
+            tablet.getSchemas().stream()
+                .map(schema -> (MeasurementSchema) schema)
+                .toArray(MeasurementSchema[]::new);
+        Object[] values = Arrays.copyOf(tablet.values, tablet.values.length);
+        BitMap[] bitMaps = Arrays.copyOf(tablet.bitMaps, 
tablet.bitMaps.length);
 
         // convert date value to int refer to
         // 
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
-        final Object[] values = Arrays.copyOf(tablet.values, 
tablet.values.length);
+        int validatedIndex = 0;
         for (int j = 0; j < tablet.getSchemas().size(); ++j) {
-          final MeasurementSchema schema = tablet.getSchemas().get(j);
-          if (Objects.nonNull(schema)
-              && Objects.equals(TSDataType.DATE, schema.getType())
+          final IMeasurementSchema schema = measurementSchemas[j];
+          if (Objects.isNull(schema)) {
+            continue;
+          }
+
+          if (Objects.equals(TSDataType.DATE, schema.getType())
               && values[j] instanceof LocalDate[]) {
             final LocalDate[] dates = ((LocalDate[]) values[j]);
             final int[] dateValues = new int[dates.length];
@@ -564,6 +575,16 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
             }
             values[j] = dateValues;
           }
+          measurementSchemas[validatedIndex] = measurementSchemas[j];
+          values[validatedIndex] = values[j];
+          bitMaps[validatedIndex] = bitMaps[j];
+          validatedIndex++;
+        }
+
+        if (validatedIndex != measurementSchemas.length) {
+          values = Arrays.copyOf(values, validatedIndex);
+          measurementSchemas = Arrays.copyOf(measurementSchemas, 
validatedIndex);
+          bitMaps = Arrays.copyOf(bitMaps, validatedIndex);
         }
 
         final InsertTabletNode insertTabletNode =
@@ -571,16 +592,16 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
                 PLACEHOLDER_PLAN_NODE_ID,
                 new PartialPath(tablet.deviceId),
                 isTabletAlignedList.get(i),
-                tablet.getSchemas().stream()
-                    .map(m -> Objects.nonNull(m) ? m.getMeasurementId() : null)
+                Arrays.stream(measurementSchemas)
+                    .map(IMeasurementSchema::getMeasurementId)
                     .toArray(String[]::new),
-                tablet.getSchemas().stream()
-                    .map(m -> Objects.nonNull(m) ? m.getType() : null)
+                Arrays.stream(measurementSchemas)
+                    .map(IMeasurementSchema::getType)
                     .toArray(TSDataType[]::new),
                 // TODO: cast
-                tablet.getSchemas().toArray(new MeasurementSchema[0]),
+                measurementSchemas,
                 tablet.timestamps,
-                tablet.bitMaps,
+                bitMaps,
                 values,
                 tablet.rowSize);
 

Reply via email to