This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c54b065574e67509880c9ef45392e244c847ee62 Author: Tian Jiang <[email protected]> AuthorDate: Sun Jan 4 09:30:30 2026 +0800 temp --- .../constant/InnerSeqCompactionPerformer.java | 8 ++++--- .../performer/impl/FastCompactionPerformer.java | 9 ++++++-- .../impl/ReadChunkCompactionPerformer.java | 26 +++++++++++++++------- .../impl/ReadPointCompactionPerformer.java | 19 ++++++++-------- .../execute/task/InnerSpaceCompactionTask.java | 1 + .../task/RepairUnsortedFileCompactionTask.java | 8 ++++--- .../execute/utils/MultiTsFileDeviceIterator.java | 16 +++++++++++-- .../writer/AbstractCrossCompactionWriter.java | 10 ++++----- .../writer/AbstractInnerCompactionWriter.java | 5 +++-- .../compaction/io/CompactionTsFileWriter.java | 14 ++++++++---- .../dataregion/tsfile/evolution/EvolvedSchema.java | 5 ++++- .../tsfile/generator/TsFileNameGenerator.java | 1 + .../compaction/AbstractCompactionTest.java | 3 ++- .../compaction/utils/CompactionCheckerUtils.java | 3 ++- 14 files changed, 86 insertions(+), 42 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java index f64597e25d4..0404ec65e75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java @@ -25,7 +25,9 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.encrypt.EncryptParameter; +import org.apache.tsfile.utils.Pair; public enum InnerSeqCompactionPerformer { READ_CHUNK, @@ -54,12 +56,12 @@ public enum InnerSeqCompactionPerformer { } } - public ISeqCompactionPerformer createInstance(EncryptParameter encryptParameter) { + public ISeqCompactionPerformer createInstance(EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { switch (this) { case READ_CHUNK: - return new ReadChunkCompactionPerformer(encryptParameter); + return new ReadChunkCompactionPerformer(encryptParameter, maxTsFileSetEndVersionAndMinResource); case FAST: - return new FastCompactionPerformer(false, encryptParameter); + return new FastCompactionPerformer(false, encryptParameter, maxTsFileSetEndVersionAndMinResource); default: throw new IllegalCompactionPerformerException( "Illegal compaction performer for seq inner compaction " + this); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index bc950a27114..a45d0deb5b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -102,12 +102,13 @@ public class FastCompactionPerformer private final boolean isCrossCompaction; private EncryptParameter encryptParameter; + private final Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource; @TestOnly public FastCompactionPerformer( List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, - List<TsFileResource> targetFiles) { + List<TsFileResource> targetFiles, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { this.seqFiles = seqFiles; this.unseqFiles = unseqFiles; this.targetFiles = targetFiles; @@ -121,13 +122,14 @@ public class FastCompactionPerformer new EncryptParameter( TSFileDescriptor.getInstance().getConfig().getEncryptType(), TSFileDescriptor.getInstance().getConfig().getEncryptKey()); + this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } public FastCompactionPerformer( List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, List<TsFileResource> targetFiles, - EncryptParameter encryptParameter) { + EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { this.seqFiles = seqFiles; this.unseqFiles = unseqFiles; this.targetFiles = targetFiles; @@ -138,6 +140,7 @@ public class FastCompactionPerformer isCrossCompaction = true; } this.encryptParameter = encryptParameter; + this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } @TestOnly @@ -147,11 +150,13 @@ public class FastCompactionPerformer new EncryptParameter( TSFileDescriptor.getInstance().getConfig().getEncryptType(), TSFileDescriptor.getInstance().getConfig().getEncryptKey()); + this.maxTsFileSetEndVersionAndMinResource = new Pair<>(Long.MIN_VALUE, null); } public FastCompactionPerformer(boolean isCrossCompaction, EncryptParameter encryptParameter) { this.isCrossCompaction = isCrossCompaction; this.encryptParameter = encryptParameter; + this.maxTsFileSetEndVersionAndMinResource = new Pair<>(Long.MIN_VALUE, null); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index 2fefbba9ab1..4280c13b24f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -70,7 +70,8 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()); private Schema schema = null; - private EncryptParameter firstEncryptParameter; + private final EncryptParameter firstEncryptParameter; + protected final Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource; @TestOnly public ReadChunkCompactionPerformer(List<TsFileResource> sourceFiles, TsFileResource targetFile) { @@ -80,8 +81,9 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { public ReadChunkCompactionPerformer( List<TsFileResource> sourceFiles, TsFileResource targetFile, - EncryptParameter encryptParameter) { - this(sourceFiles, Collections.singletonList(targetFile), encryptParameter); + EncryptParameter encryptParameter, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { + this(sourceFiles, Collections.singletonList(targetFile), encryptParameter, maxTsFileSetEndVersionAndMinResource); } @TestOnly @@ -90,27 +92,32 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { setSourceFiles(sourceFiles); setTargetFiles(targetFiles); this.firstEncryptParameter = EncryptDBUtils.getDefaultFirstEncryptParam(); + this.maxTsFileSetEndVersionAndMinResource = new Pair<>(Long.MIN_VALUE, null); } public ReadChunkCompactionPerformer( List<TsFileResource> sourceFiles, List<TsFileResource> targetFiles, - EncryptParameter encryptParameter) { + EncryptParameter encryptParameter, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { setSourceFiles(sourceFiles); setTargetFiles(targetFiles); this.firstEncryptParameter = encryptParameter; + this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } @TestOnly public ReadChunkCompactionPerformer(List<TsFileResource> sourceFiles) { setSourceFiles(sourceFiles); this.firstEncryptParameter = EncryptDBUtils.getDefaultFirstEncryptParam(); + this.maxTsFileSetEndVersionAndMinResource = new Pair<>(Long.MIN_VALUE, null); } public ReadChunkCompactionPerformer( - List<TsFileResource> sourceFiles, EncryptParameter encryptParameter) { + List<TsFileResource> sourceFiles, EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { setSourceFiles(sourceFiles); this.firstEncryptParameter = encryptParameter; + this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } @TestOnly @@ -119,10 +126,12 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { new EncryptParameter( TSFileDescriptor.getInstance().getConfig().getEncryptType(), TSFileDescriptor.getInstance().getConfig().getEncryptKey()); + this.maxTsFileSetEndVersionAndMinResource = new Pair<>(Long.MIN_VALUE, null); } - public ReadChunkCompactionPerformer(EncryptParameter encryptParameter) { + public ReadChunkCompactionPerformer(EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { this.firstEncryptParameter = encryptParameter; + this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } @Override @@ -137,7 +146,8 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { CompactionTableSchemaCollector.collectSchema( seqFiles, deviceIterator.getReaderMap(), - deviceIterator.getDeprecatedTableSchemaMap()); + deviceIterator.getDeprecatedTableSchemaMap(), + maxTsFileSetEndVersionAndMinResource); while (deviceIterator.hasNextDevice()) { currentWriter = getAvailableCompactionWriter(); Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice(); @@ -208,7 +218,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { targetResources.get(currentTargetFileIndex), memoryBudgetForFileWriter, CompactionType.INNER_SEQ_COMPACTION, - firstEncryptParameter); + firstEncryptParameter, maxTsFileSetEndVersionAndMinResource.getLeft()); currentWriter.setSchema(CompactionTableSchemaCollector.copySchema(schema)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java index 824bb7e5196..9b18cee6839 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java @@ -178,10 +178,10 @@ public class ReadPointCompactionPerformer if (isAligned) { compactAlignedSeries( - device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource); + device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource, maxTsFileSetEndVersionAndMinResource); } else { compactNonAlignedSeries( - device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource); + device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource, maxTsFileSetEndVersionAndMinResource); } summary.setTemporaryFileSize(compactionWriter.getWriterSize()); } @@ -217,9 +217,10 @@ public class ReadPointCompactionPerformer MultiTsFileDeviceIterator deviceIterator, AbstractCompactionWriter compactionWriter, FragmentInstanceContext fragmentInstanceContext, - QueryDataSource queryDataSource) + QueryDataSource queryDataSource, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) throws IOException, MetadataException { - Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(); + Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(maxTsFileSetEndVersionAndMinResource); IMeasurementSchema timeSchema = schemaMap.remove(TsFileConstant.TIME_COLUMN_ID); List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values()); if (measurementSchemas.isEmpty()) { @@ -242,13 +243,11 @@ public class ReadPointCompactionPerformer true); if (dataBlockReader.hasNextBatch()) { - // chunkgroup is serialized only when at least one timeseries under this device has data compactionWriter.startChunkGroup(device, true); - measurementSchemas.add(0, timeSchema); compactionWriter.startMeasurement( TsFileConstant.TIME_COLUMN_ID, new AlignedChunkWriterImpl( - measurementSchemas.remove(0), + timeSchema, measurementSchemas, EncryptUtils.getEncryptParameter(getEncryptParameter())), 0); @@ -265,9 +264,11 @@ public class ReadPointCompactionPerformer MultiTsFileDeviceIterator deviceIterator, AbstractCompactionWriter compactionWriter, FragmentInstanceContext fragmentInstanceContext, - QueryDataSource queryDataSource) + QueryDataSource queryDataSource, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) throws IOException, InterruptedException, ExecutionException { - Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(); + Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice( + maxTsFileSetEndVersionAndMinResource); List<String> allMeasurements = new ArrayList<>(schemaMap.keySet()); allMeasurements.sort((String::compareTo)); int subTaskNums = Math.min(allMeasurements.size(), SUB_TASK_NUM); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index ef4d1d850fd..86ea3ea6989 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -378,6 +378,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { new File(skippedSourceFile.getParentFile().getPath() + File.separator + newFileName), TsFileResourceStatus.COMPACTING); filesView.renamedTargetFiles.add(renamedTargetFile); + renamedTargetFile.setTsFileManager(tsFileManager); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java index fe1957975cf..dba8b5e4f2a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java @@ -121,9 +121,11 @@ public class RepairUnsortedFileCompactionTask extends InnerSpaceCompactionTask { @Override protected void calculateSourceFilesAndTargetFiles() throws IOException { filesView.sourceFilesInLog = filesView.sourceFilesInCompactionPerformer; + TsFileResource targetResource = new TsFileResource(generateTargetFile(), + TsFileResourceStatus.COMPACTING); + targetResource.setTsFileManager(tsFileManager); filesView.targetFilesInLog = - Collections.singletonList( - new TsFileResource(generateTargetFile(), TsFileResourceStatus.COMPACTING)); + Collections.singletonList(targetResource); filesView.targetFilesInPerformer = filesView.targetFilesInLog; } @@ -137,7 +139,7 @@ public class RepairUnsortedFileCompactionTask extends InnerSpaceCompactionTask { sourceFile.isSeq() ? lastAllocatedFileTimestamp.incrementAndGet() : sourceFileName.getTime(), - sourceFile.isSeq() ? 0 : sourceFileName.getVersion(), + sourceFileName.getVersion(), sourceFileName.getInnerCompactionCnt() + 1, sourceFileName.getCrossCompactionCnt()); // if source file is sequence, the sequence data targetFileDir should be replaced to unsequence diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index aef15ad3dd4..a6136d7bd2f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -275,7 +275,8 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { * * @throws IOException if io errors occurred */ - public Map<String, MeasurementSchema> getAllSchemasOfCurrentDevice() throws IOException { + public Map<String, MeasurementSchema> getAllSchemasOfCurrentDevice( + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) throws IOException { Map<String, MeasurementSchema> schemaMap = new ConcurrentHashMap<>(); // get schemas from the newest file to the oldest file for (TsFileResource resource : tsFileResourcesSortedByDesc) { @@ -292,12 +293,23 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(), schemaMap.keySet(), true); + EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema( + maxTsFileSetEndVersionAndMinResource.left); + if (evolvedSchema != null) { + // the device has been rewritten, should get the original name for rewriting + evolvedSchema.rewriteToFinal(evolvedSchema.getOriginalTableName(currentDevice.left.getTableName()), timeseriesMetadataList); + } + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { if (!schemaMap.containsKey(timeseriesMetadata.getMeasurementId()) && !timeseriesMetadata.getChunkMetadataList().isEmpty()) { + MeasurementSchema measurementSchema = reader.getMeasurementSchema( + timeseriesMetadata.getChunkMetadataList()); + // the column may be renamed + measurementSchema.setMeasurementName(timeseriesMetadata.getMeasurementId()); schemaMap.put( timeseriesMetadata.getMeasurementId(), - reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList())); + measurementSchema); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index 6584513f606..cfa377c3133 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -19,17 +19,14 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer; -import java.util.stream.Collectors; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.EncryptDBUtils; @@ -83,13 +80,14 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr protected AbstractCrossCompactionWriter( List<TsFileResource> targetResources, List<TsFileResource> seqFileResources) throws IOException { - this(targetResources, seqFileResources, EncryptDBUtils.getDefaultFirstEncryptParam()); + this(targetResources, seqFileResources, EncryptDBUtils.getDefaultFirstEncryptParam(), Long.MIN_VALUE); } protected AbstractCrossCompactionWriter( List<TsFileResource> targetResources, List<TsFileResource> seqFileResources, - EncryptParameter encryptParameter) + EncryptParameter encryptParameter, + long maxTsFileSetEndVersion) throws IOException { currentDeviceEndTime = new long[seqFileResources.size()]; isCurrentDeviceExistedInSourceSeqFiles = new boolean[seqFileResources.size()]; @@ -108,7 +106,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr targetResources.get(i), memorySizeForEachWriter, CompactionType.CROSS_COMPACTION, - this.encryptParameter)); + this.encryptParameter, maxTsFileSetEndVersion)); isEmptyFile[i] = true; } this.seqTsFileResources = seqFileResources; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index d7d7a44c00b..28ca734f32d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -117,6 +117,7 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr } private void useNewWriter() throws IOException { + long maxTsFileSetEndVersion = maxTsFileSetEndVersionAndMinResource.left; fileWriter = new CompactionTsFileWriter( targetResources.get(currentFileIndex), @@ -124,9 +125,9 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr targetResources.get(currentFileIndex).isSeq() ? CompactionType.INNER_SEQ_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION, - encryptParameter); + encryptParameter, + maxTsFileSetEndVersion); Schema schema = CompactionTableSchemaCollector.copySchema(schemas.get(0)); - long maxTsFileSetEndVersion = maxTsFileSetEndVersionAndMinResource.left; TsFileResource minVersionResource = maxTsFileSetEndVersionAndMinResource.getRight(); fileWriter.getTsFileResource().setTsFileManager(minVersionResource.getTsFileManager()); EvolvedSchema evolvedSchema = fileWriter.getTsFileResource().getMergedEvolvedSchema(maxTsFileSetEndVersion); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java index 7d7bf5cba82..c8c24fca8e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.Compacti import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionIoDataType; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.iotdb.db.utils.EncryptDBUtils; import org.apache.tsfile.encrypt.EncryptParameter; @@ -55,18 +56,22 @@ public class CompactionTsFileWriter extends TsFileIOWriter { private volatile boolean isWritingAligned = false; private boolean isEmptyTargetFile = true; private IDeviceID currentDeviceId; - private TsFileResource tsFileResource; - private EncryptParameter firstEncryptParameter; + private final TsFileResource tsFileResource; + private final EvolvedSchema evolvedSchema; + + private final EncryptParameter firstEncryptParameter; @TestOnly public CompactionTsFileWriter(File file, long maxMetadataSize, CompactionType type) throws IOException { - this(new TsFileResource(file), maxMetadataSize, type, EncryptDBUtils.getDefaultFirstEncryptParam()); + this(new TsFileResource(file), maxMetadataSize, type, EncryptDBUtils.getDefaultFirstEncryptParam(), + Long.MIN_VALUE); } public CompactionTsFileWriter( - TsFileResource tsFile, long maxMetadataSize, CompactionType type, EncryptParameter encryptParameter) + TsFileResource tsFile, long maxMetadataSize, CompactionType type, EncryptParameter encryptParameter, + long maxTsFileSetEndVersion) throws IOException { super(tsFile.getTsFile(), maxMetadataSize, encryptParameter); this.tsFileResource = tsFile; @@ -75,6 +80,7 @@ public class CompactionTsFileWriter extends TsFileIOWriter { super.out = new CompactionTsFileOutput( super.out, CompactionTaskManager.getInstance().getMergeWriteRateLimiter()); + evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion); } public EncryptParameter getEncryptParameter() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java index e447a1fc630..a88e76d4327 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEn import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.file.metadata.TableSchema; @@ -231,8 +232,10 @@ public class EvolvedSchema { String originalTableName, List<TimeseriesMetadata> timeseriesMetadataList) { timeseriesMetadataList.forEach( timeseriesMetadata -> { + String finalColumnName = getFinalColumnName(originalTableName, + timeseriesMetadata.getMeasurementId()); timeseriesMetadata.setMeasurementId( - getFinalColumnName(originalTableName, timeseriesMetadata.getMeasurementId())); + finalColumnName); }); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java index 16be82188e9..698b4b95be6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java @@ -363,6 +363,7 @@ public class TsFileNameGenerator { TsFileResourceStatus.COMPACTING); targetResource.setSeq(sequence); targetResources.add(targetResource); + targetResource.setTsFileManager(resource.getTsFileManager()); } return targetResources; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java index a5503bb9e64..31ed98eb0e1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java @@ -850,7 +850,8 @@ public class AbstractCompactionTest { Pair<IDeviceID, Boolean> iDeviceIDBooleanPair = deviceIterator.nextDevice(); IDeviceID deviceID = iDeviceIDBooleanPair.getLeft(); boolean isAlign = iDeviceIDBooleanPair.getRight(); - Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(); + Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice( + new Pair<>(Long.MIN_VALUE, null)); IMeasurementSchema timeSchema = schemaMap.remove(TsFileConstant.TIME_COLUMN_ID); List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values()); if (measurementSchemas.isEmpty()) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java index 6c3e692212a..d3851e4e48e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java @@ -538,7 +538,8 @@ public class CompactionCheckerUtils { Pair<IDeviceID, Boolean> iDeviceIDBooleanPair = deviceIterator.nextDevice(); IDeviceID deviceID = iDeviceIDBooleanPair.getLeft(); boolean isAlign = iDeviceIDBooleanPair.getRight(); - Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(); + Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice( + new Pair<>(Long.MIN_VALUE, null)); IMeasurementSchema timeSchema = schemaMap.remove(TsFileConstant.TIME_COLUMN_ID); List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values()); if (measurementSchemas.isEmpty()) {
