This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/support_schema_evolution by
this push:
new f4d9110622e add mergedSchema
f4d9110622e is described below
commit f4d9110622ef24069ed263b945b675a748c060ab
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Nov 27 17:15:17 2025 +0800
add mergedSchema
---
.../db/storageengine/dataregion/DataRegion.java | 4 +-
.../dataregion/tsfile/TsFileResource.java | 12 +++++
.../dataregion/tsfile/evolution/EvolvedSchema.java | 53 +++++++++++++---------
.../tsfile/evolution/SchemaEvolutionFile.java | 15 ++++--
4 files changed, 58 insertions(+), 26 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index b1520c56ec9..4a28034c84e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1189,13 +1189,13 @@ public class DataRegion implements IDataRegionForQuery {
public void applySchemaEvolution(List<SchemaEvolution> schemaEvolutions) {
long startTime = System.nanoTime();
- writeLock("InsertRow");
+ writeLock("applySchemaEvolution");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
try {
if (deleted) {
return;
}
-
+ DataNodeTableCache.getInstance().invalid(databaseName);
syncCloseAllWorkingTsFileProcessors();
for (Entry<Long, Long> partitionVersionEntry :
partitionMaxFileVersions.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 53b07166b94..d26172122a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
+import java.util.stream.Collectors;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
@@ -42,6 +43,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEnt
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.Deletion;
import
org.apache.iotdb.db.storageengine.dataregion.modification.v1.Modification;
import
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
@@ -1638,4 +1640,14 @@ public class TsFileResource implements
PersistentResource, Cloneable {
public List<TsFileSet> getTsFileSets() {
return tsFileSets;
}
+
+ public EvolvedSchema getMergedSchema() throws IOException {
+ List<EvolvedSchema> list = new ArrayList<>();
+ for (TsFileSet fileSet : getTsFileSets()) {
+ EvolvedSchema readEvolvedSchema = fileSet.readEvolvedSchema();
+ list.add(readEvolvedSchema);
+ }
+
+ return EvolvedSchema.merge(list.toArray(new EvolvedSchema[0]));
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
index f56a30bd1e3..13d6bc6fdea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
@@ -98,33 +98,44 @@ public class EvolvedSchema {
public static EvolvedSchema deepCopy(EvolvedSchema evolvedSchema) {
EvolvedSchema newEvolvedSchema = new EvolvedSchema();
- newEvolvedSchema.originalTableNames = new
HashMap<>(evolvedSchema.originalTableNames);
- newEvolvedSchema.originalColumnNames = new
HashMap<>(evolvedSchema.originalColumnNames);
+ newEvolvedSchema.originalTableNames = new
LinkedHashMap<>(evolvedSchema.originalTableNames);
+ newEvolvedSchema.originalColumnNames = new
LinkedHashMap<>(evolvedSchema.originalColumnNames);
return newEvolvedSchema;
}
- public static EvolvedSchema merge(EvolvedSchema oldSchema, EvolvedSchema
newSchema) {
- if (oldSchema == null) {
- return newSchema;
- }
- if (newSchema == null) {
- return oldSchema;
- }
+ public static EvolvedSchema merge(EvolvedSchema... schemas) {
+ EvolvedSchema firstNotNullSchema = null;
+ int i = 0;
+ for (; i < schemas.length; i++) {
+ if (schemas[i] != null) {
+ firstNotNullSchema = schemas[i];
+ i++;
+ break;
+ }
+ }
- EvolvedSchema mergedSchema = deepCopy(oldSchema);
- for (Entry<String, String> finalOriginalTableName :
newSchema.originalTableNames.entrySet()) {
- mergedSchema.renameTable(finalOriginalTableName.getValue(),
finalOriginalTableName.getKey());
- }
- for (Entry<String, Map<String, String>> finalTableNameColumnNameMapEntry :
newSchema.originalColumnNames.entrySet()) {
- for (Entry<String, String> finalColNameOriginalColNameEntry :
finalTableNameColumnNameMapEntry.getValue()
- .entrySet()) {
- String finalTableName = finalTableNameColumnNameMapEntry.getKey();
- String finalColName = finalColNameOriginalColNameEntry.getKey();
- String originalColName = finalColNameOriginalColNameEntry.getValue();
- mergedSchema.renameColumn(finalTableName, originalColName,
finalColName);
+ if (firstNotNullSchema == null) {
+ return null;
+ }
+ EvolvedSchema mergedSchema = deepCopy(firstNotNullSchema);
+
+ for (; i < schemas.length; i++) {
+ if (schemas[i] != null) {
+ EvolvedSchema newSchema = schemas[i];
+ for (Entry<String, String> finalOriginalTableName :
newSchema.originalTableNames.entrySet()) {
+ mergedSchema.renameTable(finalOriginalTableName.getValue(),
finalOriginalTableName.getKey());
+ }
+ for (Entry<String, Map<String, String>>
finalTableNameColumnNameMapEntry : newSchema.originalColumnNames.entrySet()) {
+ for (Entry<String, String> finalColNameOriginalColNameEntry :
finalTableNameColumnNameMapEntry.getValue()
+ .entrySet()) {
+ String finalTableName = finalTableNameColumnNameMapEntry.getKey();
+ String finalColName = finalColNameOriginalColNameEntry.getKey();
+ String originalColName =
finalColNameOriginalColNameEntry.getValue();
+ mergedSchema.renameColumn(finalTableName, originalColName,
finalColName);
+ }
+ }
}
}
-
return mergedSchema;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
index ac9e94bdc5f..e90c1ac32ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
@@ -40,10 +40,15 @@ public class SchemaEvolutionFile {
this.filePath = filePath;
}
- private void recoverFile() throws IOException {
+ /**
+ * Recover the SchemaEvolutionFile if it is broken.
+ * @return true if the file exists false otherwise
+ * @throws IOException if the file cannot be recovered
+ */
+ private boolean recoverFile() throws IOException {
File file = new File(filePath);
if (!file.exists() || file.length() == 0) {
- return;
+ return false;
}
long length = file.length();
@@ -55,6 +60,7 @@ public class SchemaEvolutionFile {
fileChannel.truncate(validLength);
}
}
+ return true;
}
public static long parseValidLength(String fileName) {
@@ -79,7 +85,10 @@ public class SchemaEvolutionFile {
}
public EvolvedSchema readAsSchema() throws IOException {
- recoverFile();
+ boolean exists = recoverFile();
+ if (!exists) {
+ return null;
+ }
EvolvedSchema evolvedSchema = new EvolvedSchema();
try (FileInputStream fis = new FileInputStream(filePath);