This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 12bcb3a06cb23ab058c787e634a9aa6623e30026 Author: Tian Jiang <[email protected]> AuthorDate: Wed Jan 7 19:11:47 2026 +0800 support ReadPointCompactionPerformer with sevo --- .../relational/it/db/it/IoTDBLoadTsFileIT.java | 8 +- .../java/org/apache/iotdb/session/Session.java | 7 +- .../operator/source/AlignedSeriesScanUtil.java | 24 +- .../execution/operator/source/FileLoaderUtils.java | 38 ++- .../execution/operator/source/SeriesScanUtil.java | 39 ++- .../db/storageengine/dataregion/DataRegion.java | 36 ++- .../constant/InnerSeqCompactionPerformer.java | 8 +- .../performer/impl/FastCompactionPerformer.java | 38 ++- .../impl/ReadChunkCompactionPerformer.java | 26 +- .../impl/ReadPointCompactionPerformer.java | 59 ++-- .../task/RepairUnsortedFileCompactionTask.java | 7 +- .../task/subtask/ReadPointPerformerSubTask.java | 7 +- .../execute/utils/CompactionTableSchema.java | 5 +- .../utils/CompactionTableSchemaCollector.java | 7 +- .../execute/utils/MultiTsFileDeviceIterator.java | 94 +++++-- .../utils/ReorderedTsFileDeviceIterator.java | 31 ++- .../utils/TransformedTsFileDeviceIterator.java | 14 +- .../utils/reader/SeriesDataBlockReader.java | 9 +- .../utils/writer/AbstractCompactionWriter.java | 5 +- .../writer/AbstractCrossCompactionWriter.java | 12 +- .../writer/AbstractInnerCompactionWriter.java | 8 +- .../utils/writer/FastCrossCompactionWriter.java | 11 +- .../writer/ReadPointCrossCompactionWriter.java | 11 +- .../compaction/io/CompactionTsFileWriter.java | 33 ++- .../compaction/repair/RepairDataFileScanUtil.java | 2 +- .../dataregion/read/QueryDataSource.java | 16 +- .../dataregion/tsfile/TsFileManager.java | 17 +- .../dataregion/tsfile/TsFileResource.java | 41 ++- .../dataregion/tsfile/evolution/EvolvedSchema.java | 41 ++- .../dataregion/tsfile/fileset/TsFileSet.java | 5 +- .../compaction/AbstractCompactionTest.java | 4 +- .../ReadPointCompactionPerformerTest.java | 307 +++++++++++++++++++++ .../compaction/utils/CompactionCheckerUtils.java | 4 +- 33 files 
changed, 757 insertions(+), 217 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java index b9241b66182..5a754c6afe8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java @@ -19,7 +19,6 @@ package org.apache.iotdb.relational.it.db.it; -import java.sql.SQLException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile; @@ -50,6 +49,7 @@ import java.io.File; import java.nio.file.Files; import java.sql.Connection; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; @@ -328,7 +328,8 @@ public class IoTDBLoadTsFileIT { // cannot query using INT322INT32 try (final ResultSet resultSet = - statement.executeQuery(String.format("select count(%s) from %s", "INT322INT32", SchemaConfig.TABLE_1))) { + statement.executeQuery( + String.format("select count(%s) from %s", "INT322INT32", SchemaConfig.TABLE_1))) { fail(); } catch (SQLException e) { assertEquals("616: Column 'int322int32' cannot be resolved", e.getMessage()); @@ -336,7 +337,8 @@ public class IoTDBLoadTsFileIT { // can query with INT322INT32_NEW try (final ResultSet resultSet = - statement.executeQuery(String.format("select count(%s) from %s", "INT322INT32_NEW", SchemaConfig.TABLE_1))) { + statement.executeQuery( + String.format("select count(%s) from %s", "INT322INT32_NEW", SchemaConfig.TABLE_1))) { if (resultSet.next()) { Assert.assertEquals(lineCount, resultSet.getLong(1)); } else { diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 8844f36c247..ea7c7c85cd7 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -3074,10 +3074,9 @@ public class Session implements ISession { this.columnEncodersMap .getOrDefault( measurementSchema.getType(), - TSEncoding.valueOf( - TSFileDescriptor.getInstance() - .getConfig() - .getValueEncoder(measurementSchema.getType()))) + TSFileDescriptor.getInstance() + .getConfig() + .getValueEncoder(measurementSchema.getType())) .serialize()); } } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java index f7ddaee472e..c9d7dfeabbc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java @@ -62,6 +62,15 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { this(seriesPath, scanOrder, scanOptions, context, false, null); } + public AlignedSeriesScanUtil( + AlignedFullPath seriesPath, + Ordering scanOrder, + SeriesScanOptions scanOptions, + FragmentInstanceContext context, + long maxTsFileSetEndVersion) { + this(seriesPath, scanOrder, scanOptions, context, false, null, maxTsFileSetEndVersion); + } + public AlignedSeriesScanUtil( AlignedFullPath seriesPath, Ordering scanOrder, @@ -69,7 +78,18 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { FragmentInstanceContext context, boolean queryAllSensors, List<TSDataType> givenDataTypes) { - super(seriesPath, scanOrder, scanOptions, context); + 
this(seriesPath, scanOrder, scanOptions, context, queryAllSensors, givenDataTypes, Long.MAX_VALUE); + } + + public AlignedSeriesScanUtil( + AlignedFullPath seriesPath, + Ordering scanOrder, + SeriesScanOptions scanOptions, + FragmentInstanceContext context, + boolean queryAllSensors, + List<TSDataType> givenDataTypes, + long maxTsFileSetEndVersion) { + super(seriesPath, scanOrder, scanOptions, context, maxTsFileSetEndVersion); isAligned = true; this.dataTypes = givenDataTypes != null @@ -100,7 +120,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { context, scanOptions.getGlobalTimeFilter(), isSeq, - ignoreAllNullRows); + ignoreAllNullRows, maxTsFileSetEndVersion); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java index da8fd358e97..ffaeb60bab0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; +import java.util.stream.Collectors; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.NonAlignedFullPath; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; @@ -55,6 +56,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import org.apache.tsfile.write.schema.IMeasurementSchema; import static com.google.common.base.Preconditions.checkArgument; @@ -83,7 +85,8 @@ public class FileLoaderUtils { FragmentInstanceContext context, Filter globalTimeFilter, Set<String> allSensors, - boolean isSeq) + boolean isSeq, + long maxTsFileSetEndVersion) throws IOException { 
long t1 = System.nanoTime(); boolean loadFromMem = false; @@ -94,9 +97,13 @@ public class FileLoaderUtils { if (resource.isClosed()) { // when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex // we should not ignore the non-exist of device in TsFileMetadata - EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(); + EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(maxTsFileSetEndVersion); IDeviceID deviceId = seriesPath.getDeviceId(); String measurement = seriesPath.getMeasurement(); + if (evolvedSchema != null) { + measurement = evolvedSchema.getOriginalColumnName(deviceId.getTableName(), measurement); + deviceId = evolvedSchema.rewriteToOriginal(deviceId); + } timeSeriesMetadata = TimeSeriesMetadataCache.getInstance() @@ -186,7 +193,8 @@ public class FileLoaderUtils { FragmentInstanceContext context, Filter globalTimeFilter, boolean isSeq, - boolean ignoreAllNullRows) + boolean ignoreAllNullRows, + long maxTsFileSetEndVersion) throws IOException { final long t1 = System.nanoTime(); boolean loadFromMem = false; @@ -196,7 +204,7 @@ public class FileLoaderUtils { if (resource.isClosed()) { alignedTimeSeriesMetadata = loadAlignedTimeSeriesMetadataFromDisk( - resource, alignedPath, context, globalTimeFilter, ignoreAllNullRows); + resource, alignedPath, context, globalTimeFilter, ignoreAllNullRows, maxTsFileSetEndVersion); } else { // if the tsfile is unclosed, we just get it directly from TsFileResource loadFromMem = true; alignedTimeSeriesMetadata = @@ -262,7 +270,8 @@ public class FileLoaderUtils { AlignedFullPath alignedPath, FragmentInstanceContext context, Filter globalTimeFilter, - boolean ignoreAllNullRows) + boolean ignoreAllNullRows, + long maxTsFileSetEndVersion) throws IOException { AbstractAlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null; // load all the TimeseriesMetadata of vector, the first one is for time column and the @@ -275,6 +284,15 @@ public class FileLoaderUtils { String filePath = 
resource.getTsFilePath(); IDeviceID deviceId = alignedPath.getDeviceId(); + EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(maxTsFileSetEndVersion); + if (evolvedSchema != null) { + IDeviceID finalDeviceId = deviceId; + valueMeasurementList = valueMeasurementList.stream().map(m -> evolvedSchema.getOriginalColumnName(finalDeviceId.getTableName(), m)).collect( + Collectors.toList()); + allSensors = allSensors.stream().map(m -> evolvedSchema.getOriginalColumnName(finalDeviceId.getTableName(), m)).collect(Collectors.toSet()); + deviceId = evolvedSchema.rewriteToOriginal(deviceId); + } + // when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex // we should not ignore the non-exist of device in TsFileMetadata TimeseriesMetadata timeColumn = @@ -296,7 +314,7 @@ public class FileLoaderUtils { resource, timeColumn, Collections.emptyList(), - alignedPath, + deviceId, context, globalTimeFilter, false); @@ -325,7 +343,7 @@ public class FileLoaderUtils { resource, timeColumn, valueTimeSeriesMetadataList, - alignedPath, + deviceId, context, globalTimeFilter, ignoreAllNullRows); @@ -339,7 +357,7 @@ public class FileLoaderUtils { TsFileResource resource, TimeseriesMetadata timeColumnMetadata, List<TimeseriesMetadata> valueColumnMetadataList, - AlignedFullPath alignedPath, + IDeviceID deviceID, QueryContext context, Filter globalTimeFilter, boolean ignoreAllNullRows) { @@ -348,7 +366,7 @@ public class FileLoaderUtils { // deal with time column List<ModEntry> timeModifications = context.getPathModifications( - resource, alignedPath.getDeviceId(), timeColumnMetadata.getMeasurementId()); + resource, deviceID, timeColumnMetadata.getMeasurementId()); // all rows are deleted, just return null to skip device data in this file if (ModificationUtils.isAllDeletedByMods( timeModifications, @@ -371,7 +389,7 @@ public class FileLoaderUtils { if (valueColumnMetadata != null) { List<ModEntry> modifications = context.getPathModifications( - resource, 
alignedPath.getDeviceId(), valueColumnMetadata.getMeasurementId()); + resource, deviceID, valueColumnMetadata.getMeasurementId()); valueColumnMetadata.setModified(!modifications.isEmpty()); valueColumnsModifications.add(modifications); modified = (modified || !modifications.isEmpty()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 233d24bf097..b232b1377d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -130,6 +130,9 @@ public class SeriesScanUtil implements Accountable { protected final int MAX_NUMBER_OF_POINTS_IN_PAGE = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); + // to restrict the scope of sevo files for compaction + protected final long maxTsFileSetEndVersion; + private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(SeriesScanUtil.class) + RamUsageEstimator.shallowSizeOfInstance(IDeviceID.class) @@ -145,6 +148,15 @@ public class SeriesScanUtil implements Accountable { Ordering scanOrder, SeriesScanOptions scanOptions, FragmentInstanceContext context) { + this(seriesPath, scanOrder, scanOptions, context, Long.MAX_VALUE); + } + + public SeriesScanUtil( + IFullPath seriesPath, + Ordering scanOrder, + SeriesScanOptions scanOptions, + FragmentInstanceContext context, + long maxTsFileSetEndVersion) { this.seriesPath = seriesPath; this.deviceID = seriesPath.getDeviceId(); this.dataType = seriesPath.getSeriesType(); @@ -182,6 +194,8 @@ public class SeriesScanUtil implements Accountable { new PriorityQueue<>( orderUtils.comparingLong( versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics()))); + + 
this.maxTsFileSetEndVersion = maxTsFileSetEndVersion; } /** @@ -190,7 +204,7 @@ public class SeriesScanUtil implements Accountable { * @param dataSource the query data source */ public void initQueryDataSource(QueryDataSource dataSource) { - dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending()); + dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending(), maxTsFileSetEndVersion); this.dataSource = dataSource; // updated filter concerning TTL @@ -1339,7 +1353,8 @@ public class SeriesScanUtil implements Accountable { context, scanOptions.getGlobalTimeFilter(), scanOptions.getAllSensors(), - isSeq); + isSeq, + maxTsFileSetEndVersion); } public List<TSDataType> getTsDataTypeList() { @@ -1753,26 +1768,26 @@ public class SeriesScanUtil implements Accountable { @Override public boolean hasNextSeqResource() { - while (dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID)) { + while (dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID, maxTsFileSetEndVersion)) { if (dataSource.isSeqSatisfied( - deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { + deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false, maxTsFileSetEndVersion)) { break; } curSeqFileIndex--; } - return dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID); + return dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID, maxTsFileSetEndVersion); } @Override public boolean hasNextUnseqResource() { - while (dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID)) { + while (dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID, maxTsFileSetEndVersion)) { if (dataSource.isUnSeqSatisfied( deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { break; } curUnseqFileIndex++; } - return dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID); + return dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID, maxTsFileSetEndVersion); } @Override @@ -1882,26 +1897,26 
@@ public class SeriesScanUtil implements Accountable { @Override public boolean hasNextSeqResource() { - while (dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID)) { + while (dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID, maxTsFileSetEndVersion)) { if (dataSource.isSeqSatisfied( - deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { + deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false, maxTsFileSetEndVersion)) { break; } curSeqFileIndex++; } - return dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID); + return dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID, maxTsFileSetEndVersion); } @Override public boolean hasNextUnseqResource() { - while (dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID)) { + while (dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID, maxTsFileSetEndVersion)) { if (dataSource.isUnSeqSatisfied( deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { break; } curUnseqFileIndex++; } - return dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID); + return dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID, maxTsFileSetEndVersion); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index e64350500a3..201007dcdb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -685,7 +685,8 @@ public class DataRegion implements IDataRegionForQuery { partitionFiles.getValue(), fileTimeIndexMap, false, - recoveredPartitionTsFileSetMap, partitionMinimalVersion); + recoveredPartitionTsFileSetMap, + partitionMinimalVersion); if (asyncRecoverTask != null) { 
asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask); } @@ -705,7 +706,8 @@ public class DataRegion implements IDataRegionForQuery { for (Entry<Long, List<TsFileSet>> entry : recoveredPartitionTsFileSetMap.entrySet()) { long partitionId = entry.getKey(); // if no file in the partition, all filesets should be cleared - long minimumFileVersion = partitionMinimalVersion.getOrDefault(partitionId, Long.MAX_VALUE); + long minimumFileVersion = + partitionMinimalVersion.getOrDefault(partitionId, Long.MAX_VALUE); for (TsFileSet tsFileSet : entry.getValue()) { if (tsFileSet.getEndVersion() < minimumFileVersion) { tsFileSet.remove(); @@ -1056,6 +1058,7 @@ public class DataRegion implements IDataRegionForQuery { tsFileSet = new TsFileSet( Long.parseLong(fileSet.getName()), fileSetDir.getAbsolutePath(), true); + tsFileManager.addTsFileSet(tsFileSet, partitionId); } catch (NumberFormatException e) { continue; } @@ -1077,30 +1080,23 @@ public class DataRegion implements IDataRegionForQuery { List<TsFileResource> resourceList, Map<TsFileID, FileTimeIndex> fileTimeIndexMap, boolean isSeq, - Map<Long, List<TsFileSet>> partitionTsFileSetMap, Map<Long, Long> partitionMinimalVersion) { + Map<Long, List<TsFileSet>> partitionTsFileSetMap, + Map<Long, Long> partitionMinimalVersion) { List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>(); List<TsFileResource> resourceListForSyncRecover = new ArrayList<>(); Callable<Void> asyncRecoverTask = null; - List<TsFileSet> tsFileSets = recoverTsFileSets(partitionId, partitionTsFileSetMap); + recoverTsFileSets(partitionId, partitionTsFileSetMap); for (TsFileResource tsFileResource : resourceList) { long fileVersion = tsFileResource.getTsFileID().fileVersion; - partitionMinimalVersion.compute(partitionId, (pid, oldVersion) -> { - if (oldVersion == null) { - return fileVersion; - } - return Math.min(oldVersion, fileVersion); - }); - - int i = Collections.binarySearch(tsFileSets, TsFileSet.comparatorKey(fileVersion)); - if (i < 0) { - 
// if the binary search does not find an exact match, -i indicates the closest one - i = -i; - } - if (i < tsFileSets.size()) { - List<TsFileSet> containedSets = tsFileSets.subList(i, tsFileSets.size()); - containedSets.forEach(tsFileResource::addFileSet); - } + partitionMinimalVersion.compute( + partitionId, + (pid, oldVersion) -> { + if (oldVersion == null) { + return fileVersion; + } + return Math.min(oldVersion, fileVersion); + }); tsFileManager.add(tsFileResource, isSeq); if (fileTimeIndexMap.containsKey(tsFileResource.getTsFileID()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java index 0404ec65e75..f64597e25d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/constant/InnerSeqCompactionPerformer.java @@ -25,9 +25,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.encrypt.EncryptParameter; -import org.apache.tsfile.utils.Pair; public enum InnerSeqCompactionPerformer { READ_CHUNK, @@ -56,12 +54,12 @@ public enum InnerSeqCompactionPerformer { } } - public ISeqCompactionPerformer createInstance(EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { + public 
ISeqCompactionPerformer createInstance(EncryptParameter encryptParameter) { switch (this) { case READ_CHUNK: - return new ReadChunkCompactionPerformer(encryptParameter, maxTsFileSetEndVersionAndMinResource); + return new ReadChunkCompactionPerformer(encryptParameter); case FAST: - return new FastCompactionPerformer(false, encryptParameter, maxTsFileSetEndVersionAndMinResource); + return new FastCompactionPerformer(false, encryptParameter); default: throw new IllegalCompactionPerformerException( "Illegal compaction performer for seq inner compaction " + this); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index a45d0deb5b2..568db7081bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PatternTreeMap; @@ -74,6 +72,8 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class FastCompactionPerformer implements ICrossCompactionPerformer, ISeqCompactionPerformer, IUnseqCompactionPerformer { @@ -108,7 +108,7 @@ public class FastCompactionPerformer public 
FastCompactionPerformer( List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, - List<TsFileResource> targetFiles, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { + List<TsFileResource> targetFiles) { this.seqFiles = seqFiles; this.unseqFiles = unseqFiles; this.targetFiles = targetFiles; @@ -122,14 +122,14 @@ public class FastCompactionPerformer new EncryptParameter( TSFileDescriptor.getInstance().getConfig().getEncryptType(), TSFileDescriptor.getInstance().getConfig().getEncryptKey()); - this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; + this.maxTsFileSetEndVersionAndMinResource = new Pair<>(Long.MIN_VALUE, null); } public FastCompactionPerformer( List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, List<TsFileResource> targetFiles, - EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { + EncryptParameter encryptParameter) { this.seqFiles = seqFiles; this.unseqFiles = unseqFiles; this.targetFiles = targetFiles; @@ -140,7 +140,9 @@ public class FastCompactionPerformer isCrossCompaction = true; } this.encryptParameter = encryptParameter; - this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; + this.maxTsFileSetEndVersionAndMinResource = + TsFileResource.getMaxTsFileSetEndVersionAndMinResource( + Stream.concat(seqFiles.stream(), unseqFiles.stream()).collect(Collectors.toList())); } @TestOnly @@ -162,22 +164,30 @@ public class FastCompactionPerformer @Override public void perform() throws Exception { this.subTaskSummary.setTemporalFileNum(targetFiles.size()); + List<TsFileResource> allSourceFiles = + Stream.concat(seqFiles.stream(), unseqFiles.stream()) + .sorted(TsFileResource::compareFileName) + .collect(Collectors.toList()); + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = + TsFileResource.getMaxTsFileSetEndVersionAndMinResource(allSourceFiles); + try (MultiTsFileDeviceIterator deviceIterator = new 
MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap); AbstractCompactionWriter compactionWriter = isCrossCompaction ? new FastCrossCompactionWriter( - targetFiles, seqFiles, readerCacheMap, encryptParameter) + targetFiles, + seqFiles, + readerCacheMap, + encryptParameter, + maxTsFileSetEndVersionAndMinResource.left) : new FastInnerCompactionWriter(targetFiles, encryptParameter)) { - - List<TsFileResource> allSourceFiles = Stream.concat(seqFiles.stream(), unseqFiles.stream()) - .sorted(TsFileResource::compareFileName) - .collect(Collectors.toList()); - Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = TsFileResource.getMaxTsFileSetEndVersionAndMinResource( - allSourceFiles); List<Schema> schemas = CompactionTableSchemaCollector.collectSchema( - seqFiles, unseqFiles, readerCacheMap, deviceIterator.getDeprecatedTableSchemaMap(), + seqFiles, + unseqFiles, + readerCacheMap, + deviceIterator.getDeprecatedTableSchemaMap(), maxTsFileSetEndVersionAndMinResource); compactionWriter.setSchemaForAllTargetFile(schemas, maxTsFileSetEndVersionAndMinResource); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index 4280c13b24f..904b17567d9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -71,7 +71,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { private Schema schema = null; private final EncryptParameter firstEncryptParameter; - protected final Pair<Long, TsFileResource> 
maxTsFileSetEndVersionAndMinResource; + protected Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource; @TestOnly public ReadChunkCompactionPerformer(List<TsFileResource> sourceFiles, TsFileResource targetFile) { @@ -81,9 +81,8 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { public ReadChunkCompactionPerformer( List<TsFileResource> sourceFiles, TsFileResource targetFile, - EncryptParameter encryptParameter, - Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { - this(sourceFiles, Collections.singletonList(targetFile), encryptParameter, maxTsFileSetEndVersionAndMinResource); + EncryptParameter encryptParameter) { + this(sourceFiles, Collections.singletonList(targetFile), encryptParameter); } @TestOnly @@ -98,12 +97,12 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { public ReadChunkCompactionPerformer( List<TsFileResource> sourceFiles, List<TsFileResource> targetFiles, - EncryptParameter encryptParameter, - Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { + EncryptParameter encryptParameter) { setSourceFiles(sourceFiles); setTargetFiles(targetFiles); this.firstEncryptParameter = encryptParameter; - this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; + this.maxTsFileSetEndVersionAndMinResource = + TsFileResource.getMaxTsFileSetEndVersionAndMinResource(sourceFiles); } @TestOnly @@ -114,10 +113,11 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { } public ReadChunkCompactionPerformer( - List<TsFileResource> sourceFiles, EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { + List<TsFileResource> sourceFiles, EncryptParameter encryptParameter) { setSourceFiles(sourceFiles); this.firstEncryptParameter = encryptParameter; - this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; + this.maxTsFileSetEndVersionAndMinResource = + 
TsFileResource.getMaxTsFileSetEndVersionAndMinResource(sourceFiles); } @TestOnly @@ -129,9 +129,8 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { this.maxTsFileSetEndVersionAndMinResource = new Pair<>(Long.MIN_VALUE, null); } - public ReadChunkCompactionPerformer(EncryptParameter encryptParameter, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { + public ReadChunkCompactionPerformer(EncryptParameter encryptParameter) { this.firstEncryptParameter = encryptParameter; - this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } @Override @@ -218,7 +217,8 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { targetResources.get(currentTargetFileIndex), memoryBudgetForFileWriter, CompactionType.INNER_SEQ_COMPACTION, - firstEncryptParameter, maxTsFileSetEndVersionAndMinResource.getLeft()); + firstEncryptParameter, + maxTsFileSetEndVersionAndMinResource.getLeft()); currentWriter.setSchema(CompactionTableSchemaCollector.copySchema(schema)); } @@ -357,6 +357,8 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { @Override public void setSourceFiles(List<TsFileResource> seqFiles) { this.seqFiles = seqFiles; + this.maxTsFileSetEndVersionAndMinResource = + TsFileResource.getMaxTsFileSetEndVersionAndMinResource(seqFiles); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java index 9b18cee6839..b7c143808c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java @@ -18,9 +18,7 @@ */ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl; -import java.util.stream.Stream; import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.IFullPath; import org.apache.iotdb.commons.path.NonAlignedFullPath; @@ -72,6 +70,7 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Collectors; +import java.util.stream.Stream; public class ReadPointCompactionPerformer implements ICrossCompactionPerformer, IUnseqCompactionPerformer { @@ -154,11 +153,12 @@ public class ReadPointCompactionPerformer // Do not close device iterator, because tsfile reader is managed by FileReaderManager. 
MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles, unseqFiles); - List<TsFileResource> allSourceFiles = Stream.concat(seqFiles.stream(), unseqFiles.stream()) - .sorted(TsFileResource::compareFileName) - .collect(Collectors.toList()); - Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = TsFileResource.getMaxTsFileSetEndVersionAndMinResource( - allSourceFiles); + List<TsFileResource> allSourceFiles = + Stream.concat(seqFiles.stream(), unseqFiles.stream()) + .sorted(TsFileResource::compareFileName) + .collect(Collectors.toList()); + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = + TsFileResource.getMaxTsFileSetEndVersionAndMinResource(allSourceFiles); List<Schema> schemas = CompactionTableSchemaCollector.collectSchema( @@ -174,14 +174,24 @@ public class ReadPointCompactionPerformer Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice(); IDeviceID device = deviceInfo.left; boolean isAligned = deviceInfo.right; - queryDataSource.fillOrderIndexes(device, true); + queryDataSource.fillOrderIndexes(device, true, maxTsFileSetEndVersionAndMinResource.left); if (isAligned) { compactAlignedSeries( - device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource, maxTsFileSetEndVersionAndMinResource); + device, + deviceIterator, + compactionWriter, + fragmentInstanceContext, + queryDataSource, + maxTsFileSetEndVersionAndMinResource); } else { compactNonAlignedSeries( - device, deviceIterator, compactionWriter, fragmentInstanceContext, queryDataSource, maxTsFileSetEndVersionAndMinResource); + device, + deviceIterator, + compactionWriter, + fragmentInstanceContext, + queryDataSource, + maxTsFileSetEndVersionAndMinResource); } summary.setTemporaryFileSize(compactionWriter.getWriterSize()); } @@ -219,8 +229,9 @@ public class ReadPointCompactionPerformer FragmentInstanceContext fragmentInstanceContext, QueryDataSource queryDataSource, Pair<Long, TsFileResource> 
maxTsFileSetEndVersionAndMinResource) - throws IOException, MetadataException { - Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(maxTsFileSetEndVersionAndMinResource); + throws IOException { + Map<String, MeasurementSchema> schemaMap = + deviceIterator.getAllSchemasOfCurrentDevice(maxTsFileSetEndVersionAndMinResource); IMeasurementSchema timeSchema = schemaMap.remove(TsFileConstant.TIME_COLUMN_ID); List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values()); if (measurementSchemas.isEmpty()) { @@ -240,7 +251,8 @@ public class ReadPointCompactionPerformer new ArrayList<>(schemaMap.keySet()), fragmentInstanceContext, queryDataSource, - true); + true, + maxTsFileSetEndVersionAndMinResource.left); if (dataBlockReader.hasNextBatch()) { compactionWriter.startChunkGroup(device, true); @@ -267,8 +279,8 @@ public class ReadPointCompactionPerformer QueryDataSource queryDataSource, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) throws IOException, InterruptedException, ExecutionException { - Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice( - maxTsFileSetEndVersionAndMinResource); + Map<String, MeasurementSchema> schemaMap = + deviceIterator.getAllSchemasOfCurrentDevice(maxTsFileSetEndVersionAndMinResource); List<String> allMeasurements = new ArrayList<>(schemaMap.keySet()); allMeasurements.sort((String::compareTo)); int subTaskNums = Math.min(allMeasurements.size(), SUB_TASK_NUM); @@ -297,7 +309,7 @@ public class ReadPointCompactionPerformer new QueryDataSource(queryDataSource), compactionWriter, schemaMap, - i))); + i, maxTsFileSetEndVersionAndMinResource.left))); } for (Future<Void> future : futures) { future.get(); @@ -321,7 +333,8 @@ public class ReadPointCompactionPerformer List<String> allSensors, FragmentInstanceContext fragmentInstanceContext, QueryDataSource queryDataSource, - boolean isAlign) { + boolean isAlign, + long maxTsFileSetEndVersion) { 
IFullPath seriesPath; if (isAlign) { seriesPath = new AlignedFullPath(deviceId, measurementIds, measurementSchemas); @@ -330,7 +343,7 @@ public class ReadPointCompactionPerformer } return new SeriesDataBlockReader( - seriesPath, new HashSet<>(allSensors), fragmentInstanceContext, queryDataSource, true); + seriesPath, new HashSet<>(allSensors), fragmentInstanceContext, queryDataSource, true, maxTsFileSetEndVersion); } @SuppressWarnings("squid:S1172") @@ -361,8 +374,16 @@ public class ReadPointCompactionPerformer throws IOException { if (!seqFileResources.isEmpty() && !unseqFileResources.isEmpty()) { // cross space + List<TsFileResource> allSourceFiles = + Stream.concat(seqFileResources.stream(), unseqFileResources.stream()) + .collect(Collectors.toList()); + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource = + TsFileResource.getMaxTsFileSetEndVersionAndMinResource(allSourceFiles); return new ReadPointCrossCompactionWriter( - targetFileResources, seqFileResources, encryptParameter); + targetFileResources, + seqFileResources, + encryptParameter, + maxTsFileSetEndVersionAndMinResource.left); } else { // inner space return new ReadPointInnerCompactionWriter(targetFileResources, encryptParameter); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java index dba8b5e4f2a..d027957c6e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java @@ -121,11 +121,10 @@ public class RepairUnsortedFileCompactionTask extends InnerSpaceCompactionTask { @Override protected void 
calculateSourceFilesAndTargetFiles() throws IOException { filesView.sourceFilesInLog = filesView.sourceFilesInCompactionPerformer; - TsFileResource targetResource = new TsFileResource(generateTargetFile(), - TsFileResourceStatus.COMPACTING); + TsFileResource targetResource = + new TsFileResource(generateTargetFile(), TsFileResourceStatus.COMPACTING); targetResource.setTsFileManager(tsFileManager); - filesView.targetFilesInLog = - Collections.singletonList(targetResource); + filesView.targetFilesInLog = Collections.singletonList(targetResource); filesView.targetFilesInPerformer = filesView.targetFilesInLog; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/ReadPointPerformerSubTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/ReadPointPerformerSubTask.java index 74f72590745..ed967030b52 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/ReadPointPerformerSubTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/ReadPointPerformerSubTask.java @@ -57,6 +57,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> { private final AbstractCompactionWriter compactionWriter; private final Map<String, MeasurementSchema> schemaMap; private final int taskId; + private final long maxTsFileSetEndVersion; public ReadPointPerformerSubTask( IDeviceID device, @@ -65,7 +66,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> { QueryDataSource queryDataSource, AbstractCompactionWriter compactionWriter, Map<String, MeasurementSchema> schemaMap, - int taskId) { + int taskId, long maxTsFileSetEndVersion) { this.device = device; this.measurementList = measurementList; this.fragmentInstanceContext = fragmentInstanceContext; @@ -73,6 +74,7 @@ public class 
ReadPointPerformerSubTask implements Callable<Void> { this.compactionWriter = compactionWriter; this.schemaMap = schemaMap; this.taskId = taskId; + this.maxTsFileSetEndVersion = maxTsFileSetEndVersion; } @Override @@ -88,7 +90,8 @@ public class ReadPointPerformerSubTask implements Callable<Void> { new ArrayList<>(schemaMap.keySet()), fragmentInstanceContext, queryDataSource, - false); + false, + maxTsFileSetEndVersion); if (dataBlockReader.hasNextBatch()) { compactionWriter.startMeasurement( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java index 99c4286c759..8d43305d94e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java @@ -35,9 +35,12 @@ public class CompactionTableSchema extends TableSchema { public CompactionTableSchema(TableSchema tableSchema) { this(tableSchema.getTableName(), tableSchema.getColumnSchemas(), tableSchema.getColumnTypes()); + this.updatable = tableSchema.isUpdatable(); } - public CompactionTableSchema(String tableName, List<IMeasurementSchema> columnSchemas, + public CompactionTableSchema( + String tableName, + List<IMeasurementSchema> columnSchemas, List<ColumnCategory> columnCategories) { super(tableName, columnSchemas, columnCategories); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java index d5eb8d08d98..2c2f34e0fca 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java @@ -20,8 +20,8 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; - import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; + import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.utils.Pair; @@ -91,8 +91,8 @@ public class CompactionTableSchemaCollector { continue; } - EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema( - maxTsFileSetEndVersionAndAssociatedResource.getLeft()); + EvolvedSchema evolvedSchema = + resource.getMergedEvolvedSchema(maxTsFileSetEndVersionAndAssociatedResource.getLeft()); for (Map.Entry<String, TableSchema> entry : tableSchemaMap.entrySet()) { String tableName = entry.getKey(); @@ -102,6 +102,7 @@ public class CompactionTableSchemaCollector { } if (evolvedSchema != null) { currentTableSchema = evolvedSchema.rewriteToFinal(currentTableSchema); + tableName = currentTableSchema.getTableName(); } // merge all id columns, measurement schema will be generated automatically when end chunk diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index a6136d7bd2f..067cb3602a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -97,12 +97,18 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { // sort the files from the newest to the oldest Collections.sort( this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); - long maxTsFileSetEndVersion = this.tsFileResourcesSortedByDesc.stream().mapToLong( - // max endVersion of all filesets of a TsFile - resource -> resource.getTsFileSets().stream().mapToLong(TsFileSet::getEndVersion).max() - .orElse(Long.MAX_VALUE)) - // overall max endVersion - .max().orElse(Long.MAX_VALUE); + long maxTsFileSetEndVersion = + this.tsFileResourcesSortedByDesc.stream() + .mapToLong( + // max endVersion of all filesets of a TsFile + resource -> + resource.getTsFileSets().stream() + .mapToLong(TsFileSet::getEndVersion) + .max() + .orElse(Long.MAX_VALUE)) + // overall max endVersion + .max() + .orElse(Long.MAX_VALUE); try { for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) { CompactionTsFileReader reader = @@ -114,8 +120,8 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { TsFileDeviceIterator tsFileDeviceIterator; EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion); if (evolvedSchema != null) { - tsFileDeviceIterator = new ReorderedTsFileDeviceIterator(reader, - evolvedSchema::rewriteToFinal); + tsFileDeviceIterator = + new ReorderedTsFileDeviceIterator(reader, evolvedSchema::rewriteToFinal); } else { tsFileDeviceIterator = reader.getAllDevicesIteratorWithIsAligned(); } @@ -144,12 +150,35 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { // sort the files from the newest to the oldest Collections.sort( this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); + + long maxTsFileSetEndVersion = + this.tsFileResourcesSortedByDesc.stream() + .mapToLong( + // max 
endVersion of all filesets of a TsFile + resource -> + resource.getTsFileSets().stream() + .mapToLong(TsFileSet::getEndVersion) + .max() + .orElse(Long.MAX_VALUE)) + // overall max endVersion + .max() + .orElse(Long.MAX_VALUE); + for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) { TsFileSequenceReader reader = FileReaderManager.getInstance() .get(tsFileResource.getTsFilePath(), tsFileResource.getTsFileID(), true); readerMap.put(tsFileResource, reader); - deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned()); + + TsFileDeviceIterator tsFileDeviceIterator; + EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion); + if (evolvedSchema != null) { + tsFileDeviceIterator = + new ReorderedTsFileDeviceIterator(reader, evolvedSchema::rewriteToFinal); + } else { + tsFileDeviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + } + deviceIteratorMap.put(tsFileResource, tsFileDeviceIterator); } } @@ -171,6 +200,19 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); this.readerMap = readerMap; + long maxTsFileSetEndVersion = + this.tsFileResourcesSortedByDesc.stream() + .mapToLong( + // max endVersion of all filesets of a TsFile + resource -> + resource.getTsFileSets().stream() + .mapToLong(TsFileSet::getEndVersion) + .max() + .orElse(Long.MAX_VALUE)) + // overall max endVersion + .max() + .orElse(Long.MAX_VALUE); + CompactionType type = null; if (!seqResources.isEmpty() && !unseqResources.isEmpty()) { type = CompactionType.CROSS_COMPACTION; @@ -187,11 +229,20 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { type, EncryptDBUtils.getFirstEncryptParamFromTSFilePath(tsFileResource.getTsFilePath())); readerMap.put(tsFileResource, reader); - deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned()); + + TsFileDeviceIterator tsFileDeviceIterator; + 
EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion); + if (evolvedSchema != null) { + tsFileDeviceIterator = + new ReorderedTsFileDeviceIterator(reader, evolvedSchema::rewriteToFinal); + } else { + tsFileDeviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + } + deviceIteratorMap.put(tsFileResource, tsFileDeviceIterator); } } - public boolean hasNextDevice() { + public boolean hasNextDevice() throws IOException { boolean hasNext = false; for (TsFileDeviceIterator iterator : deviceIteratorMap.values()) { hasNext = @@ -212,7 +263,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { * @return Pair of device full path and whether this device is aligned */ @SuppressWarnings({"squid:S135", "java:S2259"}) - public Pair<IDeviceID, Boolean> nextDevice() throws IllegalPathException { + public Pair<IDeviceID, Boolean> nextDevice() throws IllegalPathException, IOException { List<TsFileResource> toBeRemovedResources = new LinkedList<>(); Pair<IDeviceID, Boolean> minDevice = null; // get the device from source files sorted from the newest to the oldest by version @@ -292,24 +343,25 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { timeseriesMetadataList, deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(), schemaMap.keySet(), - true); - EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema( - maxTsFileSetEndVersionAndMinResource.left); + true, + null); + EvolvedSchema evolvedSchema = + resource.getMergedEvolvedSchema(maxTsFileSetEndVersionAndMinResource.left); if (evolvedSchema != null) { // the device has been rewritten, should get the original name for rewriting - evolvedSchema.rewriteToFinal(evolvedSchema.getOriginalTableName(currentDevice.left.getTableName()), timeseriesMetadataList); + evolvedSchema.rewriteToFinal( + evolvedSchema.getOriginalTableName(currentDevice.left.getTableName()), + timeseriesMetadataList); } for (TimeseriesMetadata timeseriesMetadata : 
timeseriesMetadataList) { if (!schemaMap.containsKey(timeseriesMetadata.getMeasurementId()) && !timeseriesMetadata.getChunkMetadataList().isEmpty()) { - MeasurementSchema measurementSchema = reader.getMeasurementSchema( - timeseriesMetadata.getChunkMetadataList()); + MeasurementSchema measurementSchema = + reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList()); // the column may be renamed measurementSchema.setMeasurementName(timeseriesMetadata.getMeasurementId()); - schemaMap.put( - timeseriesMetadata.getMeasurementId(), - measurementSchema); + schemaMap.put(timeseriesMetadata.getMeasurementId(), measurementSchema); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java index 14d2db8a191..3b9bf6ab8da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java @@ -1,40 +1,43 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; + import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.function.Function; -import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.MetadataIndexNode; -import org.apache.tsfile.read.TsFileSequenceReader; -import org.apache.tsfile.utils.Pair; public class ReorderedTsFileDeviceIterator extends 
TransformedTsFileDeviceIterator { - private final List<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>> deviceIDAndFirstMeasurementNodeList = new ArrayList<>(); + private final List<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>> + deviceIDAndFirstMeasurementNodeList = new ArrayList<>(); private Iterator<Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode>> deviceIDListIterator; private Pair<Pair<IDeviceID, Boolean>, MetadataIndexNode> current; - public ReorderedTsFileDeviceIterator(TsFileSequenceReader reader, - Function<IDeviceID, IDeviceID> transformer) - throws IOException { + public ReorderedTsFileDeviceIterator( + TsFileSequenceReader reader, Function<IDeviceID, IDeviceID> transformer) throws IOException { super(reader, transformer); collectAndSort(); } - public ReorderedTsFileDeviceIterator(TsFileSequenceReader reader, String tableName, - Function<IDeviceID, IDeviceID> transformer) throws IOException { + public ReorderedTsFileDeviceIterator( + TsFileSequenceReader reader, String tableName, Function<IDeviceID, IDeviceID> transformer) + throws IOException { super(reader, tableName, transformer); collectAndSort(); } - private void collectAndSort() { + private void collectAndSort() throws IOException { while (super.hasNext()) { Pair<IDeviceID, Boolean> next = super.next(); next.left = transformer.apply(next.left); - deviceIDAndFirstMeasurementNodeList.add(new Pair<>(next, super.getFirstMeasurementNodeOfCurrentDevice())); + deviceIDAndFirstMeasurementNodeList.add( + new Pair<>(next, super.getFirstMeasurementNodeOfCurrentDevice())); } deviceIDAndFirstMeasurementNodeList.sort(Comparator.comparing(p -> p.getLeft().getLeft())); deviceIDListIterator = deviceIDAndFirstMeasurementNodeList.iterator(); @@ -54,12 +57,12 @@ public class ReorderedTsFileDeviceIterator extends TransformedTsFileDeviceIterat @Override public Pair<IDeviceID, Boolean> current() { - return current.left; + return current == null ? 
null : current.left; } @Override public MetadataIndexNode getFirstMeasurementNodeOfCurrentDevice() { // the devices have been reordered, cannot use the measurementNode - return current.right; + return current == null ? null : current.right; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java index f1af028226d..a361adb18e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/TransformedTsFileDeviceIterator.java @@ -19,26 +19,28 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; -import java.io.IOException; -import java.util.function.Function; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TsFileDeviceIterator; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.utils.Pair; +import java.io.IOException; +import java.util.function.Function; + public class TransformedTsFileDeviceIterator extends TsFileDeviceIterator { protected Function<IDeviceID, IDeviceID> transformer; - public TransformedTsFileDeviceIterator(TsFileSequenceReader reader, Function<IDeviceID, IDeviceID> transformer) - throws IOException { + public TransformedTsFileDeviceIterator( + TsFileSequenceReader reader, Function<IDeviceID, IDeviceID> transformer) throws IOException { super(reader); this.transformer = transformer; } - public TransformedTsFileDeviceIterator(TsFileSequenceReader reader, String tableName, Function<IDeviceID, IDeviceID> transformer) + public TransformedTsFileDeviceIterator( + TsFileSequenceReader reader, String tableName, Function<IDeviceID, 
IDeviceID> transformer) throws IOException { - super(reader, tableName); + super(reader, tableName, null); this.transformer = transformer; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/SeriesDataBlockReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/SeriesDataBlockReader.java index a49b97c8f52..9c78d866852 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/SeriesDataBlockReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/reader/SeriesDataBlockReader.java @@ -53,7 +53,8 @@ public class SeriesDataBlockReader implements IDataBlockReader { Set<String> allSensors, FragmentInstanceContext context, QueryDataSource dataSource, - boolean ascending) { + boolean ascending, + long maxTsFileSetEndVersion) { SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); scanOptionsBuilder.withAllSensors(allSensors); @@ -63,14 +64,16 @@ public class SeriesDataBlockReader implements IDataBlockReader { (AlignedFullPath) seriesPath, ascending ? Ordering.ASC : Ordering.DESC, scanOptionsBuilder.build(), - context); + context, + maxTsFileSetEndVersion); } else if (seriesPath instanceof NonAlignedFullPath) { this.seriesScanUtil = new SeriesScanUtil( seriesPath, ascending ? 
Ordering.ASC : Ordering.DESC, scanOptionsBuilder.build(), - context); + context, + maxTsFileSetEndVersion); } else { throw new IllegalArgumentException("Should call exact sub class!"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java index 623f2b3287d..63f148e5a58 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -27,8 +27,8 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.exe import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; - import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.header.PageHeader; @@ -340,5 +340,6 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } } - public abstract void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource); + public abstract void setSchemaForAllTargetFile( + List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndAssociatedResource); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index cfa377c3133..5c4a56ce54a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -80,7 +80,11 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr protected AbstractCrossCompactionWriter( List<TsFileResource> targetResources, List<TsFileResource> seqFileResources) throws IOException { - this(targetResources, seqFileResources, EncryptDBUtils.getDefaultFirstEncryptParam(), Long.MIN_VALUE); + this( + targetResources, + seqFileResources, + EncryptDBUtils.getDefaultFirstEncryptParam(), + Long.MIN_VALUE); } protected AbstractCrossCompactionWriter( @@ -106,7 +110,8 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr targetResources.get(i), memorySizeForEachWriter, CompactionType.CROSS_COMPACTION, - this.encryptParameter, maxTsFileSetEndVersion)); + this.encryptParameter, + maxTsFileSetEndVersion)); isEmptyFile[i] = true; } this.seqTsFileResources = seqFileResources; @@ -270,7 +275,8 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr } @Override - public void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { + public void setSchemaForAllTargetFile( + List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { for (int i = 0; i < targetFileWriters.size(); i++) { CompactionTsFileWriter compactionTsFileWriter = targetFileWriters.get(i); Schema schema = schemas.get(i); diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index 28ca734f32d..42338a8ad1f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -130,8 +130,9 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr Schema schema = CompactionTableSchemaCollector.copySchema(schemas.get(0)); TsFileResource minVersionResource = maxTsFileSetEndVersionAndMinResource.getRight(); fileWriter.getTsFileResource().setTsFileManager(minVersionResource.getTsFileManager()); - EvolvedSchema evolvedSchema = fileWriter.getTsFileResource().getMergedEvolvedSchema(maxTsFileSetEndVersion); - fileWriter.setSchema(evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new)); + EvolvedSchema evolvedSchema = + fileWriter.getTsFileResource().getMergedEvolvedSchema(maxTsFileSetEndVersion); + fileWriter.setSchema(evolvedSchema != null ? 
evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new) : schema); } @Override @@ -184,7 +185,8 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr } @Override - public void setSchemaForAllTargetFile(List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { + public void setSchemaForAllTargetFile( + List<Schema> schemas, Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { this.schemas = schemas; this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastCrossCompactionWriter.java index 59a87b4211c..f379d02704a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastCrossCompactionWriter.java @@ -53,7 +53,11 @@ public class FastCrossCompactionWriter extends AbstractCrossCompactionWriter { List<TsFileResource> seqSourceResources, Map<TsFileResource, TsFileSequenceReader> readerMap) throws IOException { - super(targetResources, seqSourceResources, EncryptDBUtils.getDefaultFirstEncryptParam()); + super( + targetResources, + seqSourceResources, + EncryptDBUtils.getDefaultFirstEncryptParam(), + Long.MIN_VALUE); this.readerMap = readerMap; } @@ -61,9 +65,10 @@ public class FastCrossCompactionWriter extends AbstractCrossCompactionWriter { List<TsFileResource> targetResources, List<TsFileResource> seqSourceResources, Map<TsFileResource, TsFileSequenceReader> readerMap, - EncryptParameter encryptParameter) + EncryptParameter encryptParameter, + long maxTsFileSetEndVersion) throws 
IOException { - super(targetResources, seqSourceResources, encryptParameter); + super(targetResources, seqSourceResources, encryptParameter, maxTsFileSetEndVersion); this.readerMap = readerMap; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java index 6810df4d1a3..b2799c0dfe5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java @@ -47,15 +47,20 @@ public class ReadPointCrossCompactionWriter extends AbstractCrossCompactionWrite public ReadPointCrossCompactionWriter( List<TsFileResource> targetResources, List<TsFileResource> seqFileResources) throws IOException { - super(targetResources, seqFileResources, EncryptDBUtils.getDefaultFirstEncryptParam()); + super( + targetResources, + seqFileResources, + EncryptDBUtils.getDefaultFirstEncryptParam(), + Long.MIN_VALUE); } public ReadPointCrossCompactionWriter( List<TsFileResource> targetResources, List<TsFileResource> seqFileResources, - EncryptParameter encryptParameter) + EncryptParameter encryptParameter, + long maxTsFileSetEndVersion) throws IOException { - super(targetResources, seqFileResources, encryptParameter); + super(targetResources, seqFileResources, encryptParameter, maxTsFileSetEndVersion); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java index c8c24fca8e4..efe33d228f6 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java @@ -65,12 +65,19 @@ public class CompactionTsFileWriter extends TsFileIOWriter { @TestOnly public CompactionTsFileWriter(File file, long maxMetadataSize, CompactionType type) throws IOException { - this(new TsFileResource(file), maxMetadataSize, type, EncryptDBUtils.getDefaultFirstEncryptParam(), + this( + new TsFileResource(file), + maxMetadataSize, + type, + EncryptDBUtils.getDefaultFirstEncryptParam(), Long.MIN_VALUE); } public CompactionTsFileWriter( - TsFileResource tsFile, long maxMetadataSize, CompactionType type, EncryptParameter encryptParameter, + TsFileResource tsFile, + long maxMetadataSize, + CompactionType type, + EncryptParameter encryptParameter, long maxTsFileSetEndVersion) throws IOException { super(tsFile.getTsFile(), maxMetadataSize, encryptParameter); @@ -101,7 +108,13 @@ public class CompactionTsFileWriter extends TsFileIOWriter { if (!chunkWriter.isEmpty()) { isEmptyTargetFile = false; } - chunkWriter.writeToFileWriter(this); + chunkWriter.writeToFileWriter( + this, + evolvedSchema == null + ? 
null + : measurementName -> + evolvedSchema.getOriginalColumnName( + evolvedSchema.getFinalTableName(currentDeviceId.getTableName()), measurementName)); long writtenDataSize = this.getPos() - beforeOffset; CompactionMetrics.getInstance() .recordWriteInfo( @@ -116,6 +129,13 @@ public class CompactionTsFileWriter extends TsFileIOWriter { if (chunkMetadata.getNumOfPoints() != 0) { isEmptyTargetFile = false; } + if (evolvedSchema != null) { + chunk + .getHeader() + .setMeasurementID( + evolvedSchema.getOriginalColumnName( + currentDeviceId.getTableName(), chunk.getHeader().getMeasurementID())); + } super.writeChunk(chunk, chunkMetadata); long writtenDataSize = this.getPos() - beforeOffset; CompactionMetrics.getInstance() @@ -133,6 +153,10 @@ public class CompactionTsFileWriter extends TsFileIOWriter { TSEncoding encodingType, Statistics<? extends Serializable> statistics) throws IOException { + if (evolvedSchema != null) { + measurementId = + evolvedSchema.getOriginalColumnName(currentDeviceId.getTableName(), measurementId); + } long beforeOffset = this.getPos(); super.writeEmptyValueChunk( measurementId, compressionType, tsDataType, encodingType, statistics); @@ -150,6 +174,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter { @Override public int startChunkGroup(IDeviceID deviceId) throws IOException { + if (evolvedSchema != null) { + deviceId = evolvedSchema.rewriteToOriginal(deviceId); + } currentDeviceId = deviceId; return super.startChunkGroup(deviceId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java index d515a7fa3a9..d0f3426423c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java @@ -283,7 +283,7 @@ public class RepairDataFileScanUtil { throws IOException { List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); reader.getDeviceTimeseriesMetadata( - timeseriesMetadataList, metadataIndexNode, Collections.emptySet(), true); + timeseriesMetadataList, metadataIndexNode, Collections.emptySet(), true, null); long actualDeviceStartTime = Long.MAX_VALUE; long actualDeviceEndTime = Long.MIN_VALUE; for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java index 28164934016..b659585d9fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java @@ -113,18 +113,18 @@ public class QueryDataSource implements IQueryDataSource { return queryDataSource; } - public boolean hasNextSeqResource(int curIndex, boolean ascending, IDeviceID deviceID) { + public boolean hasNextSeqResource(int curIndex, boolean ascending, IDeviceID deviceID, long maxTsFileSetEndVersion) { boolean res = ascending ? 
curIndex < seqResources.size() : curIndex >= 0; if (res && curIndex != this.curSeqIndex) { this.curSeqIndex = curIndex; - this.curSeqOrderTime = seqResources.get(curIndex).getOrderTimeForSeq(deviceID, ascending); + this.curSeqOrderTime = seqResources.get(curIndex).getOrderTimeForSeq(deviceID, ascending, maxTsFileSetEndVersion); this.curSeqSatisfied = null; } return res; } public boolean isSeqSatisfied( - IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) { + IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug, long maxTsFileSetEndVersion) { if (curIndex != this.curSeqIndex) { throw new IllegalArgumentException( String.format("curIndex %d is not equal to curSeqIndex %d", curIndex, this.curSeqIndex)); @@ -133,7 +133,7 @@ public class QueryDataSource implements IQueryDataSource { TsFileResource tsFileResource = seqResources.get(curSeqIndex); curSeqSatisfied = tsFileResource != null - && (isSingleDevice || tsFileResource.isSatisfied(deviceID, timeFilter, true, debug)); + && (isSingleDevice || tsFileResource.isSatisfied(deviceID, timeFilter, true, debug, maxTsFileSetEndVersion)); } return curSeqSatisfied; @@ -154,14 +154,14 @@ public class QueryDataSource implements IQueryDataSource { return null; } - public boolean hasNextUnseqResource(int curIndex, boolean ascending, IDeviceID deviceID) { + public boolean hasNextUnseqResource(int curIndex, boolean ascending, IDeviceID deviceID, long maxTsFileSetEndVersion) { boolean res = curIndex < unseqResources.size(); if (res && curIndex != this.curUnSeqIndex) { this.curUnSeqIndex = curIndex; this.curUnSeqOrderTime = unseqResources .get(unSeqFileOrderIndex[curIndex]) - .getOrderTimeForUnseq(deviceID, ascending); + .getOrderTimeForUnseq(deviceID, ascending, maxTsFileSetEndVersion); this.curUnSeqSatisfied = null; } return res; @@ -209,7 +209,7 @@ public class QueryDataSource implements IQueryDataSource { return unseqResources.size(); } - public void fillOrderIndexes(IDeviceID deviceId, boolean 
ascending) { + public void fillOrderIndexes(IDeviceID deviceId, boolean ascending, long maxTsFileSetEndVersion) { if (unseqResources == null || unseqResources.isEmpty()) { return; } @@ -219,7 +219,7 @@ public class QueryDataSource implements IQueryDataSource { for (TsFileResource resource : unseqResources) { orderTimeToIndexMap .computeIfAbsent( - resource.getOrderTimeForUnseq(deviceId, ascending), key -> new ArrayList<>()) + resource.getOrderTimeForUnseq(deviceId, ascending, maxTsFileSetEndVersion), key -> new ArrayList<>()) .add(index++); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index 11dcde29b37..610365cbfb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile; -import java.util.stream.Collectors; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement; @@ -43,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; public class TsFileManager { private final String storageGroupName; @@ -518,20 +518,25 @@ public class TsFileManager { public void addTsFileSet(TsFileSet newSet, long partitionId) { writeLock("addTsFileSet"); try { - List<TsFileSet> tsFileSetList = tsfileSets.computeIfAbsent(partitionId, - p -> new ArrayList<>()); + List<TsFileSet> tsFileSetList = + 
tsfileSets.computeIfAbsent(partitionId, p -> new ArrayList<>()); tsFileSetList.add(newSet); } finally { writeUnlock(); } } - public List<TsFileSet> getTsFileSet(long partitionId, long minFileVersionIncluded, long maxFileVersionExcluded) { + public List<TsFileSet> getTsFileSet( + long partitionId, long minFileVersionIncluded, long maxFileVersionExcluded) { readLock(); try { List<TsFileSet> tsFileSetList = tsfileSets.get(partitionId); - return tsFileSetList.stream().filter(s -> s.getEndVersion() < maxFileVersionExcluded && s.getEndVersion() >= minFileVersionIncluded).collect( - Collectors.toList()); + return tsFileSetList.stream() + .filter( + s -> + s.getEndVersion() < maxFileVersionExcluded + && s.getEndVersion() >= minFileVersionIncluded) + .collect(Collectors.toList()); } finally { readUnlock(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index a387db509c6..0bc12e3f6f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -641,8 +641,12 @@ public class TsFileResource implements PersistentResource, Cloneable { } // cannot use FileTimeIndex - public long getOrderTimeForSeq(IDeviceID deviceId, boolean ascending) { + public long getOrderTimeForSeq(IDeviceID deviceId, boolean ascending, long maxTsFileSetEndVersion) { if (timeIndex instanceof ArrayDeviceTimeIndex) { + EvolvedSchema evolvedSchema = getMergedEvolvedSchema(maxTsFileSetEndVersion); + if (evolvedSchema != null) { + deviceId = evolvedSchema.rewriteToOriginal(deviceId); + } return ascending ? 
timeIndex.getStartTime(deviceId).orElse(Long.MIN_VALUE) : timeIndex.getEndTime(deviceId).orElse(Long.MAX_VALUE); @@ -652,8 +656,12 @@ public class TsFileResource implements PersistentResource, Cloneable { } // can use FileTimeIndex - public long getOrderTimeForUnseq(IDeviceID deviceId, boolean ascending) { + public long getOrderTimeForUnseq(IDeviceID deviceId, boolean ascending, long maxTsFileSetEndVersion) { if (timeIndex instanceof ArrayDeviceTimeIndex) { + EvolvedSchema evolvedSchema = getMergedEvolvedSchema(maxTsFileSetEndVersion); + if (evolvedSchema != null) { + deviceId = evolvedSchema.rewriteToOriginal(deviceId); + } if (ascending) { return timeIndex.getStartTime(deviceId).orElse(Long.MIN_VALUE); } else { @@ -998,14 +1006,26 @@ public class TsFileResource implements PersistentResource, Cloneable { } public boolean isDeviceIdExist(IDeviceID deviceId) { + EvolvedSchema evolvedSchema = getMergedEvolvedSchema(); + if (evolvedSchema != null) { + deviceId = evolvedSchema.rewriteToOriginal(deviceId); + } return timeIndex.checkDeviceIdExist(deviceId); } + public boolean isSatisfied(IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) { + return isSatisfied(deviceId, timeFilter, isSeq, debug, Long.MAX_VALUE); + } + /** * @return true if the device is contained in the TsFile */ @SuppressWarnings("OptionalGetWithoutIsPresent") - public boolean isSatisfied(IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) { + public boolean isSatisfied(IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug, long maxTsFileSetEndVersion) { + EvolvedSchema evolvedSchema = getMergedEvolvedSchema(maxTsFileSetEndVersion); + if (evolvedSchema != null) { + deviceId = evolvedSchema.rewriteToOriginal(deviceId); + } if (deviceId != null && definitelyNotContains(deviceId)) { if (debug) { DEBUG_LOGGER.info( @@ -1632,11 +1652,13 @@ public class TsFileResource implements PersistentResource, Cloneable { } public List<TsFileSet> getTsFileSets() { - return 
tsFileManager.getTsFileSet(tsFileID.timePartitionId, tsFileID.fileVersion, Long.MAX_VALUE); + return tsFileManager.getTsFileSet( + tsFileID.timePartitionId, tsFileID.fileVersion, Long.MAX_VALUE); } public List<TsFileSet> getTsFileSets(long maxEndVersionExcluded) { - return tsFileManager.getTsFileSet(tsFileID.timePartitionId, tsFileID.fileVersion, maxEndVersionExcluded); + return tsFileManager.getTsFileSet( + tsFileID.timePartitionId, tsFileID.fileVersion, maxEndVersionExcluded); } public EvolvedSchema getMergedEvolvedSchema() { @@ -1645,7 +1667,8 @@ public class TsFileResource implements PersistentResource, Cloneable { public EvolvedSchema getMergedEvolvedSchema(long excludedMaxFileVersion) { List<EvolvedSchema> list = new ArrayList<>(); - for (TsFileSet fileSet : getTsFileSets()) { + List<TsFileSet> tsFileSets = getTsFileSets(); + for (TsFileSet fileSet : tsFileSets) { if (fileSet.getEndVersion() >= excludedMaxFileVersion) { continue; } @@ -1661,7 +1684,8 @@ public class TsFileResource implements PersistentResource, Cloneable { return EvolvedSchema.merge(list.toArray(new EvolvedSchema[0])); } - public static Pair<Long, TsFileResource> getMaxTsFileSetEndVersionAndMinResource(List<TsFileResource> tsFileResources) { + public static Pair<Long, TsFileResource> getMaxTsFileSetEndVersionAndMinResource( + List<TsFileResource> tsFileResources) { long maxTsFileSetEndVersion = Long.MIN_VALUE; long minResourceVersion = Long.MAX_VALUE; TsFileResource minTsFileResource = null; @@ -1682,8 +1706,7 @@ public class TsFileResource implements PersistentResource, Cloneable { return new Pair<>(maxTsFileSetEndVersion, minTsFileResource); } - public void setTsFileManager( - TsFileManager tsFileManager) { + public void setTsFileManager(TsFileManager tsFileManager) { this.tsFileManager = tsFileManager; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java index a88e76d4327..3dca08b13d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; -import java.util.function.Function; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry.ModType; @@ -28,13 +26,13 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEn import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.tsfile.enums.ColumnCategory; -import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.Schema; import java.util.ArrayList; import java.util.Collections; @@ -44,8 +42,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.tsfile.write.schema.Schema; public class EvolvedSchema { // the evolved table names after applying all schema evolution operations @@ -106,7 +104,7 @@ public class EvolvedSchema { return 
finalToOriginalTableNames.getOrDefault(finalTableName, finalTableName); } - private String getFinalTableName(String originalTableName) { + public String getFinalTableName(String originalTableName) { return originalToFinalTableNames.getOrDefault(originalTableName, originalTableName); } @@ -232,10 +230,9 @@ public class EvolvedSchema { String originalTableName, List<TimeseriesMetadata> timeseriesMetadataList) { timeseriesMetadataList.forEach( timeseriesMetadata -> { - String finalColumnName = getFinalColumnName(originalTableName, - timeseriesMetadata.getMeasurementId()); - timeseriesMetadata.setMeasurementId( - finalColumnName); + String finalColumnName = + getFinalColumnName(originalTableName, timeseriesMetadata.getMeasurementId()); + timeseriesMetadata.setMeasurementId(finalColumnName); }); } @@ -263,11 +260,15 @@ public class EvolvedSchema { getOriginalColumnName( tableSchema.getTableName(), measurementSchema.getMeasurementName()), measurementSchema.getType(), - measurementSchema.getEncodingType(), measurementSchema.getCompressor())); + measurementSchema.getEncodingType(), + measurementSchema.getCompressor())); columnCategories.add(tableSchema.getColumnTypes().get(i)); } - return new TableSchema(originalTableName, measurementSchemas, columnCategories); + TableSchema schema = new TableSchema(originalTableName, measurementSchemas, + columnCategories); + schema.setUpdatable(tableSchema.isUpdatable()); + return schema; } public TableSchema rewriteToFinal(TableSchema tableSchema) { @@ -288,7 +289,10 @@ public class EvolvedSchema { columnCategories.add(tableSchema.getColumnTypes().get(i)); } - return new TableSchema(finalTableName, measurementSchemas, columnCategories); + TableSchema schema = new TableSchema(finalTableName, measurementSchemas, + columnCategories); + schema.setUpdatable(tableSchema.isUpdatable()); + return schema; } @SuppressWarnings("SuspiciousSystemArraycopy") @@ -310,6 +314,10 @@ public class EvolvedSchema { new 
LinkedHashMap<>(evolvedSchema.finalToOriginalTableNames); newEvolvedSchema.finalToOriginalColumnNames = new LinkedHashMap<>(evolvedSchema.finalToOriginalColumnNames); + newEvolvedSchema.originalToFinalTableNames = + new LinkedHashMap<>(evolvedSchema.originalToFinalTableNames); + newEvolvedSchema.originalToFinalColumnNames = + new LinkedHashMap<>(evolvedSchema.originalToFinalColumnNames); return newEvolvedSchema; } @@ -323,6 +331,9 @@ public class EvolvedSchema { break; } } + if (i == schemas.length) { + return firstNotNullSchema; + } if (firstNotNullSchema == null) { return null; @@ -359,7 +370,9 @@ public class EvolvedSchema { public Schema rewriteToOriginal(Schema schema) { return rewriteToOriginal(schema, null); } - public Schema rewriteToOriginal(Schema schema, Function<TableSchema, TableSchema> tableSchemaTransformer) { + + public Schema rewriteToOriginal( + Schema schema, Function<TableSchema, TableSchema> tableSchemaTransformer) { Schema copySchema = new Schema(); for (TableSchema tableSchema : schema.getTableSchemaMap().values()) { TableSchema originalSchema = rewriteToOriginal(tableSchema); @@ -370,6 +383,4 @@ public class EvolvedSchema { } return copySchema; } - - } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java index c2364b20b61..3ae2949deaa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java @@ -23,11 +23,12 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSche import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile; +import 
org.apache.tsfile.external.commons.io.FileUtils; + import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.tsfile.external.commons.io.FileUtils; /** TsFileSet represents a set of TsFiles in a time partition whose version <= endVersion. */ public class TsFileSet implements Comparable<TsFileSet> { @@ -65,7 +66,7 @@ public class TsFileSet implements Comparable<TsFileSet> { if (schemaEvolutionFile == null) { schemaEvolutionFile = new SchemaEvolutionFile( - fileSetsDir + File.separator + 0 + SchemaEvolutionFile.FILE_SUFFIX); + fileSetDir + File.separator + 0 + SchemaEvolutionFile.FILE_SUFFIX); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java index 31ed98eb0e1..7a90f6b16d6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java @@ -850,8 +850,8 @@ public class AbstractCompactionTest { Pair<IDeviceID, Boolean> iDeviceIDBooleanPair = deviceIterator.nextDevice(); IDeviceID deviceID = iDeviceIDBooleanPair.getLeft(); boolean isAlign = iDeviceIDBooleanPair.getRight(); - Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice( - new Pair<>(Long.MIN_VALUE, null)); + Map<String, MeasurementSchema> schemaMap = + deviceIterator.getAllSchemasOfCurrentDevice(new Pair<>(Long.MIN_VALUE, null)); IMeasurementSchema timeSchema = schemaMap.remove(TsFileConstant.TIME_COLUMN_ID); List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values()); if (measurementSchemas.isEmpty()) { diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadPointCompactionPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadPointCompactionPerformerTest.java index f21571ce4f8..f926506e889 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadPointCompactionPerformerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadPointCompactionPerformerTest.java @@ -37,18 +37,32 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionF import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.constant.TestConstant; import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.ColumnSchemaBuilder; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.common.IBatchDataIterator; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.query.dataset.ResultSet; +import 
org.apache.tsfile.read.v4.ITsFileReader; +import org.apache.tsfile.read.v4.TsFileReaderBuilder; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; @@ -56,8 +70,10 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -66,6 +82,8 @@ import java.util.concurrent.ExecutionException; import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @SuppressWarnings("OptionalGetWithoutIsPresent") public class ReadPointCompactionPerformerTest extends AbstractCompactionTest { @@ -6993,4 +7011,293 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest { Assert.fail(); } } + + @Test + public void testWithSevoFile() throws Exception { + String fileSetDir = + TestConstant.BASE_OUTPUT_PATH + File.separator + TsFileSet.FILE_SET_DIR_NAME; + // file1: + // table1[s1, s2, s3] + // table2[s1, s2, s3] + File f1 = new File(SEQ_DIRS, "0-1-0-0.tsfile"); + TableSchema tableSchema1_1 = + new TableSchema( + "table1", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema1_2 = + 
new TableSchema( + "table2", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(f1)) { + tsFileWriter.registerTableSchema(tableSchema1_1); + tsFileWriter.registerTableSchema(tableSchema1_2); + + Tablet tablet1 = new Tablet(tableSchema1_1.getTableName(), tableSchema1_1.getColumnSchemas()); + tablet1.addTimestamp(0, 0); + tablet1.addValue(0, 0, 1); + tablet1.addValue(0, 1, 2); + tablet1.addValue(0, 2, 3); + + Tablet tablet2 = new Tablet(tableSchema1_2.getTableName(), tableSchema1_2.getColumnSchemas()); + tablet2.addTimestamp(0, 0); + tablet2.addValue(0, 0, 101); + tablet2.addValue(0, 1, 102); + tablet2.addValue(0, 2, 103); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource1 = new TsFileResource(f1); + resource1.setTsFileManager(tsFileManager); + resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0); + resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0); + resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0); + resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0); + resource1.close(); + + // rename table1 -> table0 + TsFileSet tsFileSet1 = new TsFileSet(1, fileSetDir, false); + tsFileSet1.appendSchemaEvolution( + Collections.singletonList(new TableRename("table1", "table0"))); + tsFileManager.addTsFileSet(tsFileSet1, 0); + + // file2: + // table0[s1, s2, s3] + // table2[s1, s2, s3] + File f2 = new File(SEQ_DIRS, "0-2-0-0.tsfile"); + TableSchema tableSchema2_1 = + new TableSchema( + "table0", + Arrays.asList( + new 
ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema2_2 = + new TableSchema( + "table2", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(f2)) { + tsFileWriter.registerTableSchema(tableSchema2_1); + tsFileWriter.registerTableSchema(tableSchema2_2); + + Tablet tablet1 = new Tablet(tableSchema2_1.getTableName(), tableSchema2_1.getColumnSchemas()); + tablet1.addTimestamp(0, 1); + tablet1.addValue(0, 0, 11); + tablet1.addValue(0, 1, 12); + tablet1.addValue(0, 2, 13); + + Tablet tablet2 = new Tablet(tableSchema2_2.getTableName(), tableSchema2_2.getColumnSchemas()); + tablet2.addTimestamp(0, 1); + tablet2.addValue(0, 0, 111); + tablet2.addValue(0, 1, 112); + tablet2.addValue(0, 2, 113); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource2 = new TsFileResource(f2); + resource2.setTsFileManager(tsFileManager); + resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); + resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); + resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1); + resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1); + resource2.close(); + + + // rename table0.s1 -> table0.s0 + TsFileSet tsFileSet2 = 
new TsFileSet(2, fileSetDir, false); + tsFileSet2.appendSchemaEvolution( + Collections.singletonList(new ColumnRename("table0", "s1", "s0"))); + tsFileManager.addTsFileSet(tsFileSet2, 0); + + // file3: + // table0[s0, s2, s3] + // table2[s1, s2, s3] + File f3 = new File(SEQ_DIRS, "0-3-0-0.tsfile"); + TableSchema tableSchema3_1 = + new TableSchema( + "table0", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s0") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema3_2 = + new TableSchema( + "table2", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(f3)) { + tsFileWriter.registerTableSchema(tableSchema3_1); + tsFileWriter.registerTableSchema(tableSchema3_2); + + Tablet tablet1 = new Tablet(tableSchema3_1.getTableName(), tableSchema3_1.getColumnSchemas()); + tablet1.addTimestamp(0, 2); + tablet1.addValue(0, 0, 21); + tablet1.addValue(0, 1, 22); + tablet1.addValue(0, 2, 23); + + Tablet tablet2 = new Tablet(tableSchema3_2.getTableName(), tableSchema3_2.getColumnSchemas()); + tablet2.addTimestamp(0, 2); + tablet2.addValue(0, 0, 121); + tablet2.addValue(0, 1, 122); + tablet2.addValue(0, 2, 123); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource3 = new TsFileResource(f3); + resource3.setTsFileManager(tsFileManager); + 
resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2); + resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2); + resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2); + resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2); + resource3.close(); + + // rename table2 -> table1 + TsFileSet tsFileSet3 = new TsFileSet(3, fileSetDir, false); + tsFileSet3.appendSchemaEvolution( + Collections.singletonList(new TableRename("table2", "table1"))); + tsFileManager.addTsFileSet(tsFileSet3, 0); + + // perform compaction + seqResources.add(resource1); + seqResources.add(resource2); + seqResources.add(resource3); + + List<TsFileResource> targetResources = + CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); + targetResources.forEach(s -> s.setTsFileManager(tsFileManager)); + + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); + + // target(version=1): + // table1[s1, s2, s3] + // table2[s1, s2, s3] + try (ITsFileReader tsFileReader = + new TsFileReaderBuilder().file(targetResources.get(0).getTsFile()).build()) { + // table0 should not exist + try { + tsFileReader.query("table0", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table0 should not exist"); + } catch (NoTableException e) { + assertEquals("Table table0 not found", e.getMessage()); + } + + // table1.s0 should not exist + try { + tsFileReader.query("table1", Collections.singletonList("s0"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table1.s0 should not exist"); + } catch (NoMeasurementException e) { + assertEquals("No measurement for s0", e.getMessage()); + } + + // check data of table1 + ResultSet resultSet = tsFileReader.query("table1", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, 
Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + + // check data of table2 + resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(100 + i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java index d3851e4e48e..8aca6468344 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java @@ -538,8 +538,8 @@ public class CompactionCheckerUtils { Pair<IDeviceID, Boolean> iDeviceIDBooleanPair = deviceIterator.nextDevice(); IDeviceID deviceID = iDeviceIDBooleanPair.getLeft(); boolean isAlign = iDeviceIDBooleanPair.getRight(); - Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice( - new Pair<>(Long.MIN_VALUE, null)); + Map<String, MeasurementSchema> schemaMap = + deviceIterator.getAllSchemasOfCurrentDevice(new Pair<>(Long.MIN_VALUE, null)); IMeasurementSchema timeSchema = schemaMap.remove(TsFileConstant.TIME_COLUMN_ID); List<IMeasurementSchema> measurementSchemas = new ArrayList<>(schemaMap.values()); if (measurementSchemas.isEmpty()) {
