This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e480abc469d Pipe: Fix the inconsistency between schema and values
columns in the process of building tsfile (#15625) (#15778)
e480abc469d is described below
commit e480abc469d6ce75abfdc9e4b94300cf4d9765ce
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jun 18 18:42:20 2025 +0800
Pipe: Fix the inconsistency between schema and values columns in the
process of building tsfile (#15625) (#15778)
---
.../batch/PipeTabletEventTsFileBatch.java | 41 ++++++++++++++++------
1 file changed, 31 insertions(+), 10 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index a03f2bd970a..fd4ce71b6f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -47,6 +47,7 @@ import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
@@ -367,6 +368,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
final Set<MeasurementSchema> seen = new HashSet<>();
final List<Integer> distinctIndices =
IntStream.range(0, aggregatedSchemas.size())
+ .filter(i -> Objects.nonNull(aggregatedSchemas.get(i)))
.filter(i -> seen.add(aggregatedSchemas.get(i))) // Only keep the first occurrence index
.boxed()
.collect(Collectors.toList());
@@ -548,14 +550,23 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
final IMemTable memTable, final RestorableTsFileIOWriter writer)
throws Exception {
for (int i = 0, size = tabletList.size(); i < size; ++i) {
final Tablet tablet = tabletList.get(i);
+ MeasurementSchema[] measurementSchemas =
+ tablet.getSchemas().stream()
+ .map(schema -> (MeasurementSchema) schema)
+ .toArray(MeasurementSchema[]::new);
+ Object[] values = Arrays.copyOf(tablet.values, tablet.values.length);
+ BitMap[] bitMaps = Arrays.copyOf(tablet.bitMaps,
tablet.bitMaps.length);
// convert date value to int refer to
// org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
- final Object[] values = Arrays.copyOf(tablet.values,
tablet.values.length);
+ int validatedIndex = 0;
for (int j = 0; j < tablet.getSchemas().size(); ++j) {
- final MeasurementSchema schema = tablet.getSchemas().get(j);
- if (Objects.nonNull(schema)
- && Objects.equals(TSDataType.DATE, schema.getType())
+ final IMeasurementSchema schema = measurementSchemas[j];
+ if (Objects.isNull(schema)) {
+ continue;
+ }
+
+ if (Objects.equals(TSDataType.DATE, schema.getType())
&& values[j] instanceof LocalDate[]) {
final LocalDate[] dates = ((LocalDate[]) values[j]);
final int[] dateValues = new int[dates.length];
@@ -564,6 +575,16 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
}
values[j] = dateValues;
}
+ measurementSchemas[validatedIndex] = measurementSchemas[j];
+ values[validatedIndex] = values[j];
+ bitMaps[validatedIndex] = bitMaps[j];
+ validatedIndex++;
+ }
+
+ if (validatedIndex != measurementSchemas.length) {
+ values = Arrays.copyOf(values, validatedIndex);
+ measurementSchemas = Arrays.copyOf(measurementSchemas,
validatedIndex);
+ bitMaps = Arrays.copyOf(bitMaps, validatedIndex);
}
final InsertTabletNode insertTabletNode =
@@ -571,16 +592,16 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
PLACEHOLDER_PLAN_NODE_ID,
new PartialPath(tablet.deviceId),
isTabletAlignedList.get(i),
- tablet.getSchemas().stream()
- .map(m -> Objects.nonNull(m) ? m.getMeasurementId() : null)
+ Arrays.stream(measurementSchemas)
+ .map(IMeasurementSchema::getMeasurementId)
.toArray(String[]::new),
- tablet.getSchemas().stream()
- .map(m -> Objects.nonNull(m) ? m.getType() : null)
+ Arrays.stream(measurementSchemas)
+ .map(IMeasurementSchema::getType)
.toArray(TSDataType[]::new),
// TODO: cast
- tablet.getSchemas().toArray(new MeasurementSchema[0]),
+ measurementSchemas,
tablet.timestamps,
- tablet.bitMaps,
+ bitMaps,
values,
tablet.rowSize);