This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch force_ci/support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 12b959ae2439a633fce5a54895d2727c07455d58
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Dec 31 16:35:07 2025 +0800

    temp
---
 .../performer/impl/FastCompactionPerformer.java    | 14 ++++-
 .../impl/ReadChunkCompactionPerformer.java         |  2 +-
 .../impl/ReadPointCompactionPerformer.java         | 13 ++++-
 .../execute/utils/CompactionTableSchema.java       |  9 +++
 .../utils/CompactionTableSchemaCollector.java      | 20 ++++++-
 .../execute/utils/MultiTsFileDeviceIterator.java   | 18 +++++-
 .../utils/ReorderedTsFileDeviceIterator.java       | 65 ++++++++++++++++++++++
 .../utils/TransformedTsFileDeviceIterator.java     | 51 +++++++++++++++++
 .../utils/writer/AbstractCompactionWriter.java     |  4 +-
 .../writer/AbstractCrossCompactionWriter.java      | 25 ++++++++-
 .../writer/AbstractInnerCompactionWriter.java      | 16 +++++-
 .../compaction/io/CompactionTsFileWriter.java      | 13 ++++-
 .../dataregion/tsfile/TsFileManager.java           | 33 ++++++-----
 .../dataregion/tsfile/TsFileResource.java          | 49 ++++++++++++++--
 .../dataregion/tsfile/evolution/EvolvedSchema.java | 41 ++++++++++++++
 15 files changed, 335 insertions(+), 38 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
