This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/support_schema_evolution by this push:
     new eb1df4a5f9d EvolvedSchema may merge
eb1df4a5f9d is described below

commit eb1df4a5f9df0cc52608931f44e844299802b1f9
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Nov 27 16:26:35 2025 +0800

    EvolvedSchema may merge
---
 .../db/storageengine/dataregion/DataRegion.java    | 138 +++++++++++++++------
 .../dataregion/tsfile/TsFileManager.java           |  22 ++++
 .../dataregion/tsfile/TsFileResource.java          |   8 ++
 .../dataregion/tsfile/evolution/EvolvedSchema.java |  69 +++++++++--
 .../dataregion/tsfile/fileset/TsFileSet.java       |  18 ++-
 .../tsfile/evolution/EvolvedSchemaTest.java        |  57 +++++++++
 6 files changed, 265 insertions(+), 47 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index cb6803a270a..b1520c56ec9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -126,6 +126,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
@@ -648,7 +649,7 @@ public class DataRegion implements IDataRegionForQuery {
           }
         }
         // ensure that seq and unseq files in the same partition have the same 
TsFileSet
-        Map<Long, List<TsFileSet>> recoveredTsFileSetMap = new HashMap<>();
+        Map<Long, List<TsFileSet>> recoveredPartitionTsFileSetMap = new 
HashMap<>();
 
         for (Entry<Long, List<TsFileResource>> partitionFiles : 
partitionTmpSeqTsFiles.entrySet()) {
           Callable<Void> asyncRecoverTask =
@@ -658,7 +659,7 @@ public class DataRegion implements IDataRegionForQuery {
                   partitionFiles.getValue(),
                   fileTimeIndexMap,
                   true,
-                  recoveredTsFileSetMap);
+                  recoveredPartitionTsFileSetMap);
           if (asyncRecoverTask != null) {
             asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
           }
@@ -672,7 +673,7 @@ public class DataRegion implements IDataRegionForQuery {
                   partitionFiles.getValue(),
                   fileTimeIndexMap,
                   false,
-                  recoveredTsFileSetMap);
+                  recoveredPartitionTsFileSetMap);
           if (asyncRecoverTask != null) {
             asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
           }
