This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 448be8980756e0eebbd896e845e2029d9521a5fb Author: Tian Jiang <[email protected]> AuthorDate: Tue Jan 13 16:47:05 2026 +0800 Add EvolvedSchemaCache --- .../dataregion/tsfile/evolution/EvolvedSchema.java | 20 ++++++++-- .../tsfile/evolution/EvolvedSchemaCache.java | 45 ++++++++++++++++++++++ .../dataregion/tsfile/fileset/TsFileSet.java | 25 ++++++------ 3 files changed, 73 insertions(+), 17 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java index 8c04b4805f3..1e6c22e057a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java @@ -32,6 +32,8 @@ import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.apache.tsfile.write.schema.Schema; @@ -47,7 +49,8 @@ import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; -public class EvolvedSchema { +public class EvolvedSchema implements Accountable { + // the evolved table names after applying all schema evolution operations private Map<String, String> finalToOriginalTableNames = new LinkedHashMap<>(); @@ -285,8 +288,8 @@ public class EvolvedSchema { measurementSchemas.add( new MeasurementSchema( getFinalColumnName( - tableSchema.getTableName(), measurementSchema.getMeasurementName()), - measurementSchema.getType(), + tableSchema.getTableName(), measurementSchema.getMeasurementName()), + measurementSchema.getType(), measurementSchema.getEncodingType(), measurementSchema.getCompressor())); columnCategories.add(tableSchema.getColumnTypes().get(i)); } @@ -390,8 +393,17 @@ public class EvolvedSchema { String originalTableName) { for (IChunkMetadata iChunkMetadata : abstractAlignedChunkMetadata.getValueChunkMetadataList()) { if (iChunkMetadata != null) { - iChunkMetadata.setMeasurementUid(getFinalColumnName(originalTableName, iChunkMetadata.getMeasurementUid())); + iChunkMetadata.setMeasurementUid( + getFinalColumnName(originalTableName, iChunkMetadata.getMeasurementUid())); } } } + + @Override + public long ramBytesUsed() { + return RamUsageEstimator.sizeOfMap(this.finalToOriginalTableNames) + + RamUsageEstimator.sizeOfMap(this.finalToOriginalColumnNames) + + RamUsageEstimator.sizeOfMap(this.originalToFinalTableNames) + RamUsageEstimator.sizeOfMap( + this.originalToFinalColumnNames); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaCache.java new file mode 100644 index 00000000000..13188395e2d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaCache.java @@ -0,0 +1,45 @@ +package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Weigher; +import java.util.function.Supplier; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class EvolvedSchemaCache { + + private Cache<TsFileSet, EvolvedSchema> cache; + + private EvolvedSchemaCache() { + cache = Caffeine.newBuilder().weigher( + (Weigher<TsFileSet, EvolvedSchema>) (k, v) -> { + // TsFileSet is always in memory, do not count it + return (int) v.ramBytesUsed(); + } + ).maximumWeight( + // TODO-Sevo configurable + 128 * 1024 * 1024L + ).build(); + } + + public void put(TsFileSet tsFileSet, EvolvedSchema schema) { + cache.put(tsFileSet, schema); + } + + public @Nullable EvolvedSchema computeIfAbsent(TsFileSet tsFileSet, Supplier<EvolvedSchema> schemaSupplier) { + return cache.get(tsFileSet, k -> schemaSupplier.get()); + } + + public void invalidate(TsFileSet tsFileSet) { + cache.invalidate(tsFileSet); + } + + public static EvolvedSchemaCache getInstance() { + return InstanceHolder.INSTANCE; + } + + private static class InstanceHolder { + private static final EvolvedSchemaCache INSTANCE = new EvolvedSchemaCache(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java index 3ae2949deaa..af53868a771 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchemaCache; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile; @@ -29,10 +30,12 @@ import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.RamUsageEstimator; /** TsFileSet represents a set of TsFiles in a time partition whose version <= endVersion. */ public class TsFileSet implements Comparable<TsFileSet> { - + public static final String FILE_SET_DIR_NAME = "filesets"; private final long endVersion; @@ -40,17 +43,6 @@ public class TsFileSet implements Comparable<TsFileSet> { private final ReentrantReadWriteLock lock; private SchemaEvolutionFile schemaEvolutionFile; - // only As comparator key - private TsFileSet(long endVersion) { - this.endVersion = endVersion; - this.fileSetDir = null; - this.lock = null; - } - - public static TsFileSet comparatorKey(long endVersion) { - return new TsFileSet(endVersion); - } - public TsFileSet(long endVersion, String fileSetsDir, boolean recover) { this.endVersion = endVersion; this.fileSetDir = new File(fileSetsDir + File.separator + endVersion); @@ -86,6 +78,7 @@ public class TsFileSet implements Comparable<TsFileSet> { writeLock(); try { schemaEvolutionFile.append(schemaEvolutions); + EvolvedSchemaCache.getInstance().invalidate(this); } finally { writeUnlock(); } @@ -94,7 +87,13 @@ public class TsFileSet implements Comparable<TsFileSet> { public EvolvedSchema readEvolvedSchema() throws IOException { readLock(); try { - return schemaEvolutionFile.readAsSchema(); + return EvolvedSchemaCache.getInstance().computeIfAbsent(this, () -> { + try { + return schemaEvolutionFile.readAsSchema(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } finally { readUnlock(); }
