This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 12b959ae2439a633fce5a54895d2727c07455d58 Author: Tian Jiang <[email protected]> AuthorDate: Wed Dec 31 16:35:07 2025 +0800 temp --- .../performer/impl/FastCompactionPerformer.java | 14 ++++- .../impl/ReadChunkCompactionPerformer.java | 2 +- .../impl/ReadPointCompactionPerformer.java | 13 ++++- .../execute/utils/CompactionTableSchema.java | 9 +++ .../utils/CompactionTableSchemaCollector.java | 20 ++++++- .../execute/utils/MultiTsFileDeviceIterator.java | 18 +++++- .../utils/ReorderedTsFileDeviceIterator.java | 65 ++++++++++++++++++++++ .../utils/TransformedTsFileDeviceIterator.java | 51 +++++++++++++++++ .../utils/writer/AbstractCompactionWriter.java | 4 +- .../writer/AbstractCrossCompactionWriter.java | 25 ++++++++- .../writer/AbstractInnerCompactionWriter.java | 16 +++++- .../compaction/io/CompactionTsFileWriter.java | 13 ++++- .../dataregion/tsfile/TsFileManager.java | 33 ++++++----- .../dataregion/tsfile/TsFileResource.java | 49 ++++++++++++++-- .../dataregion/tsfile/evolution/EvolvedSchema.java | 41 ++++++++++++++ 15 files changed, 335 insertions(+), 38 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index 91184aaec82..bc950a27114 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.iotdb.commons.conf.IoTDBConstant; import 
org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PatternTreeMap; @@ -162,10 +164,18 @@ public class FastCompactionPerformer ? new FastCrossCompactionWriter( targetFiles, seqFiles, readerCacheMap, encryptParameter) : new FastInnerCompactionWriter(targetFiles, encryptParameter)) { + + List<TsFileResource> allSourceFiles = Stream.concat(seqFiles.stream(), unseqFiles.stream()) + .sorted(TsFileResource::compareFileName) + .collect(Collectors.toList()); + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = TsFileResource.getMaxTsFileSetEndVersionAndMinResource( + allSourceFiles); List<Schema> schemas = CompactionTableSchemaCollector.collectSchema( - seqFiles, unseqFiles, readerCacheMap, deviceIterator.getDeprecatedTableSchemaMap()); - compactionWriter.setSchemaForAllTargetFile(schemas); + seqFiles, unseqFiles, readerCacheMap, deviceIterator.getDeprecatedTableSchemaMap(), + maxTsFileSetEndVersionAndMinResource); + + compactionWriter.setSchemaForAllTargetFile(schemas, maxTsFileSetEndVersionAndMinResource); readModification(seqFiles); readModification(unseqFiles); while (deviceIterator.hasNextDevice()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index eaed8a7f6eb..2fefbba9ab1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -205,7 +205,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { private void useNewWriter() throws IOException { currentWriter = new 
CompactionTsFileWriter( - targetResources.get(currentTargetFileIndex).getTsFile(), + targetResources.get(currentTargetFileIndex), memoryBudgetForFileWriter, CompactionType.INNER_SEQ_COMPACTION, firstEncryptParameter); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java index c58870357d9..824bb7e5196 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl; +import java.util.stream.Stream; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedFullPath; @@ -153,13 +154,21 @@ public class ReadPointCompactionPerformer // Do not close device iterator, because tsfile reader is managed by FileReaderManager. 
MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles, unseqFiles); + List<TsFileResource> allSourceFiles = Stream.concat(seqFiles.stream(), unseqFiles.stream()) + .sorted(TsFileResource::compareFileName) + .collect(Collectors.toList()); + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = TsFileResource.getMaxTsFileSetEndVersionAndMinResource( + allSourceFiles); + List<Schema> schemas = CompactionTableSchemaCollector.collectSchema( seqFiles, unseqFiles, deviceIterator.getReaderMap(), - deviceIterator.getDeprecatedTableSchemaMap()); - compactionWriter.setSchemaForAllTargetFile(schemas); + deviceIterator.getDeprecatedTableSchemaMap(), + maxTsFileSetEndVersionAndMinResource); + + compactionWriter.setSchemaForAllTargetFile(schemas, maxTsFileSetEndVersionAndMinResource); while (deviceIterator.hasNextDevice()) { checkThreadInterrupted(); Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java index 3f6e83cbe96..99c4286c759 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java @@ -33,6 +33,15 @@ public class CompactionTableSchema extends TableSchema { super(tableName); } + public CompactionTableSchema(TableSchema tableSchema) { + this(tableSchema.getTableName(), tableSchema.getColumnSchemas(), tableSchema.getColumnTypes()); + } + + public CompactionTableSchema(String tableName, List<IMeasurementSchema> columnSchemas, + List<ColumnCategory> columnCategories) { + super(tableName, columnSchemas, columnCategories); + } + public 
boolean merge(TableSchema tableSchema) { if (tableSchema == null) { return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java index 55640c3fcfa..d5eb8d08d98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java @@ -21,8 +21,10 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.schema.Schema; import java.io.IOException; @@ -42,7 +44,8 @@ public class CompactionTableSchemaCollector { List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, Map<TsFileResource, TsFileSequenceReader> readerMap, - Map<TsFileResource, Set<String>> deprecatedTableSchemaMap) + Map<TsFileResource, Set<String>> deprecatedTableSchemaMap, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource) throws IOException { List<Schema> targetSchemas = new ArrayList<>(seqFiles.size()); Schema schema = @@ -51,7 +54,8 @@ public class CompactionTableSchemaCollector { .sorted(TsFileResource::compareFileName) .collect(Collectors.toList()), readerMap, - deprecatedTableSchemaMap); + deprecatedTableSchemaMap, + maxTsFileSetEndVersionAndAssociatedResource); targetSchemas.add(schema); for (int i = 1; i < seqFiles.size(); i++) { @@ -72,10 +76,12 @@ 
public class CompactionTableSchemaCollector { public static Schema collectSchema( List<TsFileResource> sourceFiles, Map<TsFileResource, TsFileSequenceReader> readerMap, - Map<TsFileResource, Set<String>> deprecatedTableSchemaMap) + Map<TsFileResource, Set<String>> deprecatedTableSchemaMap, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource) throws IOException { Schema targetSchema = new Schema(); Map<String, TableSchema> targetTableSchemaMap = new HashMap<>(); + for (int i = 0; i < sourceFiles.size(); i++) { TsFileResource resource = sourceFiles.get(i); TsFileSequenceReader reader = readerMap.get(resource); @@ -84,12 +90,20 @@ public class CompactionTableSchemaCollector { // v3 tsfile continue; } + + EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema( + maxTsFileSetEndVersionAndAssociatedResource.getLeft()); + for (Map.Entry<String, TableSchema> entry : tableSchemaMap.entrySet()) { String tableName = entry.getKey(); TableSchema currentTableSchema = entry.getValue(); if (isTreeModel(currentTableSchema)) { continue; } + if (evolvedSchema != null) { + currentTableSchema = evolvedSchema.rewriteToFinal(currentTableSchema); + } + // merge all id columns, measurement schema will be generated automatically when end chunk // group CompactionTableSchema collectedTableSchema = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 1889182a2db..aef15ad3dd4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -32,6 +32,8 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; import org.apache.iotdb.db.utils.EncryptDBUtils; import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; @@ -95,6 +97,12 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { // sort the files from the newest to the oldest Collections.sort( this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); + long maxTsFileSetEndVersion = this.tsFileResourcesSortedByDesc.stream().mapToLong( + // max endVersion of all filesets of a TsFile + resource -> resource.getTsFileSets().stream().mapToLong(TsFileSet::getEndVersion).max() + .orElse(Long.MAX_VALUE)) + // overall max endVersion + .max().orElse(Long.MAX_VALUE); try { for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) { CompactionTsFileReader reader = @@ -103,7 +111,15 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { CompactionType.INNER_SEQ_COMPACTION, EncryptDBUtils.getFirstEncryptParamFromTSFilePath(tsFileResource.getTsFilePath())); readerMap.put(tsFileResource, reader); - deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned()); + TsFileDeviceIterator tsFileDeviceIterator; + EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion); + if (evolvedSchema != null) { + tsFileDeviceIterator = new ReorderedTsFileDeviceIterator(reader, + evolvedSchema::rewriteToFinal); + } else { + tsFileDeviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + } + 
deviceIteratorMap.put(tsFileResource, tsFileDeviceIterator); } } catch (Exception e) { // if there is any exception occurs diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java new file mode 100644 index 00000000000..14d2db8a191 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java @@ -0,0 +1,65 @@ +package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; + +public class ReorderedTsFileDeviceIterator extends TransformedTsFileDeviceIterator { + + private final List<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>> deviceIDAndFirstMeasurementNodeList = new ArrayList<>(); + private Iterator<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>> deviceIDListIterator; + private Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode> current; + + public ReorderedTsFileDeviceIterator(TsFileSequenceReader reader, + Function<IDeviceID, IDeviceID> transformer) + throws IOException { + super(reader, transformer); + collectAndSort(); + } + + public ReorderedTsFileDeviceIterator(TsFileSequenceReader reader, String tableName, + Function<IDeviceID, IDeviceID> transformer) throws IOException { + super(reader, tableName, transformer); + collectAndSort(); + } + + private void collectAndSort() { + while (super.hasNext()) { + Pair<IDeviceID, Boolean> next = 
super.next(); + next.left = transformer.apply(next.left); + deviceIDAndFirstMeasurementNodeList.add(new Pair<>(next, super.getFirstMeasurementNodeOfCurrentDevice())); + } + deviceIDAndFirstMeasurementNodeList.sort(Comparator.comparing(p -> p.getLeft().getLeft())); + deviceIDListIterator = deviceIDAndFirstMeasurementNodeList.iterator(); + } + + @Override + public boolean hasNext() { + return deviceIDListIterator.hasNext(); + } + + @Override + public Pair<IDeviceID, Boolean> next() { + Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode> next = deviceIDListIterator.next(); + current = next; + return next.left; + } + + @Override + public Pair<IDeviceID, Boolean> current() { + return current.left; + } + + @Override + public MetadataIndexNode getFirstMeasurementNodeOfCurrentDevice() { + // the devices have been reordered, cannot use the measurementNode + return current.right; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java new file mode 100644 index 00000000000..f1af028226d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; + +import java.io.IOException; +import java.util.function.Function; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.TsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; + +public class TransformedTsFileDeviceIterator extends TsFileDeviceIterator { + + protected Function<IDeviceID, IDeviceID> transformer; + + public TransformedTsFileDeviceIterator(TsFileSequenceReader reader, Function<IDeviceID, IDeviceID> transformer) + throws IOException { + super(reader); + this.transformer = transformer; + } + + public TransformedTsFileDeviceIterator(TsFileSequenceReader reader, String tableName, Function<IDeviceID, IDeviceID> transformer) + throws IOException { + super(reader, tableName); + this.transformer = transformer; + } + + @Override + public Pair<IDeviceID, Boolean> next() { + Pair<IDeviceID, Boolean> next = super.next(); + next.left = transformer.apply(next.left); + return next; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java index 3458f9870d4..623f2b3287d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.header.PageHeader; @@ -38,6 +39,7 @@ import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.chunk.ChunkWriterImpl; @@ -338,5 +340,5 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } } - public abstract void setSchemaForAllTargetFile(List<Schema> schemas); + public abstract void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index f970ad65e56..6584513f606 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -19,12 +19,17 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer; +import java.util.stream.Collectors; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.EncryptDBUtils; @@ -35,6 +40,7 @@ import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; import org.apache.tsfile.write.schema.Schema; @@ -99,7 +105,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr for (int i = 0; i < targetResources.size(); i++) { this.targetFileWriters.add( new CompactionTsFileWriter( - targetResources.get(i).getTsFile(), + targetResources.get(i), memorySizeForEachWriter, CompactionType.CROSS_COMPACTION, this.encryptParameter)); @@ -266,9 
+272,22 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr } @Override - public void setSchemaForAllTargetFile(List<Schema> schemas) { + public void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { for (int i = 0; i < targetFileWriters.size(); i++) { - targetFileWriters.get(i).setSchema(schemas.get(i)); + CompactionTsFileWriter compactionTsFileWriter = targetFileWriters.get(i); + Schema schema = schemas.get(i); + TsFileResource targetResource = compactionTsFileWriter.getTsFileResource(); + if (maxTsFileSetEndVersionAndMinResource.right != null) { + long maxTsFileSetEndVersion = maxTsFileSetEndVersionAndMinResource.left; + TsFileResource minVersionResource = maxTsFileSetEndVersionAndMinResource.getRight(); + targetResource.setTsFileManager(minVersionResource.getTsFileManager()); + EvolvedSchema evolvedSchema = targetResource.getMergedEvolvedSchema(maxTsFileSetEndVersion); + + schema = evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new); + compactionTsFileWriter.setSchema(schema); + } else { + compactionTsFileWriter.setSchema(schema); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index 6573bb7e96e..d7d7a44c00b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -21,11 +21,13 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wr import org.apache.iotdb.commons.utils.TestOnly; import 
org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchemaCollector; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.EncryptDBUtils; @@ -33,6 +35,7 @@ import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.schema.Schema; import java.io.IOException; @@ -46,6 +49,7 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr protected long endedFileSize = 0; protected List<Schema> schemas; protected EncryptParameter encryptParameter; + protected Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource; protected final long memoryBudgetForFileWriter = (long) @@ -115,13 +119,18 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr private void useNewWriter() throws IOException { fileWriter = new CompactionTsFileWriter( - targetResources.get(currentFileIndex).getTsFile(), + targetResources.get(currentFileIndex), memoryBudgetForFileWriter, targetResources.get(currentFileIndex).isSeq() ? 
CompactionType.INNER_SEQ_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION, encryptParameter); - fileWriter.setSchema(CompactionTableSchemaCollector.copySchema(schemas.get(0))); + Schema schema = CompactionTableSchemaCollector.copySchema(schemas.get(0)); + long maxTsFileSetEndVersion = maxTsFileSetEndVersionAndMinResource.left; + TsFileResource minVersionResource = maxTsFileSetEndVersionAndMinResource.getRight(); + fileWriter.getTsFileResource().setTsFileManager(minVersionResource.getTsFileManager()); + EvolvedSchema evolvedSchema = fileWriter.getTsFileResource().getMergedEvolvedSchema(maxTsFileSetEndVersion); + fileWriter.setSchema(evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new)); } @Override @@ -174,8 +183,9 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr } @Override - public void setSchemaForAllTargetFile(List<Schema> schemas) { + public void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { this.schemas = schemas; + this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java index 1f822e3b934..7d7bf5cba82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType; import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.EncryptDBUtils; import org.apache.tsfile.encrypt.EncryptParameter; @@ -54,19 +55,21 @@ public class CompactionTsFileWriter extends TsFileIOWriter { private volatile boolean isWritingAligned = false; private boolean isEmptyTargetFile = true; private IDeviceID currentDeviceId; + private TsFileResource tsFileResource; private EncryptParameter firstEncryptParameter; @TestOnly public CompactionTsFileWriter(File file, long maxMetadataSize, CompactionType type) throws IOException { - this(file, maxMetadataSize, type, EncryptDBUtils.getDefaultFirstEncryptParam()); + this(new TsFileResource(file), maxMetadataSize, type, EncryptDBUtils.getDefaultFirstEncryptParam()); } public CompactionTsFileWriter( - File file, long maxMetadataSize, CompactionType type, EncryptParameter encryptParameter) + TsFileResource tsFile, long maxMetadataSize, CompactionType type, EncryptParameter encryptParameter) throws IOException { - super(file, maxMetadataSize, encryptParameter); + super(tsFile.getTsFile(), maxMetadataSize, encryptParameter); + this.tsFileResource = tsFile; this.firstEncryptParameter = encryptParameter; this.type = type; super.out = @@ -192,4 +195,8 @@ public class CompactionTsFileWriter extends TsFileIOWriter { iterator.remove(); } } + + public TsFileResource getTsFileResource() { + return tsFileResource; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index 4466668ad5f..11dcde29b37 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile; +import java.util.stream.Collectors; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement; @@ -56,6 +57,7 @@ public class TsFileManager { private final TreeMap<Long, TsFileResourceList> sequenceFiles = new TreeMap<>(); private final TreeMap<Long, TsFileResourceList> unsequenceFiles = new TreeMap<>(); private final TreeMap<Long, ModFileManagement> modFileManagementMap = new TreeMap<>(); + private final TreeMap<Long, List<TsFileSet>> tsfileSets = new TreeMap<>(); private volatile boolean allowCompaction = true; private final AtomicLong currentCompactionTaskSerialId = new AtomicLong(0); @@ -237,6 +239,7 @@ public class TsFileManager { modFileManagementMap.computeIfAbsent( timePartition, t -> new PartitionLevelModFileManager())); } + tsFileResource.setTsFileManager(this); } finally { writeUnlock(); } @@ -255,6 +258,7 @@ public class TsFileManager { modFileManagementMap.computeIfAbsent( tsFileResource.getTimePartition(), t -> new PartitionLevelModFileManager())); } + tsFileResource.setTsFileManager(this); } finally { writeUnlock(); } @@ -273,6 +277,7 @@ public class TsFileManager { modFileManagementMap.computeIfAbsent( tsFileResource.getTimePartition(), t -> new PartitionLevelModFileManager())); } + tsFileResource.setTsFileManager(this); } finally { writeUnlock(); } @@ -333,6 +338,7 @@ public class TsFileManager { modFileManagementMap.computeIfAbsent( resource.getTimePartition(), t -> new PartitionLevelModFileManager())); } + resource.setTsFileManager(this); } } } finally { @@ -512,21 +518,22 @@ public class TsFileManager { public void addTsFileSet(TsFileSet newSet, long partitionId) { writeLock("addTsFileSet"); try { - 
TsFileResourceList tsFileResources = sequenceFiles.get(partitionId); - if (tsFileResources != null) { - for (TsFileResource tsFileResource : tsFileResources) { - tsFileResource.addFileSet(newSet); - } - } - - tsFileResources = unsequenceFiles.get(partitionId); - if (tsFileResources != null) { - for (TsFileResource tsFileResource : tsFileResources) { - tsFileResource.addFileSet(newSet); - } - } + List<TsFileSet> tsFileSetList = tsfileSets.computeIfAbsent(partitionId, + p -> new ArrayList<>()); + tsFileSetList.add(newSet); } finally { writeUnlock(); } } + + public List<TsFileSet> getTsFileSet(long partitionId, long minFileVersionIncluded, long maxFileVersionExcluded) { + readLock(); + try { + List<TsFileSet> tsFileSetList = tsfileSets.getOrDefault(partitionId, new ArrayList<>()); /* a partition with no registered TsFileSet yields an empty result instead of a NullPointerException */ + return tsFileSetList.stream().filter(s -> s.getEndVersion() < maxFileVersionExcluded && s.getEndVersion() >= minFileVersionIncluded).collect( + Collectors.toList()); + } finally { + readUnlock(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 060c16fea7d..a387db509c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -211,8 +211,7 @@ public class TsFileResource implements PersistentResource, Cloneable { private Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues; - // TsFileSets this TsFile belongs to - private final List<TsFileSet> tsFileSets = new ArrayList<>(); + private TsFileManager tsFileManager = null; @TestOnly public TsFileResource() { @@ -1632,17 +1631,25 @@ public class TsFileResource implements PersistentResource, Cloneable { return (TsFileResource) clone(); } - public void addFileSet(TsFileSet tsFileSet) { - 
tsFileSets.add(tsFileSet); + public List<TsFileSet> getTsFileSets() { + return getTsFileSets(Long.MAX_VALUE); /* unbounded query: delegates to the overload with no upper end-version limit */ } - public List<TsFileSet> getTsFileSets() { - return tsFileSets; + public List<TsFileSet> getTsFileSets(long maxEndVersionExcluded) { + return tsFileManager == null ? new ArrayList<>() : tsFileManager.getTsFileSet(tsFileID.timePartitionId, tsFileID.fileVersion, maxEndVersionExcluded); /* a resource not attached to a TsFileManager has no file sets */ } public EvolvedSchema getMergedEvolvedSchema() { + return getMergedEvolvedSchema(Long.MAX_VALUE); + } + + public EvolvedSchema getMergedEvolvedSchema(long excludedMaxFileVersion) { List<EvolvedSchema> list = new ArrayList<>(); for (TsFileSet fileSet : getTsFileSets()) { + if (fileSet.getEndVersion() >= excludedMaxFileVersion) { + continue; + } + try { EvolvedSchema readEvolvedSchema = fileSet.readEvolvedSchema(); list.add(readEvolvedSchema); @@ -1653,4 +1660,34 @@ public class TsFileResource implements PersistentResource, Cloneable { return EvolvedSchema.merge(list.toArray(new EvolvedSchema[0])); } + + public static Pair<Long, TsFileResource> getMaxTsFileSetEndVersionAndMinResource(List<TsFileResource> tsFileResources) { + long maxTsFileSetEndVersion = Long.MIN_VALUE; + long minResourceVersion = Long.MAX_VALUE; + TsFileResource minTsFileResource = null; + for (TsFileResource tsFileResource : tsFileResources) { + List<TsFileSet> tsFileSets = tsFileResource.getTsFileSets(); + if (tsFileSets.isEmpty()) { + continue; + } + TsFileSet lastTsFileSet = tsFileSets.get(tsFileSets.size() - 1); + if (lastTsFileSet.getEndVersion() > maxTsFileSetEndVersion) { + maxTsFileSetEndVersion = lastTsFileSet.getEndVersion(); + } + if (tsFileResource.getTsFileID().fileVersion < minResourceVersion) { + minTsFileResource = tsFileResource; + minResourceVersion = tsFileResource.getTsFileID().fileVersion; + } + } + return new Pair<>(maxTsFileSetEndVersion, minTsFileResource); + } + + public void setTsFileManager( + TsFileManager tsFileManager) { + this.tsFileManager = tsFileManager; + } 
+ + public TsFileManager getTsFileManager() { + return tsFileManager; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java index 7d04bb1aa3e..e447a1fc630 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; +import java.util.function.Function; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry.ModType; @@ -42,6 +44,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.stream.Collectors; +import org.apache.tsfile.write.schema.Schema; public class EvolvedSchema { // the evolved table names after applying all schema evolution operations @@ -243,6 +246,27 @@ public class EvolvedSchema { return finalTableSchemas; } + private TableSchema rewriteToOriginal(TableSchema tableSchema) { + String originalTableName = getOriginalTableName(tableSchema.getTableName()); + + List<IMeasurementSchema> measurementSchemas = + new ArrayList<>(tableSchema.getColumnSchemas().size()); + List<ColumnCategory> columnCategories = new ArrayList<>(tableSchema.getColumnTypes().size()); + List<IMeasurementSchema> columnSchemas = tableSchema.getColumnSchemas(); + for (int i = 0, columnSchemasSize = columnSchemas.size(); i < columnSchemasSize; i++) { + IMeasurementSchema measurementSchema = 
columnSchemas.get(i); + measurementSchemas.add( + new MeasurementSchema( + getOriginalColumnName( + tableSchema.getTableName(), measurementSchema.getMeasurementName()), + measurementSchema.getType(), + measurementSchema.getEncodingType(), measurementSchema.getCompressor())); + columnCategories.add(tableSchema.getColumnTypes().get(i)); + } + + return new TableSchema(originalTableName, measurementSchemas, columnCategories); + } + public TableSchema rewriteToFinal(TableSchema tableSchema) { String finalTableName = getFinalTableName(tableSchema.getTableName()); @@ -328,4 +352,21 @@ public class EvolvedSchema { } return mergedSchema; } + + public Schema rewriteToOriginal(Schema schema) { + return rewriteToOriginal(schema, null); + } + public Schema rewriteToOriginal(Schema schema, Function<TableSchema, TableSchema> tableSchemaTransformer) { + Schema copySchema = new Schema(); + for (TableSchema tableSchema : schema.getTableSchemaMap().values()) { + TableSchema originalSchema = rewriteToOriginal(tableSchema); + if (tableSchemaTransformer != null) { + originalSchema = tableSchemaTransformer.apply(originalSchema); + } + copySchema.registerTableSchema(originalSchema); + } + return copySchema; + } + + }