@@ -990,52 +991,74 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  /** Returns the directory holding the persisted TsFileSets of {@code partitionId}. */
+  private String getFileSetsDir(long partitionId) {
+    return dataRegionSysDir
+        + File.separator
+        + partitionId
+        + File.separator
+        + TsFileSet.FILE_SET_DIR_NAME;
+  }
+
+  /**
+   * Returns the TsFileSets of {@code partitionId}, loading them from disk on the first request for
+   * that partition and caching them in {@code tsFileSetMap}. The map is shared by the seq and unseq
+   * recovery passes, so files of both kinds in one partition see the same TsFileSet instances.
+   *
+   * <p>Loading, sorting and registering the newest set in {@code lastTsFileSetMap} happen once per
+   * partition; later calls for the same partition are plain map lookups.
+   *
+   * @return the partition's file sets in their natural order, or an empty list if there are none
+   */
+  private List<TsFileSet> recoverTsFileSets(
+      long partitionId, Map<Long, List<TsFileSet>> tsFileSetMap) {
+    return tsFileSetMap.computeIfAbsent(partitionId, this::loadTsFileSets);
+  }
+
+  /**
+   * Builds a TsFileSet for every entry under {@link #getFileSetsDir(long)} whose name parses as a
+   * long, sorts them and records the last one in {@code lastTsFileSetMap}.
+   */
+  private List<TsFileSet> loadTsFileSets(long partitionId) {
+    File fileSetDir = new File(getFileSetsDir(partitionId));
+    File[] fileSets = fileSetDir.listFiles();
+    if (fileSets == null || fileSets.length == 0) {
+      return Collections.emptyList();
+    }
+    List<TsFileSet> results = new ArrayList<>(fileSets.length);
+    for (File fileSet : fileSets) {
+      TsFileSet tsFileSet;
+      try {
+        tsFileSet =
+            new TsFileSet(Long.parseLong(fileSet.getName()), fileSetDir.getAbsolutePath(), true);
+      } catch (NumberFormatException e) {
+        // entries not named by an end version are not file sets; skip them
+        continue;
+      }
+      results.add(tsFileSet);
+    }
+    if (!results.isEmpty()) {
+      results.sort(null);
+      lastTsFileSetMap.put(partitionId, results.get(results.size() - 1));
+    }
+    return results;
+  }
+
   private Callable<Void> recoverFilesInPartition(
       long partitionId,
       DataRegionRecoveryContext context,
       List<TsFileResource> resourceList,
       Map<TsFileID, FileTimeIndex> fileTimeIndexMap,
       boolean isSeq,
-      Map<Long, List<TsFileSet>> tsFileSetMap) {
+      Map<Long, List<TsFileSet>> partitionTsFileSetMap) {
 
     List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>();
     List<TsFileResource> resourceListForSyncRecover = new ArrayList<>();
     Callable<Void> asyncRecoverTask = null;
     for (TsFileResource tsFileResource : resourceList) {
-      List<TsFileSet> tsFileSets =
-          tsFileSetMap.computeIfAbsent(
-              partitionId,
-              pid -> {
-                File fileSetDir =
-                    new File(
-                        dataRegionSysDir
-                            + File.separator
-                            + partitionId
-                            + File.separator
-                            + TsFileSet.FILE_SET_DIR_NAME);
-                File[] fileSets = fileSetDir.listFiles();
-                if (fileSets == null || fileSets.length == 0) {
-                  return Collections.emptyList();
-                } else {
-                  List<TsFileSet> results = new ArrayList<>();
-                  for (File fileSet : fileSets) {
-                    TsFileSet tsFileSet;
-                    try {
-                      tsFileSet =
-                          new TsFileSet(
-                              Long.parseLong(fileSet.getName()),
-                              fileSetDir.getAbsolutePath(),
-                              true);
-                    } catch (NumberFormatException e) {
-                      continue;
-                    }
-                    results.add(tsFileSet);
-                  }
-                  return results;
-                }
-              });
-      if (!tsFileSets.isEmpty()) {
-        tsFileSets.sort(null);
+      List<TsFileSet> tsFileSets = recoverTsFileSets(partitionId, partitionTsFileSetMap);
+      long fileVersion = tsFileResource.getTsFileID().fileVersion;
+      // Attach every file set whose endVersion is >= this file's version (assumes TsFileSet's
+      // natural order is by endVersion -- confirm). On a miss binarySearch returns
+      // -(insertionPoint) - 1, so the first such set sits at -(i + 1); using -i would skip it.
+      int i = Collections.binarySearch(tsFileSets, TsFileSet.comparatorKey(fileVersion));
+      if (i < 0) {
+        i = -(i + 1);
+      }
+      if (i < tsFileSets.size()) {
+        List<TsFileSet> containedSets = tsFileSets.subList(i, tsFileSets.size());
+        containedSets.forEach(tsFileResource::addFileSet);
       }
 
       tsFileManager.add(tsFileResource, isSeq);
@@ -1158,6 +1181,45 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  /**
+   * Creates a new TsFileSet ending at {@code maxVersion} in the partition's file-set directory and
+   * attaches it, via {@link TsFileManager#addTsFileSet}, to the partition's current TsFiles.
+   */
+  private TsFileSet createNewFileSet(long maxVersion, long partitionId) {
+    TsFileSet newSet = new TsFileSet(maxVersion, getFileSetsDir(partitionId), false);
+    tsFileManager.addTsFileSet(newSet, partitionId);
+    return newSet;
+  }
+
+  /**
+   * Appends {@code schemaEvolutions} to the newest TsFileSet of every partition listed in {@code
+   * partitionMaxFileVersions}. A new set is started first for any partition that has no set yet
+   * or whose newest set ends before the partition's max file version.
+   *
+   * <p>Runs under the region write lock after closing all working TsFile processors, and does
+   * nothing if the region has been deleted. An IOException while appending is logged and the
+   * remaining partitions are still processed. NOTE(review): callers cannot observe that failure.
+   */
+  public void applySchemaEvolution(List<SchemaEvolution> schemaEvolutions) {
+    long startTime = System.nanoTime();
+    // the label presumably names the lock holder for diagnostics; this is not an insert path
+    writeLock("applySchemaEvolution");
+    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
+    try {
+      if (deleted) {
+        return;
+      }
+
+      syncCloseAllWorkingTsFileProcessors();
+
+      for (Entry<Long, Long> partitionVersionEntry : partitionMaxFileVersions.entrySet()) {
+        long partitionId = partitionVersionEntry.getKey();
+        long maxVersion = partitionVersionEntry.getValue();
+        lastTsFileSetMap.compute(
+            partitionId,
+            (pid, lastSet) -> {
+              // a set only covers files whose version is <= its endVersion, so newer files need
+              // a fresh set before the evolutions are recorded
+              if (lastSet == null || lastSet.getEndVersion() < maxVersion) {
+                lastSet = createNewFileSet(maxVersion, partitionId);
+              }
+              try {
+                lastSet.appendSchemaEvolution(schemaEvolutions);
+              } catch (IOException e) {
+                logger.error(
+                    "Cannot append schema evolutions to fileSets in partition {}-{}",
+                    dataRegionId,
+                    partitionId,
+                    e);
+              }
+              return lastSet;
+            });
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
   /**
    * insert one row of data.
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
index b7c1ba2c14f..4466668ad5f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.PartitionLevelModFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder;
 import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
 
@@ -507,4 +508,25 @@ public class TsFileManager {
     }
     return maxFileTimestamp;
   }
+
+  /**
+   * Adds {@code newSet} to every sequence and then every unsequence TsFileResource of {@code
+   * partitionId}, holding this manager's write lock for the whole operation.
+   */
+  public void addTsFileSet(TsFileSet newSet, long partitionId) {
+    writeLock("addTsFileSet");
+    try {
+      attachFileSet(sequenceFiles.get(partitionId), newSet);
+      attachFileSet(unsequenceFiles.get(partitionId), newSet);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /** Adds {@code fileSet} to each resource of {@code resources}; a null list is ignored. */
+  private static void attachFileSet(TsFileResourceList resources, TsFileSet fileSet) {
+    if (resources == null) {
+      return;
+    }
+    for (TsFileResource resource : resources) {
+      resource.addFileSet(fileSet);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 13c00febe3c..53b07166b94 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -1630,4 +1630,12 @@ public class TsFileResource implements 
PersistentResource, Cloneable {
   public TsFileResource shallowCloneForNative() throws 
CloneNotSupportedException {
     return (TsFileResource) clone();
   }
+
+  /** Appends {@code tsFileSet} to the file sets associated with this resource. */
+  public void addFileSet(TsFileSet tsFileSet) {
+    this.tsFileSets.add(tsFileSet);
+  }
+
+  /** Returns the resource's own list of associated file sets (the field itself, not a copy). */
+  public List<TsFileSet> getTsFileSets() {
+    return this.tsFileSets;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
index 452f79b720b..f56a30bd1e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
@@ -22,25 +22,25 @@ package 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
 
 public class EvolvedSchema {
   // the evolved table names after applying all schema evolution operations
-  private final Map<String, String> originalTableNames = new HashMap<>();
+  private Map<String, String> originalTableNames = new LinkedHashMap<>();
 
   /**
    * the first key is the evolved table name, the second key is the evolved 
column name, and the
    * value is the original column name before any schema evolution.
    */
-  private final Map<String, Map<String, String>> originalColumnNames = new 
HashMap<>();
+  private Map<String, Map<String, String>> originalColumnNames = new 
LinkedHashMap<>();
 
   public void renameTable(String oldTableName, String newTableName) {
     if (!originalTableNames.containsKey(oldTableName)) {
       originalTableNames.put(newTableName, oldTableName);
-      // mark the old table name as non-exists
-      originalTableNames.put(oldTableName, "");
     } else {
       // mark the old table name as non-exists
-      String originalName = originalTableNames.put(oldTableName, "");
+      String originalName = originalTableNames.remove(oldTableName);
       originalTableNames.put(newTableName, originalName);
     }
 
@@ -55,10 +55,8 @@ public class EvolvedSchema {
         originalColumnNames.computeIfAbsent(tableName, t -> new 
LinkedHashMap<>());
     if (!columnNameMap.containsKey(oldColumnName)) {
       columnNameMap.put(newColumnName, oldColumnName);
-      // mark the old column name as non-exists
-      columnNameMap.put(oldColumnName, "");
     } else {
-      String originalName = columnNameMap.put(oldColumnName, "");
+      String originalName = columnNameMap.remove(oldColumnName);
       columnNameMap.put(newColumnName, originalName);
     }
   }
@@ -74,4 +72,59 @@ public class EvolvedSchema {
     }
     return columnNameMap.getOrDefault(evolvedColumnName, evolvedColumnName);
   }
+
+  /** Two schemas are equal when they are the same class and hold equal name mappings. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    EvolvedSchema other = (EvolvedSchema) o;
+    boolean sameTables = Objects.equals(originalTableNames, other.originalTableNames);
+    return sameTables && Objects.equals(originalColumnNames, other.originalColumnNames);
+  }
+
+  /** Same value as {@code Objects.hash(originalTableNames, originalColumnNames)}. */
+  @Override
+  public int hashCode() {
+    int tableHash = Objects.hashCode(originalTableNames);
+    int columnHash = Objects.hashCode(originalColumnNames);
+    return 31 * (31 + tableHash) + columnHash;
+  }
+
+  @Override
+  public String toString() {
+    return "EvolvedSchema{originalTableNames="
+        + originalTableNames
+        + ", originalColumnNames="
+        + originalColumnNames
+        + '}';
+  }
+
+  /**
+   * Returns an independent copy of {@code evolvedSchema}. The table-name map and every per-table
+   * column-name map are copied, so renames applied to the copy (as {@link #merge} does) do not
+   * alter the source's maps. LinkedHashMap keeps insertion order, matching the field initializers.
+   */
+  public static EvolvedSchema deepCopy(EvolvedSchema evolvedSchema) {
+    EvolvedSchema newEvolvedSchema = new EvolvedSchema();
+    newEvolvedSchema.originalTableNames = new LinkedHashMap<>(evolvedSchema.originalTableNames);
+    Map<String, Map<String, String>> columnNames = new LinkedHashMap<>();
+    // copy each inner map too; sharing them would let renameColumn on the copy mutate the source
+    evolvedSchema.originalColumnNames.forEach(
+        (tableName, columnNameMap) -> columnNames.put(tableName, new LinkedHashMap<>(columnNameMap)));
+    newEvolvedSchema.originalColumnNames = columnNames;
+    return newEvolvedSchema;
+  }
+
+  /**
+   * Combines {@code oldSchema} with {@code newSchema}, whose renames are taken to have happened
+   * after those of {@code oldSchema}. It replays newSchema's table renames, then its column
+   * renames, onto a {@link #deepCopy} of oldSchema. EvolvedSchemaTest#testMerge expects the result
+   * to equal the schema obtained by applying both evolution lists in sequence.
+   *
+   * <p>If either argument is null the other one is returned as-is (the same instance, not a copy).
+   *
+   * <p>NOTE(review): newSchema stores renames already collapsed, so replaying them one by one can
+   * go wrong for rename cycles. With the visible renameTable logic, the swap a->tmp, b->a, tmp->b
+   * collapses to {a: b, b: a}; replaying renameTable(b, a) then renameTable(a, b) yields {b: b}
+   * instead of {a: b, b: a}. Confirm against the full renameTable before relying on this.
+   */
+  public static EvolvedSchema merge(EvolvedSchema oldSchema, EvolvedSchema newSchema) {
+    if (oldSchema == null) {
+      return newSchema;
+    }
+    if (newSchema == null) {
+      return oldSchema;
+    }
+
+    EvolvedSchema mergedSchema = deepCopy(oldSchema);
+    // each entry maps a table's name after newSchema (key) to its name before it (value)
+    for (Entry<String, String> finalOriginalTableName : newSchema.originalTableNames.entrySet()) {
+      mergedSchema.renameTable(finalOriginalTableName.getValue(), finalOriginalTableName.getKey());
+    }
+    // column maps are keyed by newSchema's final table name, which the loop above has already
+    // applied to mergedSchema
+    for (Entry<String, Map<String, String>> finalTableNameColumnNameMapEntry :
+        newSchema.originalColumnNames.entrySet()) {
+      for (Entry<String, String> finalColNameOriginalColNameEntry :
+          finalTableNameColumnNameMapEntry.getValue().entrySet()) {
+        String finalTableName = finalTableNameColumnNameMapEntry.getKey();
+        String finalColName = finalColNameOriginalColNameEntry.getKey();
+        String originalColName = finalColNameOriginalColNameEntry.getValue();
+        mergedSchema.renameColumn(finalTableName, originalColName, finalColName);
+      }
+    }
+
+    return mergedSchema;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
index 9d75031d30f..d3271d98b45 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
@@ -35,12 +35,24 @@ public class TsFileSet implements Comparable<TsFileSet> {
 
   private final long endVersion;
   private final File fileSetDir;
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock lock;
   private SchemaEvolutionFile schemaEvolutionFile;
 
+  /**
+   * Creates a placeholder that carries only {@code endVersion}; {@code fileSetDir} and {@code lock}
+   * stay null, so the instance is only fit to be a search key.
+   */
+  private TsFileSet(long endVersion) {
+    this.lock = null;
+    this.fileSetDir = null;
+    this.endVersion = endVersion;
+  }
+
+  /**
+   * Returns a key-only instance for searching a sorted list of TsFileSets (presumably compareTo
+   * orders by endVersion -- confirm). Calling lock or file methods on it is not supported.
+   */
+  public static TsFileSet comparatorKey(long endVersion) {
+    TsFileSet key = new TsFileSet(endVersion);
+    return key;
+  }
+
   public TsFileSet(long endVersion, String fileSetsDir, boolean recover) {
     this.endVersion = endVersion;
     this.fileSetDir = new File(fileSetsDir + File.separator + endVersion);
+    this.lock = new ReentrantReadWriteLock();
 
     if (recover) {
       recover();
@@ -104,4 +116,8 @@ public class TsFileSet implements Comparable<TsFileSet> {
   public void readUnlock() {
     lock.readLock().unlock();
   }
+
+  /** Returns the end version this file set was constructed with. */
+  public long getEndVersion() {
+    return this.endVersion;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java
new file mode 100644
index 00000000000..d9c76be4710
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+
+public class EvolvedSchemaTest {
+
+  /**
+   * Merging the schema of a first batch of evolutions with that of a second batch must equal the
+   * schema obtained by applying both batches in order, and must leave the first schema unchanged.
+   */
+  @Test
+  public void testMerge() {
+    // t1 -> t2, t2.s1 -> t2.s2, t3 -> t1
+    List<SchemaEvolution> firstEvolutions =
+        Arrays.asList(
+            new TableRename("t1", "t2"),
+            new ColumnRename("t2", "s1", "s2"),
+            new TableRename("t3", "t1"));
+    // t1 -> t2 -> t3, t2.s1 -> t2.s2 -> t3.s1, t3 -> t1 -> t2
+    List<SchemaEvolution> secondEvolutions =
+        Arrays.asList(
+            new TableRename("t2", "t3"),
+            new ColumnRename("t3", "s2", "s1"),
+            new TableRename("t1", "t2"));
+
+    EvolvedSchema oldSchema = new EvolvedSchema();
+    // built independently of deepCopy so it can detect merge mutating oldSchema
+    EvolvedSchema oldSchemaExpected = new EvolvedSchema();
+    EvolvedSchema newSchema = new EvolvedSchema();
+    EvolvedSchema allSchema = new EvolvedSchema();
+    firstEvolutions.forEach(schemaEvolution -> schemaEvolution.applyTo(oldSchema));
+    firstEvolutions.forEach(schemaEvolution -> schemaEvolution.applyTo(oldSchemaExpected));
+    firstEvolutions.forEach(schemaEvolution -> schemaEvolution.applyTo(allSchema));
+    secondEvolutions.forEach(schemaEvolution -> schemaEvolution.applyTo(newSchema));
+    secondEvolutions.forEach(schemaEvolution -> schemaEvolution.applyTo(allSchema));
+
+    EvolvedSchema mergedSchema = EvolvedSchema.merge(oldSchema, newSchema);
+
+    assertEquals(allSchema, mergedSchema);
+    assertEquals(oldSchemaExpected, oldSchema);
+  }
+}

Reply via email to