This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch force_ci/support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 498f392d47a36955c316d157588e4bb9a5582caf
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Jan 13 09:36:18 2026 +0800

    support sevo in ReadChunkCompactionPerformer
---
 .../performer/impl/FastCompactionPerformer.java    |  29 +-
 .../impl/ReadChunkCompactionPerformer.java         |  13 +-
 .../subtask/FastCompactionPerformerSubTask.java    |  12 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   |  54 +++-
 .../ReadChunkAlignedSeriesCompactionExecutor.java  |   4 +-
 .../dataregion/tsfile/TsFileResource.java          |  32 ++-
 .../compaction/ReadChunkInnerCompactionTest.java   | 312 +++++++++++++++++++++
 7 files changed, 428 insertions(+), 28 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
index 568db7081bc..4c4b88e765f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimato
 import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCrossSpaceCompactionEstimator;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
 import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 
 import org.apache.tsfile.common.conf.TSFileDescriptor;
@@ -207,10 +208,26 @@ public class FastCompactionPerformer
       boolean isTreeModel = !isAligned || device.getTableName().startsWith("root.");
       long ttl = deviceIterator.getTTLForCurrentDevice();
       sortedSourceFiles.removeIf(
-          x -> x.definitelyNotContains(device) || !x.isDeviceAlive(device, ttl));
+          x -> {
+            EvolvedSchema evolvedSchema = x.getMergedEvolvedSchema(
+                maxTsFileSetEndVersionAndMinResource.left);
+            IDeviceID originalDevice = device;
+            if (evolvedSchema != null) {
+              originalDevice = evolvedSchema.rewriteToFinal(device);
+            }
+            return x.definitelyNotContains(originalDevice) || !x.isDeviceAlive(originalDevice, ttl);
+          });
       // checked above
-      //noinspection OptionalGetWithoutIsPresent
-      sortedSourceFiles.sort(Comparator.comparingLong(x -> x.getStartTime(device).get()));
+      sortedSourceFiles.sort(Comparator.comparingLong(x -> {
+        EvolvedSchema evolvedSchema = x.getMergedEvolvedSchema(
+            maxTsFileSetEndVersionAndMinResource.left);
+        IDeviceID originalDevice = device;
+        if (evolvedSchema != null) {
+          originalDevice = evolvedSchema.rewriteToFinal(device);
+        }
+        //noinspection OptionalGetWithoutIsPresent
+        return x.getStartTime(originalDevice).get();
+      }));
       ModEntry ttlDeletion = null;
       if (ttl != Long.MAX_VALUE) {
         ttlDeletion =
@@ -289,7 +306,8 @@ public class FastCompactionPerformer
               measurementSchemas,
               deviceId,
               taskSummary,
-              ignoreAllNullRows)
+              ignoreAllNullRows,
+              maxTsFileSetEndVersionAndMinResource)
           .call();
       subTaskSummary.increase(taskSummary);
     }
@@ -338,7 +356,8 @@ public class FastCompactionPerformer
                       measurementsForEachSubTask[i],
                       deviceID,
                       taskSummary,
-                      i)));
+                      i,
+                      maxTsFileSetEndVersionAndMinResource)));
       taskSummaryList.add(taskSummary);
     }
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index 904b17567d9..785f8b62294 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchemaCollector; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.BatchedReadChunkAlignedSeriesCompactionExecutor; @@ -35,6 +36,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.EncryptDBUtils; @@ -212,14 +214,21 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { } private void useNewWriter() throws IOException { + TsFileResource tsFileResource = targetResources.get(currentTargetFileIndex); currentWriter = new CompactionTsFileWriter( - targetResources.get(currentTargetFileIndex), + tsFileResource, memoryBudgetForFileWriter, CompactionType.INNER_SEQ_COMPACTION, firstEncryptParameter, maxTsFileSetEndVersionAndMinResource.getLeft()); - currentWriter.setSchema(CompactionTableSchemaCollector.copySchema(schema)); + + Schema schema = CompactionTableSchemaCollector.copySchema(this.schema); + TsFileResource minVersionResource = maxTsFileSetEndVersionAndMinResource.getRight(); + tsFileResource.setTsFileManager(minVersionResource.getTsFileManager()); + EvolvedSchema evolvedSchema = + tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersionAndMinResource.getLeft()); + currentWriter.setSchema(evolvedSchema != null ? 
evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new) : schema); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/FastCompactionPerformerSubTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/FastCompactionPerformerSubTask.java index afd82268c5d..33393370641 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/FastCompactionPerformerSubTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/FastCompactionPerformerSubTask.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory.ModsSerializer; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TsFileSequenceReader; @@ -75,6 +76,8 @@ public class FastCompactionPerformerSubTask implements Callable<Void> { private List<IMeasurementSchema> measurementSchemas; + private final Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource; + /** Used for nonAligned timeseries. */ @SuppressWarnings("squid:S107") public FastCompactionPerformerSubTask( @@ -87,7 +90,8 @@ public class FastCompactionPerformerSubTask implements Callable<Void> { List<String> measurements, IDeviceID deviceId, FastCompactionTaskSummary summary, - int subTaskId) { + int subTaskId, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { this.compactionWriter = compactionWriter; this.subTaskId = subTaskId; this.timeseriesMetadataOffsetMap = timeseriesMetadataOffsetMap; @@ -99,6 +103,7 @@ public class FastCompactionPerformerSubTask implements Callable<Void> { this.measurements = measurements; this.summary = summary; this.ignoreAllNullRows = true; + this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } /** Used for aligned timeseries. 
*/ @@ -106,13 +111,13 @@ public class FastCompactionPerformerSubTask implements Callable<Void> { AbstractCompactionWriter compactionWriter, Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap, Map<TsFileResource, TsFileSequenceReader> readerCacheMap, - Map<String, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> + Map<String, PatternTreeMap<ModEntry, ModsSerializer>> modificationCacheMap, List<TsFileResource> sortedSourceFiles, List<IMeasurementSchema> measurementSchemas, IDeviceID deviceId, FastCompactionTaskSummary summary, - boolean ignoreAllNullRows) { + boolean ignoreAllNullRows, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { this.compactionWriter = compactionWriter; this.subTaskId = 0; this.timeseriesMetadataOffsetMap = timeseriesMetadataOffsetMap; @@ -124,6 +129,7 @@ public class FastCompactionPerformerSubTask implements Callable<Void> { this.measurementSchemas = measurementSchemas; this.summary = summary; this.ignoreAllNullRows = ignoreAllNullRows; + this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 067cb3602a5..13c83cbf21b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -82,6 +82,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { private long ttlForCurrentDevice; private long timeLowerBoundForCurrentDevice; private final String databaseName; + private final long maxTsFileSetEndVersion; /** * Used for compaction with read chunk performer. 
@@ -97,7 +98,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { // sort the files from the newest to the oldest Collections.sort( this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); - long maxTsFileSetEndVersion = + maxTsFileSetEndVersion = this.tsFileResourcesSortedByDesc.stream() .mapToLong( // max endVersion of all filesets of a TsFile @@ -151,18 +152,17 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { Collections.sort( this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); - long maxTsFileSetEndVersion = - this.tsFileResourcesSortedByDesc.stream() - .mapToLong( - // max endVersion of all filesets of a TsFile - resource -> - resource.getTsFileSets().stream() - .mapToLong(TsFileSet::getEndVersion) - .max() - .orElse(Long.MAX_VALUE)) - // overall max endVersion - .max() - .orElse(Long.MAX_VALUE); + maxTsFileSetEndVersion = this.tsFileResourcesSortedByDesc.stream() + .mapToLong( + // max endVersion of all filesets of a TsFile + resource -> + resource.getTsFileSets().stream() + .mapToLong(TsFileSet::getEndVersion) + .max() + .orElse(Long.MAX_VALUE)) + // overall max endVersion + .max() + .orElse(Long.MAX_VALUE); for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) { TsFileSequenceReader reader = @@ -200,7 +200,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); this.readerMap = readerMap; - long maxTsFileSetEndVersion = + maxTsFileSetEndVersion = this.tsFileResourcesSortedByDesc.stream() .mapToLong( // max endVersion of all filesets of a TsFile @@ -445,6 +445,12 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { true) .entrySet()) { String measurementId = entrySet.getKey(); + EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(maxTsFileSetEndVersion); + if (evolvedSchema != null) { + String originalTableName = evolvedSchema.getOriginalTableName( + currentDevice.left.getTableName()); + measurementId = evolvedSchema.getFinalColumnName(originalTableName, measurementId); + } if (!timeseriesMetadataOffsetMap.containsKey(measurementId)) { MeasurementSchema schema = reader.getMeasurementSchema(entrySet.getValue().left); timeseriesMetadataOffsetMap.put(measurementId, new Pair<>(schema, new HashMap<>())); @@ -505,10 +511,28 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { MetadataIndexNode firstMeasurementNodeOfCurrentDevice = iterator.getFirstMeasurementNodeOfCurrentDevice(); TsFileSequenceReader reader = readerMap.get(tsFileResource); + EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion); + IDeviceID originalDeviceId = currentDevice.left; + if (evolvedSchema != null) { + // rewrite the deviceId to the original one so that we can use it to query the file + originalDeviceId = evolvedSchema.rewriteToOriginal(originalDeviceId); + } List<AbstractAlignedChunkMetadata> alignedChunkMetadataList = reader.getAlignedChunkMetadataByMetadataIndexNode( - currentDevice.left, firstMeasurementNodeOfCurrentDevice, ignoreAllNullRows); + originalDeviceId, firstMeasurementNodeOfCurrentDevice, ignoreAllNullRows); applyModificationForAlignedChunkMetadataList(tsFileResource, alignedChunkMetadataList); + + if (evolvedSchema != null) { + // rewrite the measurementId to the final ones so that they can be aligned with other files + for (AbstractAlignedChunkMetadata abstractAlignedChunkMetadata : alignedChunkMetadataList) { + for 
(IChunkMetadata chunkMetadata : abstractAlignedChunkMetadata.getValueChunkMetadataList()) { + if (chunkMetadata != null) { + chunkMetadata.setMeasurementUid(evolvedSchema.getFinalColumnName(originalDeviceId.getTableName(), chunkMetadata.getMeasurementUid())); + } + } + } + } + readerAndChunkMetadataList.add(new Pair<>(reader, alignedChunkMetadataList)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java index 928c0cdbc6b..481efc2fdb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java @@ -164,7 +164,7 @@ public class ReadChunkAlignedSeriesCompactionExecutor { ChunkHeader chunkHeader = reader.readChunkHeader(chunkMetadata.getOffsetOfChunkHeader()); IMeasurementSchema schema = new MeasurementSchema( - chunkHeader.getMeasurementID(), + chunkMetadata.getMeasurementUid(), chunkHeader.getDataType(), chunkHeader.getEncodingType(), chunkHeader.getCompressionType()); @@ -254,6 +254,8 @@ public class ReadChunkAlignedSeriesCompactionExecutor { return new InstantChunkLoader(); } Chunk chunk = reader.readMemChunk(chunkMetadata); + // the chunk may be renamed and chunkMetadata contains the final name + chunk.getHeader().setMeasurementID(chunkMetadata.getMeasurementUid()); return new InstantChunkLoader(reader.getFileName(), chunkMetadata, chunk); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 0bc12e3f6f2..de55bcd209f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -613,8 +613,21 @@ public class TsFileResource implements PersistentResource, Cloneable { } } - public Optional<Long> getStartTime(IDeviceID deviceId) { + public IDeviceID toOriginalDeviceID(IDeviceID deviceID) { + return toOriginalDeviceID(Long.MAX_VALUE, deviceID); + } + + public IDeviceID toOriginalDeviceID(long maxTsFileSetEndVersion, IDeviceID deviceID) { + EvolvedSchema evolvedSchema = getMergedEvolvedSchema(maxTsFileSetEndVersion); + if (evolvedSchema != null) { + return evolvedSchema.rewriteToOriginal(deviceID); + } + return deviceID; + } + + public Optional<Long> getStartTime(IDeviceID deviceId, long maxTsFileSetEndVersion) { try { + deviceId = toOriginalDeviceID(maxTsFileSetEndVersion, deviceId); return deviceId == null ? 
Optional.of(getFileStartTime()) : timeIndex.getStartTime(deviceId); } catch (Exception e) { LOGGER.error( @@ -626,9 +639,14 @@ public class TsFileResource implements PersistentResource, Cloneable { } } + public Optional<Long> getStartTime(IDeviceID deviceId) { + return getStartTime(deviceId, Long.MAX_VALUE); + } + /** open file's end time is Long.MIN_VALUE */ - public Optional<Long> getEndTime(IDeviceID deviceId) { + public Optional<Long> getEndTime(IDeviceID deviceId, long maxTsFileSetEndVersion) { try { + deviceId = toOriginalDeviceID(maxTsFileSetEndVersion, deviceId); return deviceId == null ? Optional.of(getFileEndTime()) : timeIndex.getEndTime(deviceId); } catch (Exception e) { LOGGER.error( @@ -640,6 +658,11 @@ public class TsFileResource implements PersistentResource, Cloneable { } } + /** open file's end time is Long.MIN_VALUE */ + public Optional<Long> getEndTime(IDeviceID deviceId) { + return getEndTime(deviceId, Long.MAX_VALUE); + } + // cannot use FileTimeIndex public long getOrderTimeForSeq(IDeviceID deviceId, boolean ascending, long maxTsFileSetEndVersion) { if (timeIndex instanceof ArrayDeviceTimeIndex) { @@ -723,6 +746,8 @@ public class TsFileResource implements PersistentResource, Cloneable { * Whether this TsFile definitely not contains this device, if ture, it must not contain this * device, if false, it may or may not contain this device Notice: using method be CAREFULLY and * you really understand the meaning!!!!! + * + * @param device the IDeviceID before schema evolution */ public boolean definitelyNotContains(IDeviceID device) { return timeIndex.definitelyNotContains(device); @@ -1018,6 +1043,7 @@ public class TsFileResource implements PersistentResource, Cloneable { } /** + * @param deviceId the IDeviceID after schema evolution * @return true if the device is contained in the TsFile */ @SuppressWarnings("OptionalGetWithoutIsPresent") @@ -1072,6 +1098,8 @@ public class TsFileResource implements PersistentResource, Cloneable { /** * Check whether the given device may still alive or not. Return false if the device does not * exist or out of dated. 
+ * + * @param device IDeviceID before schema evolution */ public boolean isDeviceAlive(IDeviceID device, long ttl) { if (definitelyNotContains(device)) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java index 972db81b20a..f9c5f2d1852 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java @@ -26,34 +26,54 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.service.metrics.FileMetrics; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; +import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; +import org.apache.iotdb.db.utils.EncryptDBUtils; +import org.apache.iotdb.db.utils.constant.TestConstant; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.ColumnSchemaBuilder; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.TimeRange; +import 
org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.v4.ITsFileReader; +import org.apache.tsfile.read.v4.TsFileReaderBuilder; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.TsFileWriter; import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.chunk.ChunkWriterImpl; import org.apache.tsfile.write.chunk.IChunkWriter; import org.apache.tsfile.write.chunk.ValueChunkWriter; import org.apache.tsfile.write.page.TimePageWriter; import org.apache.tsfile.write.page.ValuePageWriter; +import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.writer.TsFileIOWriter; import org.junit.After; import org.junit.Assert; @@ -79,6 +99,9 @@ import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFi import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.writeNonAlignedChunk; import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.writeOneAlignedPage; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class ReadChunkInnerCompactionTest extends AbstractCompactionTest { @Before @@ -973,4 +996,293 @@ public class ReadChunkInnerCompactionTest extends AbstractCompactionTest { "Unknown data type " + valuePageWriter.getStatistics().getType()); } } + + @Test + public void testWithSevoFile() throws Exception { + String fileSetDir = + TestConstant.BASE_OUTPUT_PATH + File.separator + TsFileSet.FILE_SET_DIR_NAME; + // file1: + // table1[s1, s2, s3] + // table2[s1, s2, s3] + File f1 = new File(SEQ_DIRS, "0-1-0-0.tsfile"); + TableSchema tableSchema1_1 = + new TableSchema( + "table1", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema1_2 = + new TableSchema( + "table2", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(f1)) { + tsFileWriter.registerTableSchema(tableSchema1_1); + tsFileWriter.registerTableSchema(tableSchema1_2); + + Tablet tablet1 = new Tablet(tableSchema1_1.getTableName(), tableSchema1_1.getColumnSchemas()); + tablet1.addTimestamp(0, 0); + tablet1.addValue(0, 0, 1); + tablet1.addValue(0, 1, 2); + tablet1.addValue(0, 2, 3); + + Tablet tablet2 = new Tablet(tableSchema1_2.getTableName(), tableSchema1_2.getColumnSchemas()); + tablet2.addTimestamp(0, 0); + tablet2.addValue(0, 0, 101); + tablet2.addValue(0, 1, 102); + tablet2.addValue(0, 2, 103); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource1 = new TsFileResource(f1); + resource1.setTsFileManager(tsFileManager); + resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0); + 
resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0); + resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0); + resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0); + resource1.close(); + + // rename table1 -> table0 + TsFileSet tsFileSet1 = new TsFileSet(1, fileSetDir, false); + tsFileSet1.appendSchemaEvolution( + Collections.singletonList(new TableRename("table1", "table0"))); + tsFileManager.addTsFileSet(tsFileSet1, 0); + + // file2: + // table0[s1, s2, s3] + // table2[s1, s2, s3] + File f2 = new File(SEQ_DIRS, "0-2-0-0.tsfile"); + TableSchema tableSchema2_1 = + new TableSchema( + "table0", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema2_2 = + new TableSchema( + "table2", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(f2)) { + tsFileWriter.registerTableSchema(tableSchema2_1); + tsFileWriter.registerTableSchema(tableSchema2_2); + + Tablet tablet1 = new Tablet(tableSchema2_1.getTableName(), tableSchema2_1.getColumnSchemas()); + tablet1.addTimestamp(0, 1); + tablet1.addValue(0, 0, 11); + tablet1.addValue(0, 1, 12); + tablet1.addValue(0, 2, 13); + + Tablet tablet2 = new Tablet(tableSchema2_2.getTableName(), tableSchema2_2.getColumnSchemas()); + tablet2.addTimestamp(0, 1); + tablet2.addValue(0, 0, 111); + tablet2.addValue(0, 1, 112); + tablet2.addValue(0, 2, 113); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource2 = new TsFileResource(f2); + resource2.setTsFileManager(tsFileManager); + resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); + resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); + resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1); + resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1); + resource2.close(); + + + // rename table0.s1 -> table0.s0 + TsFileSet tsFileSet2 = new TsFileSet(2, fileSetDir, false); + tsFileSet2.appendSchemaEvolution( + Collections.singletonList(new ColumnRename("table0", "s1", "s0"))); + tsFileManager.addTsFileSet(tsFileSet2, 0); + + // file3: + // table0[s0, s2, s3] + // table2[s1, s2, s3] + File f3 = new File(SEQ_DIRS, "0-3-0-0.tsfile"); + TableSchema tableSchema3_1 = + new TableSchema( + "table0", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s0") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema3_2 = + new TableSchema( + "table2", + Arrays.asList( + new 
ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(f3)) { + tsFileWriter.registerTableSchema(tableSchema3_1); + tsFileWriter.registerTableSchema(tableSchema3_2); + + Tablet tablet1 = new Tablet(tableSchema3_1.getTableName(), tableSchema3_1.getColumnSchemas()); + tablet1.addTimestamp(0, 2); + tablet1.addValue(0, 0, 21); + tablet1.addValue(0, 1, 22); + tablet1.addValue(0, 2, 23); + + Tablet tablet2 = new Tablet(tableSchema3_2.getTableName(), tableSchema3_2.getColumnSchemas()); + tablet2.addTimestamp(0, 2); + tablet2.addValue(0, 0, 121); + tablet2.addValue(0, 1, 122); + tablet2.addValue(0, 2, 123); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource3 = new TsFileResource(f3); + resource3.setTsFileManager(tsFileManager); + resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2); + resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2); + resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2); + resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2); + resource3.close(); + + // rename table2 -> table1 + TsFileSet tsFileSet3 = new TsFileSet(3, fileSetDir, false); + tsFileSet3.appendSchemaEvolution( + Collections.singletonList(new TableRename("table2", "table1"))); + tsFileManager.addTsFileSet(tsFileSet3, 0); + + // perform compaction + seqResources.add(resource1); + seqResources.add(resource2); + seqResources.add(resource3); + + List<TsFileResource> targetResources = + CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); + targetResources.forEach(s -> s.setTsFileManager(tsFileManager)); + + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(seqResources, targetResources, EncryptDBUtils.getDefaultFirstEncryptParam()); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); + + // target(version=1): + // table1[s1, s2, s3] + // table2[s1, s2, s3] + try (ITsFileReader tsFileReader = + new TsFileReaderBuilder().file(targetResources.get(0).getTsFile()).build()) { + // table1 should not exist + try { + tsFileReader.query("table0", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table0 should not exist"); + } catch (NoTableException e) { + assertEquals("Table table0 not found", e.getMessage()); + } + + // table1.s0 should not exist + try { + tsFileReader.query("table1", Collections.singletonList("s0"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table1.s0 should not exist"); + } catch (NoMeasurementException e) { + assertEquals("No measurement for s0", e.getMessage()); + } + + // check data of table1 + ResultSet resultSet = tsFileReader.query("table1", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + + // check data of table2 + resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + 
assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(100 + i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + } + } }
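For context on the pattern this patch applies: a source TsFile may have been
written before the table/column renames recorded in later TsFileSets, so during
compaction its on-disk device and measurement names must be mapped between
their original and current ("final") forms. Device IDs are rewritten back to
the original names when querying an old file (TsFileResource.toOriginalDeviceID,
MultiTsFileDeviceIterator), and measurement names read from an old file are
rewritten to their final names so that renamed columns line up in the target
file. The sketch below condenses that mapping. It is illustrative only and not
part of the commit: the helper class and method names are invented, and only
the TsFileResource/EvolvedSchema calls that appear in the diff above are used.

// Illustrative sketch, not part of the commit. Helper class and method names
// are invented; only calls visible in the diff above are used.
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;

import org.apache.tsfile.file.metadata.IDeviceID;

public final class SchemaEvolutionMappingSketch {

  private SchemaEvolutionMappingSketch() {}

  // Map a device ID from its current (post-evolution) form back to the name
  // stored in an older source file, so that the file's metadata index can be queried.
  static IDeviceID toOnDiskDeviceId(
      TsFileResource source, long maxTsFileSetEndVersion, IDeviceID currentDeviceId) {
    EvolvedSchema evolvedSchema = source.getMergedEvolvedSchema(maxTsFileSetEndVersion);
    // A file with no pending evolutions keeps its on-disk names.
    return evolvedSchema == null
        ? currentDeviceId
        : evolvedSchema.rewriteToOriginal(currentDeviceId);
  }

  // Map a measurement (column) name read from an older source file forward to
  // its current name, so chunks of a renamed column align with newer files.
  static String toCurrentMeasurementName(
      TsFileResource source,
      long maxTsFileSetEndVersion,
      IDeviceID currentDeviceId,
      String onDiskMeasurementId) {
    EvolvedSchema evolvedSchema = source.getMergedEvolvedSchema(maxTsFileSetEndVersion);
    if (evolvedSchema == null) {
      return onDiskMeasurementId;
    }
    // getFinalColumnName is keyed by the original (pre-evolution) table name.
    String originalTableName = evolvedSchema.getOriginalTableName(currentDeviceId.getTableName());
    return evolvedSchema.getFinalColumnName(originalTableName, onDiskMeasurementId);
  }
}

On the write side, useNewWriter in ReadChunkCompactionPerformer passes the
collected target schema through the same merged EvolvedSchema (rewriteToOriginal
with CompactionTableSchema::new) before handing it to the CompactionTsFileWriter.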
