This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch force_ci/support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f71b7156cae45d762d6849239c9199c032749bd0
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Dec 10 16:27:57 2025 +0800

    Add DataRegionTask
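
    Introduce a simple per-region task framework: a DataRegionTask is
    serialized to a <taskId>.tsk file under the region's system "tasks"
    directory before it runs, and the file is deleted once the task
    completes. On restart, DataRegionTaskManager.recover() replays any
    leftover task files in task-id order, so a task interrupted by a
    crash is re-executed instead of being lost. Schema evolution is
    moved onto this framework via SchemaEvolutionTask.

    Intended lifecycle, as an illustrative sketch (the region wires this
    up internally in applySchemaEvolution):

        // recover() replays and deletes any *.tsk files left by a crash
        DataRegionTaskManager manager = new DataRegionTaskManager(dataRegion);
        manager.recover();

        // submitAndRun() persists the task, runs it, then removes its file
        DataRegionTask task = new SchemaEvolutionTask(schemaEvolutions, dataRegion);
        manager.submitAndRun(task);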
---
 .../db/storageengine/dataregion/DataRegion.java    |  76 ++++++++++-----
 .../dataregion/task/DataRegionTask.java            |  61 ++++++++++++
 .../dataregion/task/DataRegionTaskManager.java     | 108 +++++++++++++++++++++
 .../dataregion/task/SchemaEvolutionTask.java       |  86 ++++++++++++++++
 .../dataregion/tsfile/TsFileResource.java          |  10 +-
 .../dataregion/tsfile/evolution/ColumnRename.java  |  17 ++++
 .../dataregion/tsfile/fileset/TsFileSet.java       |   8 ++
 .../storageengine/dataregion/DataRegionTest.java   |   2 +-
 .../org/apache/iotdb/commons/utils/FileUtils.java  |  81 ++++++++++++++++
 9 files changed, 420 insertions(+), 29 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index f288e81f82c..b2a340d14bd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -122,6 +122,8 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio
 import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle;
 import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.ClosedFileScanHandleImpl;
+import org.apache.iotdb.db.storageengine.dataregion.task.DataRegionTaskManager;
+import org.apache.iotdb.db.storageengine.dataregion.task.SchemaEvolutionTask;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -365,6 +367,8 @@ public class DataRegion implements IDataRegionForQuery {
 
   private Map<Long, TsFileSet> lastTsFileSetMap = new ConcurrentHashMap<>();
 
+  private DataRegionTaskManager dataRegionTaskManager;
+
   /**
    * Construct a database processor.
    *
@@ -727,6 +731,9 @@ public class DataRegion implements IDataRegionForQuery {
       throw new DataRegionException(e);
     }
 
+    dataRegionTaskManager = new DataRegionTaskManager(this);
+    dataRegionTaskManager.recover();
+
     if (asyncTsFileResourceRecoverTaskList.isEmpty()) {
       initCompactionSchedule();
     }
@@ -756,9 +763,13 @@ public class DataRegion implements IDataRegionForQuery {
   protected void updateDeviceLastFlushTime(TsFileResource resource) {
     long timePartitionId = resource.getTimePartition();
     Map<IDeviceID, Long> endTimeMap = new HashMap<>();
+    EvolvedSchema mergedEvolvedSchema = resource.getMergedEvolvedSchema();
     for (IDeviceID deviceId : resource.getDevices()) {
       @SuppressWarnings("OptionalGetWithoutIsPresent") // checked above
       long endTime = resource.getEndTime(deviceId).get();
+      if (mergedEvolvedSchema != null) {
+        deviceId = mergedEvolvedSchema.rewriteDeviceId(deviceId);
+      }
       endTimeMap.put(deviceId, endTime);
     }
     if (config.isEnableSeparateData()) {
@@ -773,10 +784,14 @@ public class DataRegion implements IDataRegionForQuery {
       long timePartitionId, List<TsFileResource> resources) {
     Map<IDeviceID, Long> endTimeMap = new HashMap<>();
     for (TsFileResource resource : resources) {
+      EvolvedSchema mergedEvolvedSchema = resource.getMergedEvolvedSchema();
       for (IDeviceID deviceId : resource.getDevices()) {
         // checked above
         //noinspection OptionalGetWithoutIsPresent
         long endTime = resource.getEndTime(deviceId).get();
+        if (mergedEvolvedSchema != null) {
+          deviceId = mergedEvolvedSchema.rewriteDeviceId(deviceId);
+        }
         endTimeMap.put(deviceId, endTime);
       }
     }
@@ -1001,6 +1016,10 @@ public class DataRegion implements IDataRegionForQuery {
         + TsFileSet.FILE_SET_DIR_NAME;
   }
 
+  public File getDataRegionSysDir() {
+    return dataRegionSysDir;
+  }
+
   private List<TsFileSet> recoverTsFileSets(
       long partitionId,
       Map<Long, List<TsFileSet>> tsFileSetMap
@@ -1189,7 +1208,7 @@ public class DataRegion implements IDataRegionForQuery {
     return newSet;
   }
 
-  public void applySchemaEvolution(List<SchemaEvolution> schemaEvolutions) {
+  public void applySchemaEvolution(List<SchemaEvolution> schemaEvolutions) throws IOException {
     long startTime = System.nanoTime();
     writeLock("applySchemaEvolution");
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
@@ -1198,32 +1217,44 @@ public class DataRegion implements IDataRegionForQuery {
         return;
       }
       DataNodeTableCache.getInstance().invalid(databaseName);
-      schemaEvolutions.forEach(lastFlushTimeMap::accept);
 
       syncCloseAllWorkingTsFileProcessors();
 
-      for (Entry<Long, Long> partitionVersionEntry : partitionMaxFileVersions.entrySet()) {
-        long partitionId = partitionVersionEntry.getKey();
-        long maxVersion = partitionVersionEntry.getValue();
-        lastTsFileSetMap.compute(partitionId, (pid, lastSet) -> {
-          if (lastSet == null) {
-            lastSet = createNewFileSet(maxVersion, partitionId);
-          } else if (lastSet.getEndVersion() < maxVersion) {
-            lastSet = createNewFileSet(maxVersion, partitionId);
-          }
-          try {
-            lastSet.appendSchemaEvolution(schemaEvolutions);
-          } catch (IOException e) {
-            logger.error("Cannot append schema evolutions to fileSets in partition {}-{}", dataRegionId, partitionId, e);
-          }
-          return lastSet;
-        });
-      }
+      // may update table names in deviceIds
+      schemaEvolutions.forEach(lastFlushTimeMap::accept);
+
+      SchemaEvolutionTask evolutionTask = new SchemaEvolutionTask(schemaEvolutions, this);
+      dataRegionTaskManager.submitAndRun(evolutionTask);
     } finally {
       writeUnlock();
     }
   }
 
+  public void recordSchemaEvolution(List<SchemaEvolution> schemaEvolutions) {
+    for (Entry<Long, Long> partitionVersionEntry : partitionMaxFileVersions.entrySet()) {
+      long partitionId = partitionVersionEntry.getKey();
+      long maxVersion = partitionVersionEntry.getValue();
+      lastTsFileSetMap.compute(partitionId, (pid, lastSet) -> {
+        if (lastSet == null) {
+          lastSet = createNewFileSet(maxVersion, partitionId);
+        } else if (lastSet.getEndVersion() < maxVersion) {
+          lastSet = createNewFileSet(maxVersion, partitionId);
+        }
+        try {
+          lastSet.appendSchemaEvolution(schemaEvolutions);
+        } catch (IOException e) {
+          logger.error("Cannot append schema evolutions to fileSets in partition {}-{}", dataRegionId, partitionId, e);
+        }
+        return lastSet;
+      });
+    }
+  }
+
+  public void applySchemaEvolutionToObjects(List<SchemaEvolution> schemaEvolutions) {
+    // TODO-SchemaEvolution
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * insert one row of data.
    *
@@ -2624,12 +2655,7 @@ public class DataRegion implements IDataRegionForQuery {
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
 
     for (TsFileResource tsFileResource : tsFileResources) {
-      EvolvedSchema evolvedSchema;
-      try {
-        evolvedSchema = tsFileResource.getMergedEvolvedSchema();
-      } catch (IOException e) {
-        throw new MetadataException(e);
-      }
+      EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema();
       IDeviceID deviceIdBackThen = singleDeviceId;
       if (evolvedSchema != null) {
         deviceIdBackThen = evolvedSchema.rewriteDeviceId(singleDeviceId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java
new file mode 100644
index 00000000000..f171003aa69
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.task;
+
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.utils.io.StreamSerializable;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+
+public interface DataRegionTask extends Runnable, StreamSerializable {
+
+  long getTaskId();
+
+  void setTaskId(long taskId);
+
+  TaskType getTaskType();
+
+  enum TaskType {
+    SchemaEvolutionTask
+  }
+
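+  /**
+   * Reads a task type ordinal from the stream and deserializes the matching
+   * task implementation, bound to the given data region.
+   */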
+  @SuppressWarnings("SwitchStatementWithTooFewBranches")
+  static DataRegionTask createFrom(InputStream stream, long taskId, DataRegion dataRegion) throws IOException {
+    int typeOrdinal = ReadWriteForEncodingUtils.readVarInt(stream);
+    if (typeOrdinal < 0 || typeOrdinal >= TaskType.values().length) {
+      throw new IOException("Invalid task type: " + typeOrdinal);
+    }
+
+    TaskType taskType = TaskType.values()[typeOrdinal];
+
+    DataRegionTask task;
+    switch (taskType) {
+      case SchemaEvolutionTask:
+        task = new SchemaEvolutionTask(dataRegion);
+        break;
+      default:
+        throw new IOException("Invalid task type: " + taskType);
+    }
+    task.deserialize(stream);
+    task.setTaskId(taskId);
+    return task;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java
new file mode 100644
index 00000000000..5441b9b19c7
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.task;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("ResultOfMethodCallIgnored")
+public class DataRegionTaskManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataRegionTaskManager.class);
+  private static final String TASKS_DIR_NAME = "tasks";
+  private static final String TASK_FILE_SUFFIX = ".tsk";
+
+  private final DataRegion dataRegion;
+  private final AtomicLong latestTaskId = new AtomicLong(0);
+  private final File tasksDir;
+
+  public DataRegionTaskManager(DataRegion dataRegion) {
+    this.dataRegion = dataRegion;
+    this.tasksDir = new File(dataRegion.getDataRegionSysDir(), TASKS_DIR_NAME);
+  }
+
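+  /**
+   * Replays task files left behind by a previous run in ascending task-id
+   * order, deleting each file afterwards, and restores the latest task id.
+   */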
+  public void recover() {
+    tasksDir.mkdirs();
+    File[] files = tasksDir.listFiles((File dir, String name) -> name.endsWith(TASK_FILE_SUFFIX));
+    if (files == null) {
+      return;
+    }
+
+    Arrays.sort(files, (f1, f2) -> {
+      String fileName1 = f1.getName();
+      int suffixIndex1 = fileName1.indexOf(".");
+      long taskId1 = Long.parseLong(fileName1.substring(0, suffixIndex1));
+
+      String fileName2 = f2.getName();
+      int suffixIndex2 = fileName2.indexOf(".");
+      long taskId2 = Long.parseLong(fileName2.substring(0, suffixIndex2));
+
+      return Long.compare(taskId1, taskId2);
+    });
+
+    for (File file : files) {
+      String fileName = file.getName();
+      int suffixIndex = fileName.indexOf(".");
+      long taskId = Long.parseLong(fileName.substring(0, suffixIndex));
+      latestTaskId.getAndUpdate(l -> Math.max(l, taskId));
+
+      try (FileInputStream fis = new FileInputStream(file);
+          BufferedInputStream bufferedInputStream = new BufferedInputStream(fis)) {
+        DataRegionTask task = DataRegionTask.createFrom(bufferedInputStream, taskId, dataRegion);
+        task.run();
+      } catch (IOException e) {
+        if (LOGGER.isWarnEnabled()) {
+          LOGGER.warn("Cannot recover task from file {}", file.getAbsolutePath(), e);
+        }
+      } finally {
+        file.delete();
+      }
+    }
+  }
+
+  private void persistTask(DataRegionTask task) throws IOException {
+    File taskFile = new File(tasksDir, task.getTaskId() + TASK_FILE_SUFFIX);
+    try (FileOutputStream fos = new FileOutputStream(taskFile);
+        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fos)) {
+      task.serialize(bufferedOutputStream);
+    }
+  }
+
+  private void removeTask(DataRegionTask task) throws IOException {
+    File taskFile = new File(tasksDir, task.getTaskId() + TASK_FILE_SUFFIX);
+    taskFile.delete();
+  }
+
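+  /**
+   * Persists the task to a .tsk file before running it so that recover() can
+   * replay it after a crash; the file is removed once the task completes.
+   */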
+  public void submitAndRun(DataRegionTask dataRegionTask) throws IOException {
+    dataRegionTask.setTaskId(latestTaskId.getAndIncrement());
+    persistTask(dataRegionTask);
+    dataRegionTask.run();
+    removeTask(dataRegionTask);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java
new file mode 100644
index 00000000000..23a84fbbd72
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.task;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+
+public class SchemaEvolutionTask implements DataRegionTask {
+
+  private List<SchemaEvolution> schemaEvolutions;
+  private final DataRegion dataRegion;
+  private long taskId;
+
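+  /**
+   * Records the evolutions into the region's file sets, then applies them to
+   * region objects (the latter is still a TODO in this patch).
+   */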
+  @Override
+  public void run() {
+    dataRegion.recordSchemaEvolution(schemaEvolutions);
+    dataRegion.applySchemaEvolutionToObjects(schemaEvolutions);
+  }
+
+  public SchemaEvolutionTask(DataRegion dataRegion) {
+    this.dataRegion = dataRegion;
+  }
+
+  public SchemaEvolutionTask(List<SchemaEvolution> schemaEvolutions, DataRegion dataRegion) {
+    this.schemaEvolutions = schemaEvolutions;
+    this.dataRegion = dataRegion;
+  }
+
+  @Override
+  public long serialize(OutputStream stream) throws IOException {
+    long size = ReadWriteForEncodingUtils.writeVarInt(getTaskType().ordinal(), stream);
+    size += ReadWriteForEncodingUtils.writeVarInt(schemaEvolutions.size(), stream);
+    for (SchemaEvolution schemaEvolution : schemaEvolutions) {
+      size += schemaEvolution.serialize(stream);
+    }
+    return size;
+  }
+
+  @Override
+  public void deserialize(InputStream stream) throws IOException {
+    int size = ReadWriteForEncodingUtils.readVarInt(stream);
+    schemaEvolutions = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      schemaEvolutions.add(SchemaEvolution.createFrom(stream));
+    }
+  }
+
+  @Override
+  public long getTaskId() {
+    return taskId;
+  }
+
+  @Override
+  public void setTaskId(long taskId) {
+    this.taskId = taskId;
+  }
+
+  @Override
+  public TaskType getTaskType() {
+    return TaskType.SchemaEvolutionTask;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 80ab28eec9b..060c16fea7d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -1640,11 +1640,15 @@ public class TsFileResource implements PersistentResource, Cloneable {
     return tsFileSets;
   }
 
-  public EvolvedSchema getMergedEvolvedSchema() throws IOException {
+  public EvolvedSchema getMergedEvolvedSchema() {
     List<EvolvedSchema> list = new ArrayList<>();
     for (TsFileSet fileSet : getTsFileSets()) {
-      EvolvedSchema readEvolvedSchema = fileSet.readEvolvedSchema();
-      list.add(readEvolvedSchema);
+      try {
+        EvolvedSchema readEvolvedSchema = fileSet.readEvolvedSchema();
+        list.add(readEvolvedSchema);
+      } catch (IOException e) {
+        LOGGER.warn("Cannot read evolved schema from {}, skipping it", fileSet, e);
+      }
     }
 
     return EvolvedSchema.merge(list.toArray(new EvolvedSchema[0]));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
index fc363247b13..23c18cea9f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
@@ -32,6 +34,7 @@ public class ColumnRename implements SchemaEvolution {
   private String tableName;
   private String nameBefore;
   private String nameAfter;
+  private TSDataType dataType;
 
   // for deserialization
   public ColumnRename() {}
@@ -58,6 +61,7 @@ public class ColumnRename implements SchemaEvolution {
     size += ReadWriteIOUtils.writeVar(tableName, stream);
     size += ReadWriteIOUtils.writeVar(nameBefore, stream);
     size += ReadWriteIOUtils.writeVar(nameAfter, stream);
+    size += ReadWriteIOUtils.write(dataType != null ? (byte) dataType.ordinal() : (byte) -1, stream);
     return size;
   }
 
@@ -66,5 +70,18 @@ public class ColumnRename implements SchemaEvolution {
     tableName = ReadWriteIOUtils.readVarIntString(stream);
     nameBefore = ReadWriteIOUtils.readVarIntString(stream);
     nameAfter = ReadWriteIOUtils.readVarIntString(stream);
+    byte typeOrdinal = ReadWriteIOUtils.readByte(stream);
+    if (typeOrdinal != -1) {
+      dataType = TSDataType.values()[typeOrdinal];
+    }
+  }
+
+  public TSDataType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(TSDataType dataType) {
+    this.dataType = dataType;
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
index d3271d98b45..5c43418fa64 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
@@ -120,4 +120,12 @@ public class TsFileSet implements Comparable<TsFileSet> {
   public long getEndVersion() {
     return endVersion;
   }
+
+  @Override
+  public String toString() {
+    return "TsFileSet{" +
+        "endVersion=" + endVersion +
+        ", fileSetDir=" + fileSetDir +
+        '}';
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index b525b4860e7..e896a599c43 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -1786,7 +1786,7 @@ public class DataRegionTest {
 
   @Test
   public void testSchemaEvolution()
-      throws IllegalPathException, WriteProcessException, QueryProcessException {
+      throws IllegalPathException, WriteProcessException, QueryProcessException, IOException {
     String[] measurements = {"tag1", "s1", "s2"};
     MeasurementSchema[] measurementSchemas = {
         new MeasurementSchema("tag1", TSDataType.STRING),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index baed8bcd537..a2510454e73 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -19,6 +19,12 @@
 
 package org.apache.iotdb.commons.utils;
 
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 
 import org.apache.tsfile.external.commons.codec.digest.DigestUtils;
@@ -334,6 +340,81 @@ public class FileUtils {
     return true;
   }
 
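+  /**
+   * Returns the elements of originalList whose indexes are NOT contained in
+   * filteredIndexes, scanning both lists in a single merge-style pass.
+   * filteredIndexes must be sorted in ascending order.
+   */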
+  public static <T> List<T> applyReversedIndexesOnListV2(
+      final List<Integer> filteredIndexes, final List<T> originalList) {
+    // filteredIndexes must be sorted ascending; call filteredIndexes.sort(null) first if necessary
+    List<T> filteredList = new ArrayList<>(originalList.size() - filteredIndexes.size());
+    int filteredIndexPos = 0;
+    for (int processingIndex = 0; processingIndex < originalList.size(); processingIndex++) {
+      if (filteredIndexPos >= filteredIndexes.size()) {
+        // all filteredIndexes processed, add the remaining elements in one batch
+        filteredList.addAll(originalList.subList(processingIndex, originalList.size()));
+        break;
+      }
+      int filteredIndex = filteredIndexes.get(filteredIndexPos);
+      if (filteredIndex == processingIndex) {
+        // the index is filtered out, move to the next filtered pos
+        filteredIndexPos++;
+      } else {
+        // the index is not filtered, keep the element
+        filteredList.add(originalList.get(processingIndex));
+      }
+    }
+    return filteredList;
+  }
+
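+  /** Set-based reference implementation, kept for comparison in main() below. */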
+  public static <T> List<T> applyReversedIndexesOnListV1(
+      final List<Integer> filteredIndexes, final List<T> originalList) {
+    final Set<Integer> indexes = new HashSet<>(filteredIndexes);
+    return Objects.nonNull(originalList)
+        ? IntStream.range(0, originalList.size())
+            .filter(index -> !indexes.contains(index)) // keep indexes not in the exclusion list
+            .mapToObj(originalList::get)
+            .collect(Collectors.toList())
+        : null;
+  }
+
+  public static void main(String[] args) {
+    int elementNum = 10_000_000;
+    int filteredNum = elementNum / 10;
+    Random random = new Random();
+    List<Integer> originalList = IntStream.range(0, elementNum).boxed().collect(Collectors.toList());
+    List<Integer> filteredIndexes = new ArrayList<>(filteredNum);
+    for (int i = 0; i < filteredNum; i++) {
+      filteredIndexes.add(random.nextInt(elementNum));
+    }
+    filteredIndexes = filteredIndexes.stream().sorted().distinct().collect(Collectors.toList());
+
+    long start = System.currentTimeMillis();
+    List<Integer> appliedList = applyReversedIndexesOnListV1(filteredIndexes, originalList);
+    System.out.println("V1: " + (System.currentTimeMillis() - start) + " ms");
+    Set<Integer> appliedSet = new HashSet<>(appliedList);
+    for (Integer filteredIndex : filteredIndexes) {
+      if (appliedSet.contains(filteredIndex)) {
+        System.out.println("Incorrect implementation");
+        System.exit(-1);
+      }
+    }
+
+    start = System.currentTimeMillis();
+    appliedList = applyReversedIndexesOnListV2(filteredIndexes, originalList);
+    System.out.println("V2: " + (System.currentTimeMillis() - start) + " ms");
+    appliedSet = new HashSet<>(appliedList);
+    if (appliedList.size() != originalList.size() - filteredIndexes.size()) {
+      System.out.println("Incorrect implementation");
+      System.exit(-1);
+    }
+    for (Integer filteredIndex : filteredIndexes) {
+      if (appliedSet.contains(filteredIndex)) {
+        System.out.println("Incorrect implementation");
+        System.exit(-1);
+      }
+    }
+  }
+
   public static File createHardLink(File sourceFile, File hardlink) throws IOException {
     if (!hardlink.getParentFile().exists() && !hardlink.getParentFile().mkdirs()) {
       synchronized (FileUtils.class) {
