This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 10a555360a46be9629cf9a6112b29739d340e600 Author: Tian Jiang <[email protected]> AuthorDate: Mon Dec 29 18:19:19 2025 +0800 support load with sevo --- .../relational/it/db/it/IoTDBLoadTsFileIT.java | 73 +++++++- .../consensus/iot/client/DispatchLogHandler.java | 2 +- .../impl/DataNodeInternalRPCServiceImpl.java | 2 +- .../execution/operator/source/FileLoaderUtils.java | 10 +- .../db/queryengine/plan/analyze/AnalyzeUtils.java | 5 +- .../plan/analyze/load/LoadTsFileAnalyzer.java | 32 +++- .../plan/planner/LogicalPlanVisitor.java | 3 +- .../plan/node/load/LoadSingleTsFileNode.java | 15 +- .../planner/plan/node/load/LoadTsFileNode.java | 15 +- .../plan/relational/planner/RelationPlanner.java | 6 +- .../plan/relational/sql/ast/LoadTsFile.java | 6 + .../plan/scheduler/load/LoadTsFileScheduler.java | 4 +- .../plan/statement/crud/LoadTsFileStatement.java | 6 + .../db/storageengine/dataregion/DataRegion.java | 21 ++- .../compaction/execute/utils/CompactionUtils.java | 2 +- .../dataregion/modification/TagPredicate.java | 9 + .../dataregion/tsfile/evolution/EvolvedSchema.java | 202 ++++++++++++++++----- .../tsfile/evolution/SchemaEvolutionFile.java | 4 + .../dataregion/tsfile/fileset/TsFileSet.java | 4 +- .../file/UnsealedTsFileRecoverPerformer.java | 2 +- .../load/config/LoadTsFileConfigurator.java | 18 ++ .../load/splitter/AlignedChunkData.java | 13 +- .../storageengine/load/splitter/DeletionData.java | 8 +- .../load/splitter/NonAlignedChunkData.java | 11 +- .../db/storageengine/load/splitter/TsFileData.java | 3 + .../load/splitter/TsFileSplitter.java | 50 ++++- .../iotdb/db/metadata/path/PatternTreeMapTest.java | 2 +- .../db/pipe/consensus/DeletionRecoverTest.java | 2 +- .../db/pipe/consensus/DeletionResourceTest.java | 2 +- .../PipePlanTablePatternParseVisitorTest.java | 2 +- .../plan/planner/node/load/LoadTsFileNodeTest.java | 2 +- .../node/write/RelationalDeleteDataNodeTest.java | 2 +- .../storageengine/dataregion/DataRegionTest.java | 88 +++++---- 
.../BatchedCompactionWithTsFileSplitterTest.java | 3 +- .../tablemodel/CompactionWithAllNullRowsTest.java | 2 +- 35 files changed, 510 insertions(+), 121 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java index 5f88d50c9d9..46cc2e63adb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java @@ -19,6 +19,10 @@ package org.apache.iotdb.relational.it.db.it; +import java.sql.SQLException; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.it.utils.TsFileTableGenerator; @@ -47,10 +51,15 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + @RunWith(IoTDBTestRunner.class) @Category({TableLocalStandaloneIT.class, TableClusterIT.class}) public class IoTDBLoadTsFileIT { @@ -255,7 +264,69 @@ public class IoTDBLoadTsFileIT { try (final ResultSet resultSet = statement.executeQuery("show tables")) { Assert.assertTrue(resultSet.next()); - Assert.assertFalse(resultSet.next()); + assertFalse(resultSet.next()); + } + } + } + + @Test + public void testLoadWithSevoFile() throws Exception { + final int lineCount = 10000; + + 
List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas = + generateMeasurementSchemas(); + List<ColumnCategory> columnCategories = + generateTabletColumnCategory(0, measurementSchemas.size()); + + final File file = new File(tmpDir, "1-0-0-0.tsfile"); + + List<MeasurementSchema> schemaList1 = + measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + + try (final TsFileTableGenerator generator = new TsFileTableGenerator(file)) { + generator.registerTable(SchemaConfig.TABLE_0, new ArrayList<>(schemaList1), columnCategories); + generator.generateData(SchemaConfig.TABLE_0, lineCount, PARTITION_INTERVAL / 10_000); + } + + // rename table0 to table1 + File sevoFile = new File(tmpDir, "0.sevo"); + SchemaEvolutionFile schemaEvolutionFile = new SchemaEvolutionFile(sevoFile.getAbsolutePath()); + SchemaEvolution schemaEvolution = new TableRename(SchemaConfig.TABLE_0, SchemaConfig.TABLE_1); + schemaEvolutionFile.append(Collections.singletonList(schemaEvolution)); + // rename INT322INT32 + + try (final Connection connection = + EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute(String.format("create database if not exists %s", SchemaConfig.DATABASE_0)); + statement.execute(String.format("use %s", SchemaConfig.DATABASE_0)); + statement.execute( + String.format( + "load '%s' with ('database'='%s', 'sevo-file-path'='%s')", + file.getAbsolutePath(), SchemaConfig.DATABASE_0, schemaEvolutionFile.getFilePath())); + + // cannot query using table0 + try (final ResultSet resultSet = + statement.executeQuery(String.format("select count(*) from %s", SchemaConfig.TABLE_0))) { + fail(); + } catch (SQLException e) { + assertEquals("550: Table 'root.test' does not exist.", e.getMessage()); + } + + // can query with table1 + try (final ResultSet resultSet = + statement.executeQuery(String.format("select count(*) from %s", SchemaConfig.TABLE_1))) { + if 
(resultSet.next()) { + Assert.assertEquals(lineCount, resultSet.getLong(1)); + } else { + Assert.fail("This ResultSet is empty."); + } + } + + try (final ResultSet resultSet = statement.executeQuery("show tables")) { + Assert.assertTrue(resultSet.next()); + assertEquals(SchemaConfig.TABLE_1, resultSet.getString(1)); + assertFalse(resultSet.next()); } } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java index bb0326d7473..9bfd48cea9b 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java @@ -106,7 +106,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe ++retryCount; Throwable rootCause = ExceptionUtils.getRootCause(exception); logger.warn( - "Can not send {} to peer for {} times {} because {}", + "v {} to peer for {} times {} because {}", batch, thread.getPeer(), retryCount, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 84e3e17a498..dc6256ca13a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -195,8 +195,8 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.Compacti import org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler; import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; 
-import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java index cf128e33f18..da8fd358e97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; -import java.util.concurrent.LinkedBlockingDeque; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.NonAlignedFullPath; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; @@ -98,15 +97,13 @@ public class FileLoaderUtils { EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema(); IDeviceID deviceId = seriesPath.getDeviceId(); String measurement = seriesPath.getMeasurement(); - + timeSeriesMetadata = TimeSeriesMetadataCache.getInstance() .get( resource.getTsFilePath(), new TimeSeriesMetadataCache.TimeSeriesMetadataCacheKey( - resource.getTsFileID(), - deviceId, - measurement), + resource.getTsFileID(), deviceId, measurement), allSensors, context.ignoreNotExistsDevice() || resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE, @@ -115,8 +112,7 @@ public class FileLoaderUtils { if (timeSeriesMetadata != 
null) { long t2 = System.nanoTime(); List<ModEntry> pathModifications = - context.getPathModifications( - resource, deviceId, measurement); + context.getPathModifications(resource, deviceId, measurement); timeSeriesMetadata.setModified(!pathModifications.isEmpty()); timeSeriesMetadata.setChunkMetadataLoader( new DiskChunkMetadataLoader(resource, context, globalTimeFilter, pathModifications)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java index 331c18d54c9..41df1928112 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java @@ -53,10 +53,10 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; +import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.And; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.SegmentExactMatch; -import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -490,7 +490,8 @@ public class AnalyzeUtils { return combinePredicates(oldPredicate, newPredicate); } - private static TagPredicate combinePredicates(TagPredicate oldPredicate, TagPredicate newPredicate) { + private static TagPredicate combinePredicates( + TagPredicate oldPredicate, TagPredicate newPredicate) 
{ if (oldPredicate == null) { return newPredicate; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 8202de5c499..9b2bf168f66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -41,6 +41,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter; @@ -73,6 +75,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED; import static org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS; @@ -106,6 +109,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable { private boolean isMiniTsFileConverted = false; private final List<Boolean> isTableModelTsFile; private int isTableModelTsFileReliableIndex = -1; + private final File schemaEvolutionFile; + private EvolvedSchema evolvedSchema; // User specified configs private 
final int databaseLevel; @@ -134,6 +139,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable { this.tsFiles = loadTsFileStatement.getTsFiles(); this.isMiniTsFile = new ArrayList<>(Collections.nCopies(this.tsFiles.size(), false)); this.isTableModelTsFile = new ArrayList<>(Collections.nCopies(this.tsFiles.size(), false)); + this.schemaEvolutionFile = loadTsFileStatement.getSchemaEvolutionFile(); this.databaseLevel = loadTsFileStatement.getDatabaseLevel(); this.databaseForTableData = loadTsFileStatement.getDatabase(); @@ -158,6 +164,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable { this.tsFiles = loadTsFileTableStatement.getTsFiles(); this.isMiniTsFile = new ArrayList<>(Collections.nCopies(this.tsFiles.size(), false)); this.isTableModelTsFile = new ArrayList<>(Collections.nCopies(this.tsFiles.size(), false)); + this.schemaEvolutionFile = loadTsFileTableStatement.getSchemaEvolutionFile(); this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel(); this.databaseForTableData = loadTsFileTableStatement.getDatabase(); @@ -200,6 +207,12 @@ public class LoadTsFileAnalyzer implements AutoCloseable { } try { + if (schemaEvolutionFile != null && schemaEvolutionFile.exists()) { + SchemaEvolutionFile sevoFile = + new SchemaEvolutionFile(schemaEvolutionFile.getAbsolutePath()); + evolvedSchema = sevoFile.readAsSchema(); + } + if (!doAnalyzeFileByFile(analysis)) { return analysis; } @@ -526,7 +539,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable { final File tsFile, final TsFileSequenceReader reader, final TsFileSequenceReaderTimeseriesMetadataIterator timeseriesMetadataIterator, - final Map<String, TableSchema> tableSchemaMap) + Map<String, TableSchema> tableSchemaMap) throws IOException, LoadAnalyzeException { // construct tsfile resource final TsFileResource tsFileResource = constructTsFileResource(reader, tsFile); @@ -550,13 +563,28 @@ public class LoadTsFileAnalyzer implements AutoCloseable { } 
getOrCreateTableSchemaCache().setDatabase(databaseForTableData); + if (evolvedSchema != null) { + tableSchemaMap = evolvedSchema.rewriteToFinal(tableSchemaMap); + } getOrCreateTableSchemaCache().setTableSchemaMap(tableSchemaMap); getOrCreateTableSchemaCache().setCurrentModificationsAndTimeIndex(tsFileResource, reader); while (timeseriesMetadataIterator.hasNext()) { - final Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata = + Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata = timeseriesMetadataIterator.next(); + if (evolvedSchema != null) { + device2TimeseriesMetadata = + device2TimeseriesMetadata.entrySet().stream() + .collect( + Collectors.toMap( + e -> evolvedSchema.rewriteToFinal(e.getKey()), + e -> { + evolvedSchema.rewriteToFinal(e.getKey().getTableName(), e.getValue()); + return e.getValue(); + })); + } + // Update time index no matter if resource file exists or not, because resource file may be // untrusted TsFileResourceUtils.updateTsFileResource( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index fa39b3ba7dc..a084cba0624 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -531,7 +531,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte context.getQueryId().genPlanNodeId(), loadTsFileStatement.getResources(), isTableModel, - loadTsFileStatement.getDatabase()); + loadTsFileStatement.getDatabase(), + loadTsFileStatement.getSchemaEvolutionFile()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index 604fda6e1e8..c0d5b582fbe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -62,6 +62,7 @@ public class LoadSingleTsFileNode extends WritePlanNode { private final boolean deleteAfterLoad; private final long writePointCount; private boolean needDecodeTsFile; + private final File schemaEvolutionFile; private TRegionReplicaSet localRegionReplicaSet; @@ -71,7 +72,8 @@ public class LoadSingleTsFileNode extends WritePlanNode { boolean isTableModel, String database, boolean deleteAfterLoad, - long writePointCount) { + long writePointCount, + File schemaEvolutionFile) { super(id); this.tsFile = resource.getTsFile(); this.resource = resource; @@ -79,6 +81,7 @@ public class LoadSingleTsFileNode extends WritePlanNode { this.database = database; this.deleteAfterLoad = deleteAfterLoad; this.writePointCount = writePointCount; + this.schemaEvolutionFile = schemaEvolutionFile; } public boolean isTsFileEmpty() { @@ -89,6 +92,12 @@ public class LoadSingleTsFileNode extends WritePlanNode { public boolean needDecodeTsFile( Function<List<Pair<IDeviceID, TTimePartitionSlot>>, List<TRegionReplicaSet>> partitionFetcher) { + if (schemaEvolutionFile != null) { + // with schema evolution, must split + needDecodeTsFile = true; + return needDecodeTsFile; + } + List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>(); resource .getDevices() @@ -152,6 +161,10 @@ public class LoadSingleTsFileNode extends WritePlanNode { return writePointCount; } + public File getSchemaEvolutionFile() { + return schemaEvolutionFile; + } + /** * only used for load locally. 
* diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java index 3588b6ddbb0..41ae3b9b90b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.exception.NotImplementedException; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -46,13 +47,19 @@ public class LoadTsFileNode extends WritePlanNode { private final List<TsFileResource> resources; private final List<Boolean> isTableModel; private final String database; + private final File schemaEvolutionFile; public LoadTsFileNode( - PlanNodeId id, List<TsFileResource> resources, List<Boolean> isTableModel, String database) { + PlanNodeId id, + List<TsFileResource> resources, + List<Boolean> isTableModel, + String database, + File schemaEvolutionFile) { super(id); this.resources = resources; this.isTableModel = isTableModel; this.database = database; + this.schemaEvolutionFile = schemaEvolutionFile; } @Override @@ -121,7 +128,8 @@ public class LoadTsFileNode extends WritePlanNode { isTableModel.get(i), database, statement.isDeleteAfterLoad(), - statement.getWritePointCount(i))); + statement.getWritePointCount(i), + schemaEvolutionFile)); } return res; } @@ -143,7 +151,8 @@ public class LoadTsFileNode extends WritePlanNode { isTableModel.get(i), database, statement.isDeleteAfterLoad(), - statement.getWritePointCount(i))); + statement.getWritePointCount(i), + schemaEvolutionFile)); } } return res; diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index ce573ce1e2d..c872fe0f274 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -1299,7 +1299,11 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { } return new RelationPlan( new LoadTsFileNode( - idAllocator.genPlanNodeId(), node.getResources(), isTableModel, node.getDatabase()), + idAllocator.genPlanNodeId(), + node.getResources(), + isTableModel, + node.getDatabase(), + node.getSchemaEvolutionFile()), analysis.getRootScope(), Collections.emptyList(), outerContext); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index 93fc8c7b583..9e551cb2b20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -56,6 +56,7 @@ public class LoadTsFile extends Statement { private List<TsFileResource> resources; private List<Long> writePointCountList; private List<Boolean> isTableModel; + private File schemaEvolutionFile; public LoadTsFile(NodeLocation location, String filePath, Map<String, String> loadAttributes) { super(location); @@ -179,6 +180,10 @@ public class LoadTsFile extends Statement { return writePointCountList.get(resourceIndex); } + public File getSchemaEvolutionFile() { + return schemaEvolutionFile; + } + private void initAttributes() { this.databaseLevel = 
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes); this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes); @@ -189,6 +194,7 @@ public class LoadTsFile extends Statement { LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes); this.verify = LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes); this.isAsyncLoad = LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes); + this.schemaEvolutionFile = LoadTsFileConfigurator.parseSevoFile(loadAttributes); } public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> isMiniTsFile) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 7709e20b557..d6bd9a398d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -315,7 +315,9 @@ public class LoadTsFileScheduler implements IScheduler { final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block); try { new TsFileSplitter( - node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData) + node.getTsFileResource().getTsFile(), + tsFileDataManager::addOrSendTsFileData, + node.getSchemaEvolutionFile()) .splitTsFileByDataPartition(); if (!tsFileDataManager.sendAllTsFileData()) { return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index d1dff1bb9cf..3bd7c62a7ff 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -68,6 +68,7 @@ public class LoadTsFileStatement extends Statement { private List<Boolean> isTableModel; private List<TsFileResource> resources; private List<Long> writePointCountList; + private File schemaEvolutionFile; public LoadTsFileStatement(String filePath) throws FileNotFoundException { this.file = new File(filePath).getAbsoluteFile(); @@ -247,6 +248,10 @@ public class LoadTsFileStatement extends Statement { initAttributes(loadAttributes); } + public File getSchemaEvolutionFile() { + return schemaEvolutionFile; + } + public boolean isAsyncLoad() { return isAsyncLoad; } @@ -264,6 +269,7 @@ public class LoadTsFileStatement extends Statement { if (LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes)) { markIsGeneratedByPipe(); } + this.schemaEvolutionFile = LoadTsFileConfigurator.parseSevoFile(loadAttributes); } public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> isMiniTsFile) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 835a0977fa5..3e3a4b53b97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1262,7 +1262,10 @@ public class DataRegion implements IDataRegionForQuery { } else if (schemaEvolution instanceof ColumnRename) { ColumnRename columnRename = (ColumnRename) schemaEvolution; if (columnRename.getDataType() == TSDataType.OBJECT) { - renameMeasurementForObjects(columnRename.getTableName(), columnRename.getNameBefore(), columnRename.getNameAfter()); + 
renameMeasurementForObjects( + columnRename.getTableName(), + columnRename.getNameBefore(), + columnRename.getNameAfter()); } } } @@ -3076,7 +3079,8 @@ public class DataRegion implements IDataRegionForQuery { } } - private boolean canBeFullyDeleted(ArrayDeviceTimeIndex deviceTimeIndex, TableDeletionEntry tableDeletionEntry) { + private boolean canBeFullyDeleted( + ArrayDeviceTimeIndex deviceTimeIndex, TableDeletionEntry tableDeletionEntry) { Set<IDeviceID> devicesInFile = deviceTimeIndex.getDevices(); String tableName = tableDeletionEntry.getTableName(); long matchSize = @@ -3153,14 +3157,21 @@ public class DataRegion implements IDataRegionForQuery { && (deletion.getType() == ModType.TABLE_DELETION)) { ArrayDeviceTimeIndex deviceTimeIndex = (ArrayDeviceTimeIndex) timeIndex; TableDeletionEntry tableDeletionEntry = (TableDeletionEntry) deletion; - tableDeletionEntry = evolvedSchema != null? evolvedSchema.rewriteToOriginal(tableDeletionEntry) : tableDeletionEntry; + tableDeletionEntry = + evolvedSchema != null + ? evolvedSchema.rewriteToOriginal(tableDeletionEntry) + : tableDeletionEntry; if (canBeFullyDeleted(deviceTimeIndex, tableDeletionEntry)) { deletedByFiles.add(sealedTsFile); } else { - involvedModificationFiles.add(new Pair<>(sealedTsFile.getModFileForWrite(), tableDeletionEntry)); + involvedModificationFiles.add( + new Pair<>(sealedTsFile.getModFileForWrite(), tableDeletionEntry)); } } else { - involvedModificationFiles.add(new Pair<>(sealedTsFile.getModFileForWrite(), evolvedSchema != null? evolvedSchema.rewriteToOriginal(deletion) : deletion)); + involvedModificationFiles.add( + new Pair<>( + sealedTsFile.getModFileForWrite(), + evolvedSchema != null ? 
evolvedSchema.rewriteToOriginal(deletion) : deletion)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index e6ec34e529b..5f426452a63 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -33,11 +33,11 @@ import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TagPredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TagPredicate.java index 
8aca6ed5fe7..a0624c8555d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TagPredicate.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TagPredicate.java @@ -84,6 +84,10 @@ public abstract class TagPredicate implements StreamSerializable, BufferSerializ return this; } + public TagPredicate rewriteToFinal(EvolvedSchema evolvedSchema) { + return this; + } + @Override public long serialize(OutputStream stream) throws IOException { return type.serialize(stream); @@ -253,6 +257,11 @@ public abstract class TagPredicate implements StreamSerializable, BufferSerializ public TagPredicate rewriteToOriginal(EvolvedSchema evolvedSchema) { return new FullExactMatch(evolvedSchema.rewriteToOriginal(deviceID)); } + + @Override + public TagPredicate rewriteToFinal(EvolvedSchema evolvedSchema) { + return new FullExactMatch(evolvedSchema.rewriteToFinal(deviceID)); + } } public static class SegmentExactMatch extends TagPredicate { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java index 6e50bf5f0c1..7d04bb1aa3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java @@ -19,113 +19,151 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry.ModType; import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; + +import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.stream.Collectors; public class EvolvedSchema { // the evolved table names after applying all schema evolution operations - private Map<String, String> originalTableNames = new LinkedHashMap<>(); + private Map<String, String> finalToOriginalTableNames = new LinkedHashMap<>(); /** * the first key is the evolved table name, the second key is the evolved column name, and the * value is the original column name before any schema evolution. 
*/ - private Map<String, Map<String, String>> originalColumnNames = new LinkedHashMap<>(); + private Map<String, Map<String, String>> finalToOriginalColumnNames = new LinkedHashMap<>(); + + // the reversed version of finalToOriginalTableNames + private Map<String, String> originalToFinalTableNames = new LinkedHashMap<>(); + + // the reversed version of finalToOriginalColumnNames + private Map<String, Map<String, String>> originalToFinalColumnNames = new LinkedHashMap<>(); public void renameTable(String oldTableName, String newTableName) { - if (!originalTableNames.containsKey(oldTableName) || originalTableNames.get(oldTableName).isEmpty()) { - originalTableNames.put(newTableName, oldTableName); - originalTableNames.put(oldTableName, ""); + if (!finalToOriginalTableNames.containsKey(oldTableName) + || finalToOriginalTableNames.get(oldTableName).isEmpty()) { + finalToOriginalTableNames.put(newTableName, oldTableName); + finalToOriginalTableNames.put(oldTableName, ""); + originalToFinalTableNames.put(oldTableName, newTableName); } else { - // mark the old table name as non-exists - String originalName = originalTableNames.put(oldTableName, ""); - originalTableNames.put(newTableName, originalName); + // mark the old table name as non-exists (empty) + String originalName = finalToOriginalTableNames.put(oldTableName, ""); + finalToOriginalTableNames.put(newTableName, originalName); + originalToFinalTableNames.put(originalName, newTableName); } - if (originalColumnNames.containsKey(oldTableName)) { - Map<String, String> columnMap = originalColumnNames.remove(oldTableName); - originalColumnNames.put(newTableName, columnMap); + if (finalToOriginalColumnNames.containsKey(oldTableName)) { + Map<String, String> columnMap = finalToOriginalColumnNames.remove(oldTableName); + finalToOriginalColumnNames.put(newTableName, columnMap); } } - public void renameColumn(String tableName, String oldColumnName, String newColumnName) { + public void renameColumn(String newTableName, String 
oldColumnName, String newColumnName) { Map<String, String> columnNameMap = - originalColumnNames.computeIfAbsent(tableName, t -> new LinkedHashMap<>()); + finalToOriginalColumnNames.computeIfAbsent(newTableName, t -> new LinkedHashMap<>()); + String originalTableName = getOriginalTableName(newTableName); if (!columnNameMap.containsKey(oldColumnName) || columnNameMap.get(oldColumnName).isEmpty()) { columnNameMap.put(newColumnName, oldColumnName); columnNameMap.put(oldColumnName, ""); + originalToFinalColumnNames + .computeIfAbsent(originalTableName, t -> new LinkedHashMap<>()) + .put(oldColumnName, newColumnName); } else { // mark the old column name as non-exists String originalName = columnNameMap.put(oldColumnName, ""); columnNameMap.put(newColumnName, originalName); + originalToFinalColumnNames + .computeIfAbsent(originalTableName, t -> new LinkedHashMap<>()) + .put(originalName, newColumnName); } } - public String getOriginalTableName(String evolvedTableName) { - return originalTableNames.getOrDefault(evolvedTableName, evolvedTableName); + public String getOriginalTableName(String finalTableName) { + return finalToOriginalTableNames.getOrDefault(finalTableName, finalTableName); + } + + private String getFinalTableName(String originalTableName) { + return originalToFinalTableNames.getOrDefault(originalTableName, originalTableName); } public String getOriginalColumnName(String tableName, String evolvedColumnName) { - Map<String, String> columnNameMap = originalColumnNames.get(tableName); + Map<String, String> columnNameMap = finalToOriginalColumnNames.get(tableName); if (columnNameMap == null) { return evolvedColumnName; } return columnNameMap.getOrDefault(evolvedColumnName, evolvedColumnName); } + public String getFinalColumnName(String originalTableName, String originalColumnName) { + return originalToFinalColumnNames + .getOrDefault(originalTableName, Collections.emptyMap()) + .getOrDefault(originalColumnName, originalColumnName); + } + @Override public 
boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } EvolvedSchema that = (EvolvedSchema) o; - return Objects.equals(originalTableNames, that.originalTableNames) - && Objects.equals(originalColumnNames, that.originalColumnNames); + return Objects.equals(finalToOriginalTableNames, that.finalToOriginalTableNames) + && Objects.equals(finalToOriginalColumnNames, that.finalToOriginalColumnNames); } @Override public int hashCode() { - return Objects.hash(originalTableNames, originalColumnNames); + return Objects.hash(finalToOriginalTableNames, finalToOriginalColumnNames); } @Override public String toString() { return "EvolvedSchema{" + "originalTableNames=" - + originalTableNames + + finalToOriginalTableNames + ", originalColumnNames=" - + originalColumnNames + + finalToOriginalColumnNames + '}'; } public List<SchemaEvolution> toSchemaEvolutions() { - List<SchemaEvolution> schemaEvolutions = new ArrayList<>(); - originalTableNames.forEach((finalTableName, originalTableName) -> { - if (!originalTableName.isEmpty()) { - schemaEvolutions.add(new TableRename(originalTableName, finalTableName)); - } - }); - originalColumnNames.forEach((finalTableName, originalColumnNameMap) -> { - originalColumnNameMap.forEach((finalColumnName, originalColumnName) -> { - if (!originalColumnName.isEmpty()) { - schemaEvolutions.add(new ColumnRename(finalTableName, originalColumnName, finalColumnName, null)); + List<SchemaEvolution> schemaEvolutions = new ArrayList<>(); + finalToOriginalTableNames.forEach( + (finalTableName, originalTableName) -> { + if (!originalTableName.isEmpty()) { + schemaEvolutions.add(new TableRename(originalTableName, finalTableName)); } }); - }); - return schemaEvolutions; + finalToOriginalColumnNames.forEach( + (finalTableName, originalColumnNameMap) -> { + originalColumnNameMap.forEach( + (finalColumnName, originalColumnName) -> { + if (!originalColumnName.isEmpty()) { + schemaEvolutions.add( + new ColumnRename(finalTableName, 
originalColumnName, finalColumnName, null)); + } + }); + }); + return schemaEvolutions; } public ModEntry rewriteToOriginal(ModEntry entry) { @@ -135,34 +173,104 @@ public class EvolvedSchema { return entry; } + public ModEntry rewriteToFinal(ModEntry entry) { + if (entry.getType() == ModType.TABLE_DELETION) { + return rewriteToFinal(((TableDeletionEntry) entry)); + } + return entry; + } + public TableDeletionEntry rewriteToOriginal(TableDeletionEntry entry) { DeletionPredicate deletionPredicate = rewriteToOriginal(entry.getPredicate()); return new TableDeletionEntry(deletionPredicate, entry.getTimeRange()); } + public TableDeletionEntry rewriteToFinal(TableDeletionEntry entry) { + DeletionPredicate deletionPredicate = rewriteToFinal(entry.getPredicate()); + return new TableDeletionEntry(deletionPredicate, entry.getTimeRange()); + } + + private DeletionPredicate rewriteToFinal(DeletionPredicate predicate) { + String finalTableName = getFinalTableName(predicate.getTableName()); + TagPredicate tagPredicate = predicate.getTagPredicate(); + tagPredicate = tagPredicate.rewriteToFinal(this); // forward direction: original -> final + List<String> newMeasurements = + predicate.getMeasurementNames().stream() + .map(m -> getFinalColumnName(predicate.getTableName(), m)) + .collect(Collectors.toList()); + return new DeletionPredicate(finalTableName, tagPredicate, newMeasurements); + } + private DeletionPredicate rewriteToOriginal(DeletionPredicate predicate) { String originalTableName = getOriginalTableName(predicate.getTableName()); TagPredicate tagPredicate = predicate.getTagPredicate(); tagPredicate = tagPredicate.rewriteToOriginal(this); List<String> newMeasurements = - predicate.getMeasurementNames().stream().map(m -> getOriginalColumnName(predicate.getTableName(), m)).collect( - Collectors.toList()); + predicate.getMeasurementNames().stream() + .map(m -> getOriginalColumnName(predicate.getTableName(), m)) + .collect(Collectors.toList()); return new DeletionPredicate(originalTableName, tagPredicate, 
newMeasurements); } public IDeviceID rewriteToOriginal(IDeviceID deviceID) { String tableName = deviceID.getTableName(); String originalTableName = getOriginalTableName(tableName); - return rewriteToOriginal(deviceID, originalTableName); + return rewriteTableName(deviceID, originalTableName); + } + + public IDeviceID rewriteToFinal(IDeviceID deviceID) { + String tableName = deviceID.getTableName(); + String finalTableName = getFinalTableName(tableName); + return rewriteTableName(deviceID, finalTableName); + } + + public void rewriteToFinal( + String originalTableName, List<TimeseriesMetadata> timeseriesMetadataList) { + timeseriesMetadataList.forEach( + timeseriesMetadata -> { + timeseriesMetadata.setMeasurementId( + getFinalColumnName(originalTableName, timeseriesMetadata.getMeasurementId())); + }); + } + + public Map<String, TableSchema> rewriteToFinal(Map<String, TableSchema> tableSchemas) { + Map<String, TableSchema> finalTableSchemas = new HashMap<>(tableSchemas.size()); + for (Map.Entry<String, TableSchema> entry : tableSchemas.entrySet()) { + TableSchema tableSchema = entry.getValue(); + tableSchema = rewriteToFinal(tableSchema); + finalTableSchemas.put(tableSchema.getTableName(), tableSchema); + } + return finalTableSchemas; + } + + public TableSchema rewriteToFinal(TableSchema tableSchema) { + String finalTableName = getFinalTableName(tableSchema.getTableName()); + + List<IMeasurementSchema> measurementSchemas = + new ArrayList<>(tableSchema.getColumnSchemas().size()); + List<ColumnCategory> columnCategories = new ArrayList<>(tableSchema.getColumnTypes().size()); + List<IMeasurementSchema> columnSchemas = tableSchema.getColumnSchemas(); + for (int i = 0, columnSchemasSize = columnSchemas.size(); i < columnSchemasSize; i++) { + IMeasurementSchema measurementSchema = columnSchemas.get(i); + measurementSchemas.add( + new MeasurementSchema( + getFinalColumnName( + tableSchema.getTableName(), measurementSchema.getMeasurementName()), + 
measurementSchema.getType(), + measurementSchema.getEncodingType(), measurementSchema.getCompressor())); + columnCategories.add(tableSchema.getColumnTypes().get(i)); + } + + return new TableSchema(finalTableName, measurementSchemas, columnCategories); } @SuppressWarnings("SuspiciousSystemArraycopy") - public static IDeviceID rewriteToOriginal(IDeviceID deviceID, String originalTableName) { + public static IDeviceID rewriteTableName(IDeviceID deviceID, String newTableName) { String tableName = deviceID.getTableName(); - if (!tableName.equals(originalTableName)) { + if (!tableName.equals(newTableName)) { Object[] segments = deviceID.getSegments(); String[] newSegments = new String[segments.length]; - newSegments[0] = originalTableName; + newSegments[0] = newTableName; System.arraycopy(segments, 1, newSegments, 1, segments.length - 1); return Factory.DEFAULT_FACTORY.create(newSegments); } @@ -171,8 +279,18 @@ public class EvolvedSchema { public static EvolvedSchema deepCopy(EvolvedSchema evolvedSchema) { EvolvedSchema newEvolvedSchema = new EvolvedSchema(); - newEvolvedSchema.originalTableNames = new LinkedHashMap<>(evolvedSchema.originalTableNames); - newEvolvedSchema.originalColumnNames = new LinkedHashMap<>(evolvedSchema.originalColumnNames); + newEvolvedSchema.finalToOriginalTableNames = + new LinkedHashMap<>(evolvedSchema.finalToOriginalTableNames); + // the reverse index must be copied too, or rewriteToFinal on the copy sees no renames + newEvolvedSchema.originalToFinalTableNames = + new LinkedHashMap<>(evolvedSchema.originalToFinalTableNames); + // copy each per-table column map so renames on the copy never leak into the source + evolvedSchema.finalToOriginalColumnNames.forEach( + (table, columns) -> + newEvolvedSchema.finalToOriginalColumnNames.put(table, new LinkedHashMap<>(columns))); + evolvedSchema.originalToFinalColumnNames.forEach( + (table, columns) -> + newEvolvedSchema.originalToFinalColumnNames.put(table, new LinkedHashMap<>(columns))); return newEvolvedSchema; } @@ -196,14 +314,14 @@ public class EvolvedSchema { if (schemas[i] != null) { EvolvedSchema newSchema = schemas[i]; for (Entry<String, String> finalOriginalTableName : - newSchema.originalTableNames.entrySet()) { + newSchema.finalToOriginalTableNames.entrySet()) { if (!finalOriginalTableName.getValue().isEmpty()) { mergedSchema.renameTable( finalOriginalTableName.getValue(), finalOriginalTableName.getKey()); } } for (Entry<String, Map<String, String>> 
finalTableNameColumnNameMapEntry : - newSchema.originalColumnNames.entrySet()) { + newSchema.finalToOriginalColumnNames.entrySet()) { for (Entry<String, String> finalColNameOriginalColNameEntry : finalTableNameColumnNameMapEntry.getValue().entrySet()) { if (!finalColNameOriginalColNameEntry.getValue().isEmpty()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java index 1c4343cd154..e7dd3326913 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java @@ -101,4 +101,8 @@ public class SchemaEvolutionFile { } return evolvedSchema; } + + public String getFilePath() { + return filePath; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java index b9fd86316b8..52ec3ab6b8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java @@ -62,7 +62,9 @@ public class TsFileSet implements Comparable<TsFileSet> { } if (schemaEvolutionFile == null) { - schemaEvolutionFile = new SchemaEvolutionFile(fileSetsDir + File.separator + 0 + SchemaEvolutionFile.FILE_SUFFIX); + schemaEvolutionFile = + new SchemaEvolutionFile( + fileSetsDir + File.separator + 0 + SchemaEvolutionFile.FILE_SUFFIX); } } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java index 6569a7c6eb3..d699ca433c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java @@ -34,9 +34,9 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java index 8478486781b..9bffbfffce4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java @@ -26,6 +26,7 @@ import org.apache.tsfile.external.commons.lang3.StringUtils; import javax.annotation.Nullable; +import java.io.File; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -60,11 +61,21 @@ public class LoadTsFileConfigurator { case ASYNC_LOAD_KEY: validateAsyncLoadParam(value); break; + case SEVO_FILE_PATH_KEY: + validateSevoFilePathParam(value); + break; default: throw new SemanticException("Invalid parameter '" + key + "' for LOAD TSFILE command."); } } + private static void validateSevoFilePathParam(String value) { + File file = new File(value); + if (!file.exists()) { + throw new SemanticException("The sevo file " + value + " does not exist."); + } + } + public static void validateSynonymParameters(final Map<String, String> parameters) { if (parameters.containsKey(DATABASE_KEY) && parameters.containsKey(DATABASE_NAME_KEY)) { throw new SemanticException( @@ -115,6 +126,13 @@ public class LoadTsFileConfigurator { return Objects.nonNull(databaseName) ? databaseName.toLowerCase(Locale.ENGLISH) : null; } + public static final String SEVO_FILE_PATH_KEY = "sevo-file-path"; + + public static @Nullable File parseSevoFile(final Map<String, String> loadAttributes) { + String sevoFilePath = loadAttributes.get(SEVO_FILE_PATH_KEY); + return sevoFilePath != null ? 
new File(sevoFilePath) : null; + } + public static final String ON_SUCCESS_KEY = "on-success"; public static final String ON_SUCCESS_DELETE_VALUE = "delete"; public static final String ON_SUCCESS_NONE_VALUE = "none"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java index 72268168258..216a22eaa6f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.load.splitter; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.PageException; @@ -66,7 +67,7 @@ public class AlignedChunkData implements ChunkData { protected static final Binary DEFAULT_BINARY = null; protected final TTimePartitionSlot timePartitionSlot; - protected final IDeviceID device; + protected IDeviceID device; protected List<ChunkHeader> chunkHeaderList; protected PublicBAOS byteStream; @@ -508,4 +509,14 @@ public class AlignedChunkData implements ChunkData { + needDecodeChunk + '}'; } + + @Override + public void rewriteToFinal(EvolvedSchema evolvedSchema) { + IDeviceID newDevice = evolvedSchema.rewriteToFinal(device); + chunkHeaderList.forEach( + h -> + h.setMeasurementID( + evolvedSchema.getFinalColumnName(device.getTableName(), h.getMeasurementID()))); + device = newDevice; + } } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java index 0695c7a84de..c140b79bc12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.load.splitter; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -31,7 +32,7 @@ import java.io.IOException; import java.io.InputStream; public class DeletionData implements TsFileData { - private final ModEntry deletion; + private ModEntry deletion; public DeletionData(ModEntry deletion) { this.deletion = deletion; @@ -51,6 +52,11 @@ public class DeletionData implements TsFileData { return TsFileDataType.DELETION; } + @Override + public void rewriteToFinal(EvolvedSchema evolvedSchema) { + deletion = evolvedSchema.rewriteToFinal(deletion); + } + @Override public void serialize(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(getType().ordinal(), stream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java index 2310b9cb95c..5ad970c38de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java 
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.load.splitter; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.header.ChunkHeader; @@ -52,7 +53,7 @@ import static org.apache.iotdb.db.storageengine.load.LoadTsFileManager.MEASUREME public class NonAlignedChunkData implements ChunkData { private final TTimePartitionSlot timePartitionSlot; - private final IDeviceID device; + private IDeviceID device; private final ChunkHeader chunkHeader; private final PublicBAOS byteStream; @@ -316,6 +317,14 @@ public class NonAlignedChunkData implements ChunkData { stream.close(); } + @Override + public void rewriteToFinal(EvolvedSchema evolvedSchema) { + IDeviceID newDevice = evolvedSchema.rewriteToFinal(device); + chunkHeader.setMeasurementID( + evolvedSchema.getFinalColumnName(device.getTableName(), chunkHeader.getMeasurementID())); + device = newDevice; + } + @Override public String toString() { return "NonAlignedChunkData{" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java index f24eb45c01b..d3c5d150b37 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.load.splitter; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -35,6 +36,8 @@ public interface 
TsFileData { void serialize(DataOutputStream stream) throws IOException; + void rewriteToFinal(EvolvedSchema evolvedSchema); + static TsFileData deserialize(InputStream stream) throws IOException, PageException, IllegalPathException { final TsFileDataType type = TsFileDataType.values()[ReadWriteIOUtils.readInt(stream)]; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index 5a75f4fb8e0..f67c0d96ed0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java @@ -26,6 +26,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.common.conf.TSFileDescriptor; @@ -67,7 +69,7 @@ public class TsFileSplitter { private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); private final File tsFile; - private final TsFileDataConsumer consumer; + private TsFileDataConsumer consumer; private Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>(); private List<ModEntry> deletions = new ArrayList<>(); private Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData = new HashMap<>(); @@ -77,6 +79,7 @@ public class TsFileSplitter { private boolean isAligned; private int timeChunkIndexOfCurrentValueColumn = 0; private Set<TTimePartitionSlot> timePartitionSlots = 
new HashSet<>(); + private EvolvedSchema evolvedSchema; // Maintain the number of times the chunk of each measurement appears. private Map<String, Integer> valueColumn2TimeChunkIndex = new HashMap<>(); @@ -87,9 +90,18 @@ public class TsFileSplitter { private List<Map<Integer, long[]>> pageIndex2TimesList = null; private List<Boolean> isTimeChunkNeedDecodeList = new ArrayList<>(); - public TsFileSplitter(File tsFile, TsFileDataConsumer consumer) { + public TsFileSplitter(File tsFile, TsFileDataConsumer consumer, File schemaEvolutionFile) { this.tsFile = tsFile; this.consumer = consumer; + if (schemaEvolutionFile != null && schemaEvolutionFile.exists()) { + SchemaEvolutionFile sevoFile = new SchemaEvolutionFile(schemaEvolutionFile.getAbsolutePath()); + try { + this.evolvedSchema = sevoFile.readAsSchema(); + this.consumer = new SchemaEvolutionTsFileDataConsumer(this.consumer, evolvedSchema); + } catch (IOException e) { + logger.error("Cannot read schema evolution file, ignoring it.", e); + } + } } @SuppressWarnings({"squid:S3776", "squid:S6541"}) @@ -588,4 +600,40 @@ public class TsFileSplitter { public interface TsFileDataConsumer { boolean apply(TsFileData tsFileData) throws LoadFileException; } + + /** Decorator that rewrites each TsFileData before handing it to the wrapped consumer. */ + public abstract static class WrappedTsFileDataConsumer implements TsFileDataConsumer { + + private final TsFileDataConsumer delegate; + + public WrappedTsFileDataConsumer(TsFileDataConsumer delegate) { + this.delegate = delegate; + } + + protected abstract TsFileData rewrite(TsFileData tsFileData); + + @Override + public boolean apply(TsFileData tsFileData) throws LoadFileException { + tsFileData = rewrite(tsFileData); + return delegate.apply(tsFileData); + } + } + + /** Applies the given evolved schema to every TsFileData passed through to the delegate. */ + private static class SchemaEvolutionTsFileDataConsumer extends WrappedTsFileDataConsumer { + + private final EvolvedSchema evolvedSchema; + + public SchemaEvolutionTsFileDataConsumer( + TsFileDataConsumer delegate, EvolvedSchema evolvedSchema) { + super(delegate); + this.evolvedSchema = evolvedSchema; + } + + @Override + protected 
TsFileData rewrite(TsFileData tsFileData) { + tsFileData.rewriteToFinal(evolvedSchema); + return tsFileData; + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java index 1db3890d9fe..6f14ac86384 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java @@ -24,9 +24,9 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory.ModsSerializer; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java index 6eeeda629eb..fcb2ffdb792 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java @@ -30,8 +30,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDele import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.tsfile.read.common.TimeRange; import org.junit.After; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java index bc6dc9e625a..05c1f9361c6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java @@ -43,8 +43,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDele import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.tsfile.read.common.TimeRange; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanTablePatternParseVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanTablePatternParseVisitorTest.java index 
63909828e6a..82aee3b1fba 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanTablePatternParseVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanTablePatternParseVisitorTest.java @@ -29,8 +29,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Creat import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceAttributeUpdateNode; import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.tsfile.read.common.TimeRange; import org.junit.Assert; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java index e425c709815..6b4eec7f63f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java @@ -41,7 +41,7 @@ public class LoadTsFileNodeTest { TsFileResource resource = new TsFileResource(new File("1")); String database = "root.db"; LoadSingleTsFileNode node = - new LoadSingleTsFileNode(new PlanNodeId(""), resource, false, database, true, 0L); + new LoadSingleTsFileNode(new PlanNodeId(""), resource, false, database, true, 0L, null); Assert.assertTrue(node.isDeleteAfterLoad()); Assert.assertEquals(resource, node.getTsFileResource()); Assert.assertEquals(database, node.getDatabase()); diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java index a6b6bf1bf67..a58b4ae7c00 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java @@ -24,11 +24,11 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; +import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.And; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.SegmentExactMatch; -import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest; import org.apache.tsfile.file.metadata.IDeviceID.Factory; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index ec9a4f5eae2..35e2ed84a6c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -60,14 +60,12 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.constant import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.constant.InnerUnsequenceCompactionSelector; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionConfigRestorer; import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager; -import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy; import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy; import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; -import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -1647,9 +1645,7 @@ public class DataRegionTest { dataRegion.syncCloseAllWorkingTsFileProcessors(); Assert.assertFalse(tsFileResource.anyModFileExists()); Assert.assertFalse( - tsFileResource - .getDevices() - .contains(Factory.DEFAULT_FACTORY.create("root.vehicle.d199"))); + tsFileResource.getDevices().contains(Factory.DEFAULT_FACTORY.create("root.vehicle.d199"))); } @Test @@ -1909,13 +1905,12 @@ public class DataRegionTest { } @Test - public void testSchemaEvolutionWithPartialDeletion() - throws WriteProcessException, IOException { + public void testSchemaEvolutionWithPartialDeletion() throws WriteProcessException, 
IOException { String[] measurements = {"tag1", "s1", "s2"}; MeasurementSchema[] measurementSchemas = { - new MeasurementSchema("tag1", TSDataType.STRING), - new MeasurementSchema("s1", TSDataType.INT64), - new MeasurementSchema("s2", TSDataType.DOUBLE) + new MeasurementSchema("tag1", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT64), + new MeasurementSchema("s2", TSDataType.DOUBLE) }; RelationalInsertRowNode insertRowNode = new RelationalInsertRowNode( @@ -1929,7 +1924,7 @@ public class DataRegionTest { new Object[] {new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L, 1.0}, false, new TsTableColumnCategory[] { - TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD + TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD }); dataRegion.insert(insertRowNode); insertRowNode.setTime(20); @@ -1938,23 +1933,40 @@ public class DataRegionTest { // table1 -> table2 dataRegion.applySchemaEvolution(Collections.singletonList(new TableRename("table1", "table2"))); // s1 -> s3 - dataRegion.applySchemaEvolution(Collections.singletonList(new ColumnRename("table2", "s1", "s3", null))); + dataRegion.applySchemaEvolution( + Collections.singletonList(new ColumnRename("table2", "s1", "s3", null))); // delete with table2 - TableDeletionEntry tableDeletionEntry = new TableDeletionEntry(new DeletionPredicate("table2"), new TimeRange(0, 15)); - RelationalDeleteDataNode relationalDeleteDataNode = new RelationalDeleteDataNode(new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); + TableDeletionEntry tableDeletionEntry = + new TableDeletionEntry(new DeletionPredicate("table2"), new TimeRange(0, 15)); + RelationalDeleteDataNode relationalDeleteDataNode = + new RelationalDeleteDataNode( + new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); dataRegion.deleteByTable(relationalDeleteDataNode); // delete with s3 - tableDeletionEntry = new TableDeletionEntry(new 
DeletionPredicate("table2", new NOP(), Collections.singletonList("s3")), new TimeRange(0, 15)); - relationalDeleteDataNode = new RelationalDeleteDataNode(new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); + tableDeletionEntry = + new TableDeletionEntry( + new DeletionPredicate("table2", new NOP(), Collections.singletonList("s3")), + new TimeRange(0, 15)); + relationalDeleteDataNode = + new RelationalDeleteDataNode( + new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); dataRegion.deleteByTable(relationalDeleteDataNode); // delete with table1 - tableDeletionEntry = new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(0, 15)); - relationalDeleteDataNode = new RelationalDeleteDataNode(new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); + tableDeletionEntry = + new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(0, 15)); + relationalDeleteDataNode = + new RelationalDeleteDataNode( + new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); dataRegion.deleteByTable(relationalDeleteDataNode); // delete with s1 - tableDeletionEntry = new TableDeletionEntry(new DeletionPredicate("table2", new NOP(), Collections.singletonList("s1")), new TimeRange(0, 15)); - relationalDeleteDataNode = new RelationalDeleteDataNode(new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); + tableDeletionEntry = + new TableDeletionEntry( + new DeletionPredicate("table2", new NOP(), Collections.singletonList("s1")), + new TimeRange(0, 15)); + relationalDeleteDataNode = + new RelationalDeleteDataNode( + new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); dataRegion.deleteByTable(relationalDeleteDataNode); List<TsFileResource> sequenceFileList = dataRegion.getSequenceFileList(); @@ -1965,22 +1977,25 @@ public class DataRegionTest { assertEquals("table1", ((TableDeletionEntry) next).getTableName()); next = modEntryIterator.next(); // the s3 modification should be 
rewritten to s1 - assertEquals(Collections.singletonList("s1"), ((TableDeletionEntry) next).getPredicate().getMeasurementNames()); + assertEquals( + Collections.singletonList("s1"), + ((TableDeletionEntry) next).getPredicate().getMeasurementNames()); next = modEntryIterator.next(); // the table1 modification should be skipped // the s1 modification should be rewritten to empty - assertEquals(Collections.singletonList(""), ((TableDeletionEntry) next).getPredicate().getMeasurementNames()); + assertEquals( + Collections.singletonList(""), + ((TableDeletionEntry) next).getPredicate().getMeasurementNames()); assertFalse(modEntryIterator.hasNext()); } @Test - public void testSchemaEvolutionWithFullDeletion() - throws WriteProcessException, IOException { + public void testSchemaEvolutionWithFullDeletion() throws WriteProcessException, IOException { String[] measurements = {"tag1", "s1", "s2"}; MeasurementSchema[] measurementSchemas = { - new MeasurementSchema("tag1", TSDataType.STRING), - new MeasurementSchema("s1", TSDataType.INT64), - new MeasurementSchema("s2", TSDataType.DOUBLE) + new MeasurementSchema("tag1", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT64), + new MeasurementSchema("s2", TSDataType.DOUBLE) }; RelationalInsertRowNode insertRowNode = new RelationalInsertRowNode( @@ -1994,7 +2009,7 @@ public class DataRegionTest { new Object[] {new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L, 1.0}, false, new TsTableColumnCategory[] { - TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD + TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD }); dataRegion.insert(insertRowNode); insertRowNode.setTime(20); @@ -2003,11 +2018,15 @@ public class DataRegionTest { // table1 -> table2 dataRegion.applySchemaEvolution(Collections.singletonList(new TableRename("table1", "table2"))); // s1 -> s3 - dataRegion.applySchemaEvolution(Collections.singletonList(new ColumnRename("table2", "s1", "s3", 
null))); + dataRegion.applySchemaEvolution( + Collections.singletonList(new ColumnRename("table2", "s1", "s3", null))); // delete with table1 - TableDeletionEntry tableDeletionEntry = new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(0, 30)); - RelationalDeleteDataNode relationalDeleteDataNode = new RelationalDeleteDataNode(new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); + TableDeletionEntry tableDeletionEntry = + new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(0, 30)); + RelationalDeleteDataNode relationalDeleteDataNode = + new RelationalDeleteDataNode( + new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); dataRegion.deleteByTable(relationalDeleteDataNode); // nothing should be deleted List<TsFileResource> sequenceFileList = dataRegion.getSequenceFileList(); @@ -2016,8 +2035,11 @@ public class DataRegionTest { assertFalse(modEntryIterator.hasNext()); // delete with table2 - tableDeletionEntry = new TableDeletionEntry(new DeletionPredicate("table2"), new TimeRange(0, 30)); - relationalDeleteDataNode = new RelationalDeleteDataNode(new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); + tableDeletionEntry = + new TableDeletionEntry(new DeletionPredicate("table2"), new TimeRange(0, 30)); + relationalDeleteDataNode = + new RelationalDeleteDataNode( + new PlanNodeId(""), tableDeletionEntry, dataRegion.getDatabaseName()); dataRegion.deleteByTable(relationalDeleteDataNode); // the file should be deleted sequenceFileList = dataRegion.getSequenceFileList(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java index 272d9e6ae5c..b64e67a5725 100644 --- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java @@ -301,7 +301,8 @@ public class BatchedCompactionWithTsFileSplitterTest extends AbstractCompactionT throw new RuntimeException(e); } return true; - }); + }, + null); splitter.splitTsFileByDataPartition(); List<TsFileResource> splitResources = new ArrayList<>(); for (Map.Entry<TTimePartitionSlot, TestLoadTsFileIOWriter> entry : writerMap.entrySet()) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java index a83fa920853..69e83e769a3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java @@ -35,9 +35,9 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; +import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; -import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.exception.write.WriteProcessException;