index 91184aaec82..bc950a27114 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
@@ -19,6 +19,8 @@
 
 package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl;
 
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PatternTreeMap;
@@ -162,10 +164,18 @@ public class FastCompactionPerformer
                 ? new FastCrossCompactionWriter(
                     targetFiles, seqFiles, readerCacheMap, encryptParameter)
                 : new FastInnerCompactionWriter(targetFiles, 
encryptParameter)) {
+
+      List<TsFileResource> allSourceFiles = Stream.concat(seqFiles.stream(), 
unseqFiles.stream())
+          .sorted(TsFileResource::compareFileName)
+          .collect(Collectors.toList());
+      Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = 
TsFileResource.getMaxTsFileSetEndVersionAndMinResource(
+          allSourceFiles);
       List<Schema> schemas =
           CompactionTableSchemaCollector.collectSchema(
-              seqFiles, unseqFiles, readerCacheMap, 
deviceIterator.getDeprecatedTableSchemaMap());
-      compactionWriter.setSchemaForAllTargetFile(schemas);
+              seqFiles, unseqFiles, readerCacheMap, 
deviceIterator.getDeprecatedTableSchemaMap(),
+              maxTsFileSetEndVersionAndMinResource);
+
+      compactionWriter.setSchemaForAllTargetFile(schemas, 
maxTsFileSetEndVersionAndMinResource);
       readModification(seqFiles);
       readModification(unseqFiles);
       while (deviceIterator.hasNextDevice()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index eaed8a7f6eb..2fefbba9ab1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -205,7 +205,7 @@ public class ReadChunkCompactionPerformer implements 
ISeqCompactionPerformer {
   private void useNewWriter() throws IOException {
     currentWriter =
         new CompactionTsFileWriter(
-            targetResources.get(currentTargetFileIndex).getTsFile(),
+            targetResources.get(currentTargetFileIndex),
             memoryBudgetForFileWriter,
             CompactionType.INNER_SEQ_COMPACTION,
             firstEncryptParameter);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
index c58870357d9..824bb7e5196 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
@@ -18,6 +18,7 @@
  */
 package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl;
 
+import java.util.stream.Stream;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedFullPath;
@@ -153,13 +154,21 @@ public class ReadPointCompactionPerformer
       // Do not close device iterator, because tsfile reader is managed by 
FileReaderManager.
       MultiTsFileDeviceIterator deviceIterator =
           new MultiTsFileDeviceIterator(seqFiles, unseqFiles);
+      List<TsFileResource> allSourceFiles = Stream.concat(seqFiles.stream(), 
unseqFiles.stream())
+          .sorted(TsFileResource::compareFileName)
+          .collect(Collectors.toList());
+      Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = 
TsFileResource.getMaxTsFileSetEndVersionAndMinResource(
+          allSourceFiles);
+
       List<Schema> schemas =
           CompactionTableSchemaCollector.collectSchema(
               seqFiles,
               unseqFiles,
               deviceIterator.getReaderMap(),
-              deviceIterator.getDeprecatedTableSchemaMap());
-      compactionWriter.setSchemaForAllTargetFile(schemas);
+              deviceIterator.getDeprecatedTableSchemaMap(),
+              maxTsFileSetEndVersionAndMinResource);
+
+      compactionWriter.setSchemaForAllTargetFile(schemas, 
maxTsFileSetEndVersionAndMinResource);
       while (deviceIterator.hasNextDevice()) {
         checkThreadInterrupted();
         Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
index 3f6e83cbe96..99c4286c759 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
@@ -33,6 +33,15 @@ public class CompactionTableSchema extends TableSchema {
     super(tableName);
   }
 
+  public CompactionTableSchema(TableSchema tableSchema) {
+    this(tableSchema.getTableName(), tableSchema.getColumnSchemas(), 
tableSchema.getColumnTypes());
+  }
+
+  public CompactionTableSchema(String tableName, List<IMeasurementSchema> 
columnSchemas,
+      List<ColumnCategory> columnCategories) {
+    super(tableName, columnSchemas, columnCategories);
+  }
+
   public boolean merge(TableSchema tableSchema) {
     if (tableSchema == null) {
       return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
index 55640c3fcfa..d5eb8d08d98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
@@ -21,8 +21,10 @@ package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;
 
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.schema.Schema;
 
 import java.io.IOException;
@@ -42,7 +44,8 @@ public class CompactionTableSchemaCollector {
       List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles,
       Map<TsFileResource, TsFileSequenceReader> readerMap,
-      Map<TsFileResource, Set<String>> deprecatedTableSchemaMap)
+      Map<TsFileResource, Set<String>> deprecatedTableSchemaMap,
+      Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource)
       throws IOException {
     List<Schema> targetSchemas = new ArrayList<>(seqFiles.size());
     Schema schema =
@@ -51,7 +54,8 @@ public class CompactionTableSchemaCollector {
                 .sorted(TsFileResource::compareFileName)
                 .collect(Collectors.toList()),
             readerMap,
-            deprecatedTableSchemaMap);
+            deprecatedTableSchemaMap,
+            maxTsFileSetEndVersionAndAssociatedResource);
 
     targetSchemas.add(schema);
     for (int i = 1; i < seqFiles.size(); i++) {
@@ -72,10 +76,12 @@ public class CompactionTableSchemaCollector {
   public static Schema collectSchema(
       List<TsFileResource> sourceFiles,
       Map<TsFileResource, TsFileSequenceReader> readerMap,
-      Map<TsFileResource, Set<String>> deprecatedTableSchemaMap)
+      Map<TsFileResource, Set<String>> deprecatedTableSchemaMap,
+      Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource)
       throws IOException {
     Schema targetSchema = new Schema();
     Map<String, TableSchema> targetTableSchemaMap = new HashMap<>();
+
     for (int i = 0; i < sourceFiles.size(); i++) {
       TsFileResource resource = sourceFiles.get(i);
       TsFileSequenceReader reader = readerMap.get(resource);
@@ -84,12 +90,20 @@ public class CompactionTableSchemaCollector {
         // v3 tsfile
         continue;
       }
+
+      EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(
+          maxTsFileSetEndVersionAndAssociatedResource.getLeft());
+
       for (Map.Entry<String, TableSchema> entry : tableSchemaMap.entrySet()) {
         String tableName = entry.getKey();
         TableSchema currentTableSchema = entry.getValue();
         if (isTreeModel(currentTableSchema)) {
           continue;
         }
+        if (evolvedSchema != null) {
+          currentTableSchema = 
evolvedSchema.rewriteToFinal(currentTableSchema);
+        }
+
         // merge all id columns, measurement schema will be generated 
automatically when end chunk
         // group
         CompactionTableSchema collectedTableSchema =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 1889182a2db..aef15ad3dd4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -32,6 +32,8 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
 import org.apache.iotdb.db.utils.EncryptDBUtils;
 import org.apache.iotdb.db.utils.ModificationUtils;
 import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
@@ -95,6 +97,12 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
     // sort the files from the newest to the oldest
     Collections.sort(
         this.tsFileResourcesSortedByDesc, 
TsFileResource::compareFileCreationOrderByDesc);
+    long maxTsFileSetEndVersion = 
this.tsFileResourcesSortedByDesc.stream().mapToLong(
+        // max endVersion of all filesets of a TsFile
+        resource -> 
resource.getTsFileSets().stream().mapToLong(TsFileSet::getEndVersion).max()
+            .orElse(Long.MAX_VALUE))
+        // overall max endVersion
+        .max().orElse(Long.MAX_VALUE);
     try {
       for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) {
         CompactionTsFileReader reader =
@@ -103,7 +111,15 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
                 CompactionType.INNER_SEQ_COMPACTION,
                 
EncryptDBUtils.getFirstEncryptParamFromTSFilePath(tsFileResource.getTsFilePath()));
         readerMap.put(tsFileResource, reader);
-        deviceIteratorMap.put(tsFileResource, 
reader.getAllDevicesIteratorWithIsAligned());
+        TsFileDeviceIterator tsFileDeviceIterator;
+        EvolvedSchema evolvedSchema = 
tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion);
+        if (evolvedSchema != null) {
+          tsFileDeviceIterator = new ReorderedTsFileDeviceIterator(reader,
+              evolvedSchema::rewriteToFinal);
+        } else {
+          tsFileDeviceIterator = reader.getAllDevicesIteratorWithIsAligned();
+        }
+        deviceIteratorMap.put(tsFileResource, tsFileDeviceIterator);
       }
     } catch (Exception e) {
       // if there is any exception occurs
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java
new file mode 100644
index 00000000000..14d2db8a191
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.utils.Pair;
+
+/**
+ * A device iterator that drains the underlying TsFile device iterator up front and then replays
+ * the devices sorted by their transformed device ID.
+ *
+ * <p>Transforming device IDs can change their relative order, so the devices are collected
+ * together with the first measurement node the parent reported for each of them, and re-sorted
+ * before iteration starts.
+ */
+public class ReorderedTsFileDeviceIterator extends TransformedTsFileDeviceIterator {
+
+  /** Every (transformed device, first measurement node) entry, sorted by device ID. */
+  private final List<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>>
+      deviceIDAndFirstMeasurementNodeList = new ArrayList<>();
+
+  private Iterator<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>> deviceIDListIterator;
+
+  /** Entry returned by the most recent {@link #next()} call; null before the first call. */
+  private Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode> current;
+
+  /**
+   * Creates an iterator over the devices of {@code reader}.
+   *
+   * @param reader reader handed to the parent iterator
+   * @param transformer maps each device ID read from the file to the ID exposed by this iterator
+   * @throws IOException declared by the parent constructor
+   */
+  public ReorderedTsFileDeviceIterator(
+      TsFileSequenceReader reader, Function<IDeviceID, IDeviceID> transformer)
+      throws IOException {
+    super(reader, transformer);
+    collectAndSort();
+  }
+
+  /**
+   * Creates an iterator, forwarding {@code tableName} to the parent iterator.
+   *
+   * @param reader reader handed to the parent iterator
+   * @param tableName table name handed to the parent iterator
+   * @param transformer maps each device ID read from the file to the ID exposed by this iterator
+   * @throws IOException declared by the parent constructor
+   */
+  public ReorderedTsFileDeviceIterator(
+      TsFileSequenceReader reader, String tableName, Function<IDeviceID, IDeviceID> transformer)
+      throws IOException {
+    super(reader, tableName, transformer);
+    collectAndSort();
+  }
+
+  /** Drains the parent iterator, then sorts the collected entries by transformed device ID. */
+  private void collectAndSort() {
+    while (super.hasNext()) {
+      // super.next() resolves to TransformedTsFileDeviceIterator.next(), which has already applied
+      // the transformer to the device ID. Applying it a second time here would map the ID twice,
+      // which is wrong whenever the transformer is not idempotent.
+      Pair<IDeviceID, Boolean> next = super.next();
+      deviceIDAndFirstMeasurementNodeList.add(
+          new Pair<>(next, super.getFirstMeasurementNodeOfCurrentDevice()));
+    }
+    deviceIDAndFirstMeasurementNodeList.sort(Comparator.comparing(p -> p.getLeft().getLeft()));
+    deviceIDListIterator = deviceIDAndFirstMeasurementNodeList.iterator();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return deviceIDListIterator.hasNext();
+  }
+
+  @Override
+  public Pair<IDeviceID, Boolean> next() {
+    current = deviceIDListIterator.next();
+    return current.left;
+  }
+
+  @Override
+  public Pair<IDeviceID, Boolean> current() {
+    return current.left;
+  }
+
+  @Override
+  public MetadataIndexNode getFirstMeasurementNodeOfCurrentDevice() {
+    // The devices have been reordered, so the parent's own cursor no longer points at this
+    // device; return the node that was captured for it while collecting.
+    return current.right;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java
new file mode 100644
index 00000000000..f1af028226d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils;
+
+import java.io.IOException;
+import java.util.function.Function;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.TsFileDeviceIterator;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.utils.Pair;
+
+/**
+ * A {@link TsFileDeviceIterator} that maps every device ID it returns through a transformer.
+ *
+ * <p>{@link #next()} overwrites the {@code left} field of the pair obtained from the parent
+ * iterator rather than creating a new pair.
+ */
+public class TransformedTsFileDeviceIterator extends TsFileDeviceIterator {
+
+  /** Maps a device ID read from the file to the device ID exposed by this iterator. */
+  protected final Function<IDeviceID, IDeviceID> transformer;
+
+  /**
+   * @param reader reader handed to the parent iterator
+   * @param transformer function applied to the device ID of every pair returned by next()
+   * @throws IOException declared by the parent constructor
+   */
+  public TransformedTsFileDeviceIterator(
+      TsFileSequenceReader reader, Function<IDeviceID, IDeviceID> transformer)
+      throws IOException {
+    super(reader);
+    this.transformer = transformer;
+  }
+
+  /**
+   * @param reader reader handed to the parent iterator
+   * @param tableName table name handed to the parent iterator
+   * @param transformer function applied to the device ID of every pair returned by next()
+   * @throws IOException declared by the parent constructor
+   */
+  public TransformedTsFileDeviceIterator(
+      TsFileSequenceReader reader, String tableName, Function<IDeviceID, IDeviceID> transformer)
+      throws IOException {
+    super(reader, tableName);
+    this.transformer = transformer;
+  }
+
+  /** Returns the parent's next pair with its device ID replaced by the transformed ID. */
+  @Override
+  public Pair<IDeviceID, Boolean> next() {
+    Pair<IDeviceID, Boolean> next = super.next();
+    // The pair is modified in place, so callers must not transform the returned ID again.
+    next.left = transformer.apply(next.left);
+    return next;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 3458f9870d4..623f2b3287d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.tsfile.encrypt.EncryptParameter;
 import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.file.header.PageHeader;
@@ -38,6 +39,7 @@ import org.apache.tsfile.file.metadata.statistics.Statistics;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.chunk.ChunkWriterImpl;
@@ -338,5 +340,5 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
     }
   }
 
-  public abstract void setSchemaForAllTargetFile(List<Schema> schemas);
+  public abstract void setSchemaForAllTargetFile(List<Schema> schemas, 
Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index f970ad65e56..6584513f606 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -19,12 +19,17 @@
 
 package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer;
 
+import java.util.stream.Collectors;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.EncryptDBUtils;
@@ -35,6 +40,7 @@ import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.schema.Schema;
 
@@ -99,7 +105,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
     for (int i = 0; i < targetResources.size(); i++) {
       this.targetFileWriters.add(
           new CompactionTsFileWriter(
-              targetResources.get(i).getTsFile(),
+              targetResources.get(i),
               memorySizeForEachWriter,
               CompactionType.CROSS_COMPACTION,
               this.encryptParameter));
@@ -266,9 +272,35 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr
   }
 
+  /**
+   * Assigns the collected schemas to the target file writers, one schema per writer.
+   *
+   * <p>When the pair carries a minimum-version source resource, each target resource borrows that
+   * resource's TsFileManager and, if a merged evolved schema exists for it, the schema is rewritten
+   * through {@code rewriteToOriginal} before being handed to the writer.
+   *
+   * @param schemas schemas indexed like the target file writers
+   * @param maxTsFileSetEndVersionAndMinResource end version (left) and resource (right); the
+   *     resource may be null, in which case the schemas are used unchanged
+   */
   @Override
-  public void setSchemaForAllTargetFile(List<Schema> schemas) {
+  public void setSchemaForAllTargetFile(
+      List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) {
     for (int i = 0; i < targetFileWriters.size(); i++) {
-      targetFileWriters.get(i).setSchema(schemas.get(i));
+      CompactionTsFileWriter compactionTsFileWriter = targetFileWriters.get(i);
+      Schema schema = schemas.get(i);
+      TsFileResource minVersionResource = maxTsFileSetEndVersionAndMinResource.getRight();
+      if (minVersionResource != null) {
+        TsFileResource targetResource = compactionTsFileWriter.getTsFileResource();
+        targetResource.setTsFileManager(minVersionResource.getTsFileManager());
+        EvolvedSchema evolvedSchema =
+            targetResource.getMergedEvolvedSchema(maxTsFileSetEndVersionAndMinResource.getLeft());
+        // getMergedEvolvedSchema may return null (the schema collector and the device iterator
+        // both check for it); in that case there is nothing to rewrite.
+        if (evolvedSchema != null) {
+          schema = evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new);
+        }
+      }
+      compactionTsFileWriter.setSchema(schema);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index 6573bb7e96e..d7d7a44c00b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -21,11 +21,13 @@ package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wr
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchemaCollector;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.EncryptDBUtils;
 
@@ -33,6 +35,7 @@ import org.apache.tsfile.encrypt.EncryptParameter;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.schema.Schema;
 
 import java.io.IOException;
@@ -46,6 +49,7 @@ public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWr
   protected long endedFileSize = 0;
   protected List<Schema> schemas;
   protected EncryptParameter encryptParameter;
+  protected Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource;
 
   protected final long memoryBudgetForFileWriter =
       (long)
@@ -115,13 +119,35 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr
+  /**
+   * Opens a writer on the current target file and installs its schema.
+   *
+   * <p>The schema starts as a copy of the first collected schema. When a minimum-version source
+   * resource is available, the target resource borrows its TsFileManager and, if a merged evolved
+   * schema exists for it, the schema is rewritten through {@code rewriteToOriginal}.
+   *
+   * @throws IOException propagated from the {@link CompactionTsFileWriter} constructor
+   */
   private void useNewWriter() throws IOException {
     fileWriter =
         new CompactionTsFileWriter(
-            targetResources.get(currentFileIndex).getTsFile(),
+            targetResources.get(currentFileIndex),
             memoryBudgetForFileWriter,
             targetResources.get(currentFileIndex).isSeq()
                 ? CompactionType.INNER_SEQ_COMPACTION
                 : CompactionType.INNER_UNSEQ_COMPACTION,
             encryptParameter);
-    fileWriter.setSchema(CompactionTableSchemaCollector.copySchema(schemas.get(0)));
+    Schema schema = CompactionTableSchemaCollector.copySchema(schemas.get(0));
+    TsFileResource minVersionResource = maxTsFileSetEndVersionAndMinResource.getRight();
+    // The cross-compaction writer treats a null resource as "no schema evolution"; do the same
+    // here instead of dereferencing it, and tolerate a null merged evolved schema as well.
+    if (minVersionResource != null) {
+      TsFileResource targetResource = fileWriter.getTsFileResource();
+      targetResource.setTsFileManager(minVersionResource.getTsFileManager());
+      EvolvedSchema evolvedSchema =
+          targetResource.getMergedEvolvedSchema(maxTsFileSetEndVersionAndMinResource.getLeft());
+      if (evolvedSchema != null) {
+        schema = evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new);
+      }
+    }
+    fileWriter.setSchema(schema);
   }
 
   @Override
@@ -174,8 +183,9 @@ public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWr
   }
 
   @Override
-  public void setSchemaForAllTargetFile(List<Schema> schemas) {
+  public void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, 
TsFileResource> maxTsFileSetEndVersionAndMinResource) {
     this.schemas = schemas;
+    this.maxTsFileSetEndVersionAndMinResource = 
maxTsFileSetEndVersionAndMinResource;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
index 1f822e3b934..7d7bf5cba82 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.EncryptDBUtils;
 
 import org.apache.tsfile.encrypt.EncryptParameter;
@@ -54,19 +55,21 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
   private volatile boolean isWritingAligned = false;
   private boolean isEmptyTargetFile = true;
   private IDeviceID currentDeviceId;
+  private TsFileResource tsFileResource;
 
   private EncryptParameter firstEncryptParameter;
 
   @TestOnly
   public CompactionTsFileWriter(File file, long maxMetadataSize, 
CompactionType type)
       throws IOException {
-    this(file, maxMetadataSize, type, 
EncryptDBUtils.getDefaultFirstEncryptParam());
+    this(new TsFileResource(file), maxMetadataSize, type, 
EncryptDBUtils.getDefaultFirstEncryptParam());
   }
 
   public CompactionTsFileWriter(
-      File file, long maxMetadataSize, CompactionType type, EncryptParameter 
encryptParameter)
+      TsFileResource tsFile, long maxMetadataSize, CompactionType type, 
EncryptParameter encryptParameter)
       throws IOException {
-    super(file, maxMetadataSize, encryptParameter);
+    super(tsFile.getTsFile(), maxMetadataSize, encryptParameter);
+    this.tsFileResource = tsFile;
     this.firstEncryptParameter = encryptParameter;
     this.type = type;
     super.out =
@@ -192,4 +195,8 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
       iterator.remove();
     }
   }
+
+  /**
+   * Returns the TsFileResource that was handed to the constructor of this writer.
+   *
+   * @return the resource whose TsFile this writer writes to
+   */
+  public TsFileResource getTsFileResource() {
+    return tsFileResource;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index 4466668ad5f..11dcde29b37 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile;
 
+import java.util.stream.Collectors;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement;
@@ -56,6 +57,7 @@ public class TsFileManager {
   private final TreeMap<Long, TsFileResourceList> sequenceFiles = new 
TreeMap<>();
   private final TreeMap<Long, TsFileResourceList> unsequenceFiles = new 
TreeMap<>();
   private final TreeMap<Long, ModFileManagement> modFileManagementMap = new 
TreeMap<>();
+  private final TreeMap<Long, List<TsFileSet>> tsfileSets = new TreeMap<>();
 
   private volatile boolean allowCompaction = true;
   private final AtomicLong currentCompactionTaskSerialId = new AtomicLong(0);
@@ -237,6 +239,7 @@ public class TsFileManager {
             modFileManagementMap.computeIfAbsent(
                 timePartition, t -> new PartitionLevelModFileManager()));
       }
+      tsFileResource.setTsFileManager(this);
     } finally {
       writeUnlock();
     }
@@ -255,6 +258,7 @@ public class TsFileManager {
             modFileManagementMap.computeIfAbsent(
                 tsFileResource.getTimePartition(), t -> new 
PartitionLevelModFileManager()));
       }
+      tsFileResource.setTsFileManager(this);
     } finally {
       writeUnlock();
     }
@@ -273,6 +277,7 @@ public class TsFileManager {
             modFileManagementMap.computeIfAbsent(
                 tsFileResource.getTimePartition(), t -> new 
PartitionLevelModFileManager()));
       }
+      tsFileResource.setTsFileManager(this);
     } finally {
       writeUnlock();
     }
@@ -333,6 +338,7 @@ public class TsFileManager {
                 modFileManagementMap.computeIfAbsent(
                     resource.getTimePartition(), t -> new 
PartitionLevelModFileManager()));
           }
+          resource.setTsFileManager(this);
         }
       }
     } finally {
@@ -512,21 +518,22 @@ public class TsFileManager {
   public void addTsFileSet(TsFileSet newSet, long partitionId) {
     writeLock("addTsFileSet");
     try {
-      TsFileResourceList tsFileResources = sequenceFiles.get(partitionId);
-      if (tsFileResources != null) {
-        for (TsFileResource tsFileResource : tsFileResources) {
-          tsFileResource.addFileSet(newSet);
-        }
-      }
-
-      tsFileResources = unsequenceFiles.get(partitionId);
-      if (tsFileResources != null) {
-        for (TsFileResource tsFileResource : tsFileResources) {
-          tsFileResource.addFileSet(newSet);
-        }
-      }
+      List<TsFileSet> tsFileSetList = tsfileSets.computeIfAbsent(partitionId,
+          p -> new ArrayList<>());
+      tsFileSetList.add(newSet);
     } finally {
       writeUnlock();
     }
   }
+
+  /**
+   * Collects the TsFileSets of one time partition whose end version lies in the half-open range
+   * {@code [minFileVersionIncluded, maxFileVersionExcluded)}.
+   *
+   * @param partitionId the time partition to look up
+   * @param minFileVersionIncluded inclusive lower bound on {@code TsFileSet#getEndVersion()}
+   * @param maxFileVersionExcluded exclusive upper bound on {@code TsFileSet#getEndVersion()}
+   * @return a new list holding the matching sets in the order they were added; empty when no
+   *     TsFileSet has been added for the partition
+   */
+  public List<TsFileSet> getTsFileSet(
+      long partitionId, long minFileVersionIncluded, long maxFileVersionExcluded) {
+    readLock();
+    try {
+      List<TsFileSet> tsFileSetList = tsfileSets.get(partitionId);
+      if (tsFileSetList == null) {
+        // addTsFileSet was never called for this partition; streaming the null list would throw
+        return new ArrayList<>();
+      }
+      return tsFileSetList.stream()
+          .filter(
+              s ->
+                  s.getEndVersion() >= minFileVersionIncluded
+                      && s.getEndVersion() < maxFileVersionExcluded)
+          .collect(Collectors.toList());
+    } finally {
+      readUnlock();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 060c16fea7d..a387db509c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -211,8 +211,7 @@ public class TsFileResource implements PersistentResource, 
Cloneable {
 
   private Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues;
 
-  // TsFileSets this TsFile belongs to
-  private final List<TsFileSet> tsFileSets = new ArrayList<>();
+  private TsFileManager tsFileManager = null;
 
   @TestOnly
   public TsFileResource() {
@@ -1632,17 +1631,25 @@ public class TsFileResource implements 
PersistentResource, Cloneable {
     return (TsFileResource) clone();
   }
 
-  public void addFileSet(TsFileSet tsFileSet) {
-    tsFileSets.add(tsFileSet);
+  /**
+   * Returns the TsFileSets of this file's time partition whose end version is not lower than this
+   * file's version, as reported by the attached TsFileManager.
+   *
+   * @return the matching sets; an empty list when no TsFileManager has been attached
+   */
+  public List<TsFileSet> getTsFileSets() {
+    if (tsFileManager == null) {
+      // the manager is only attached once the resource is added to a TsFileManager; without it
+      // no TsFileSet can be known, so report none instead of failing with a NullPointerException
+      return new ArrayList<>();
+    }
+    return tsFileManager.getTsFileSet(
+        tsFileID.timePartitionId, tsFileID.fileVersion, Long.MAX_VALUE);
+  }
 
-  public List<TsFileSet> getTsFileSets() {
-    return tsFileSets;
+  /**
+   * Returns the TsFileSets of this file's time partition whose end version is not lower than this
+   * file's version and strictly lower than the given bound.
+   *
+   * @param maxEndVersionExcluded exclusive upper bound on the end version of the returned sets
+   * @return the matching sets; an empty list when no TsFileManager has been attached
+   */
+  public List<TsFileSet> getTsFileSets(long maxEndVersionExcluded) {
+    if (tsFileManager == null) {
+      // the manager is only attached once the resource is added to a TsFileManager; without it
+      // no TsFileSet can be known, so report none instead of failing with a NullPointerException
+      return new ArrayList<>();
+    }
+    return tsFileManager.getTsFileSet(
+        tsFileID.timePartitionId, tsFileID.fileVersion, maxEndVersionExcluded);
+  }
 
   public EvolvedSchema getMergedEvolvedSchema() {
+    return getMergedEvolvedSchema(Long.MAX_VALUE);
+  }
+
+  public EvolvedSchema getMergedEvolvedSchema(long excludedMaxFileVersion) {
     List<EvolvedSchema> list = new ArrayList<>();
     for (TsFileSet fileSet : getTsFileSets()) {
+      if (fileSet.getEndVersion() >= excludedMaxFileVersion) {
+        continue;
+      }
+
       try {
         EvolvedSchema readEvolvedSchema = fileSet.readEvolvedSchema();
         list.add(readEvolvedSchema);
@@ -1653,4 +1660,34 @@ public class TsFileResource implements 
PersistentResource, Cloneable {
 
     return EvolvedSchema.merge(list.toArray(new EvolvedSchema[0]));
   }
+
+  /**
+   * Scans the given resources and reports the largest end version among all of their TsFileSets,
+   * together with the lowest-versioned resource that has at least one TsFileSet.
+   *
+   * <p>Resources whose {@link #getTsFileSets()} is empty are ignored for both results.
+   * NOTE(review): confirm that skipping such resources when picking the minimum is intended.
+   *
+   * @param tsFileResources the resources to inspect
+   * @return a pair of (maximum TsFileSet end version, resource with the minimum file version);
+   *     {@code (Long.MIN_VALUE, null)} when no resource has a TsFileSet
+   */
+  public static Pair<Long, TsFileResource> getMaxTsFileSetEndVersionAndMinResource(
+      List<TsFileResource> tsFileResources) {
+    long maxTsFileSetEndVersion = Long.MIN_VALUE;
+    long minResourceVersion = Long.MAX_VALUE;
+    TsFileResource minTsFileResource = null;
+    for (TsFileResource tsFileResource : tsFileResources) {
+      List<TsFileSet> tsFileSets = tsFileResource.getTsFileSets();
+      if (tsFileSets.isEmpty()) {
+        continue;
+      }
+      // Take the maximum over every set rather than trusting the last element: the list is in
+      // insertion order, which is not guaranteed to be ascending by end version.
+      for (TsFileSet tsFileSet : tsFileSets) {
+        maxTsFileSetEndVersion = Math.max(maxTsFileSetEndVersion, tsFileSet.getEndVersion());
+      }
+      long resourceVersion = tsFileResource.getTsFileID().fileVersion;
+      if (resourceVersion < minResourceVersion) {
+        minTsFileResource = tsFileResource;
+        minResourceVersion = resourceVersion;
+      }
+    }
+    return new Pair<>(maxTsFileSetEndVersion, minTsFileResource);
+  }
+
+  /**
+   * Attaches the TsFileManager that this resource consults when looking up its TsFileSets.
+   *
+   * @param tsFileManager the manager to attach; replaces any previously attached one
+   */
+  public void setTsFileManager(TsFileManager tsFileManager) {
+    this.tsFileManager = tsFileManager;
+  }
+
+  /**
+   * Returns the TsFileManager attached through {@link #setTsFileManager(TsFileManager)}.
+   *
+   * @return the attached manager, or {@code null} when none has been set
+   */
+  public TsFileManager getTsFileManager() {
+    return tsFileManager;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
index 7d04bb1aa3e..e447a1fc630 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
+import java.util.function.Function;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry.ModType;
@@ -42,6 +44,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import org.apache.tsfile.write.schema.Schema;
 
 public class EvolvedSchema {
   // the evolved table names after applying all schema evolution operations
@@ -243,6 +246,27 @@ public class EvolvedSchema {
     return finalTableSchemas;
   }
 
+  /**
+   * Builds a copy of the given table schema in which the table name and every column name are
+   * mapped back through {@code getOriginalTableName} / {@code getOriginalColumnName}.
+   *
+   * <p>Each column keeps its data type, encoding, compressor and column category.
+   *
+   * @param tableSchema the schema to map back; it is not modified
+   * @return a new TableSchema carrying the original names
+   */
+  private TableSchema rewriteToOriginal(TableSchema tableSchema) {
+    final String evolvedTableName = tableSchema.getTableName();
+    final String originalTableName = getOriginalTableName(evolvedTableName);
+
+    final List<IMeasurementSchema> evolvedColumns = tableSchema.getColumnSchemas();
+    final List<ColumnCategory> evolvedCategories = tableSchema.getColumnTypes();
+    final List<IMeasurementSchema> originalColumns = new ArrayList<>(evolvedColumns.size());
+    final List<ColumnCategory> originalCategories = new ArrayList<>(evolvedCategories.size());
+
+    // columns and categories are parallel lists, so walk them with one shared position
+    int position = 0;
+    for (IMeasurementSchema evolvedColumn : evolvedColumns) {
+      String originalColumnName =
+          getOriginalColumnName(evolvedTableName, evolvedColumn.getMeasurementName());
+      originalColumns.add(
+          new MeasurementSchema(
+              originalColumnName,
+              evolvedColumn.getType(),
+              evolvedColumn.getEncodingType(),
+              evolvedColumn.getCompressor()));
+      originalCategories.add(evolvedCategories.get(position));
+      position++;
+    }
+
+    return new TableSchema(originalTableName, originalColumns, originalCategories);
+  }
+
   public TableSchema rewriteToFinal(TableSchema tableSchema) {
     String finalTableName = getFinalTableName(tableSchema.getTableName());
 
@@ -328,4 +352,21 @@ public class EvolvedSchema {
     }
     return mergedSchema;
   }
+
+  /**
+   * Maps every table schema registered in the given Schema back to its original names, without
+   * any extra per-table transformation (passes {@code null} as the transformer).
+   *
+   * @param schema the schema to map back
+   * @return the Schema produced by {@code rewriteToOriginal(schema, null)}
+   */
+  public Schema rewriteToOriginal(Schema schema) {
+    return rewriteToOriginal(schema, null);
+  }
+  /**
+   * Creates a new Schema in which every table schema of the input has been mapped back to its
+   * original names and, optionally, passed through a caller-supplied transformer.
+   *
+   * @param schema the schema whose table schemas are mapped back; it is not modified
+   * @param tableSchemaTransformer applied to each mapped-back table schema before it is
+   *     registered; may be {@code null}, in which case the mapped-back schema is registered as is
+   * @return a new Schema holding the resulting table schemas
+   */
+  public Schema rewriteToOriginal(
+      Schema schema, Function<TableSchema, TableSchema> tableSchemaTransformer) {
+    final Schema rewritten = new Schema();
+    schema
+        .getTableSchemaMap()
+        .values()
+        .forEach(
+            evolved -> {
+              TableSchema original = rewriteToOriginal(evolved);
+              rewritten.registerTableSchema(
+                  tableSchemaTransformer == null
+                      ? original
+                      : tableSchemaTransformer.apply(original));
+            });
+    return rewritten;
+  }
+
+
 }


Reply via email to