This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3d2c88095c1e1da0ff7cf44fb54f7b2db18d4a20 Author: Tian Jiang <[email protected]> AuthorDate: Tue Dec 30 18:45:58 2025 +0800 remove empty file sets in start up --- .../relational/it/db/it/IoTDBLoadTsFileIT.java | 23 ++++++++++++++++- .../db/storageengine/dataregion/DataRegion.java | 30 +++++++++++++++++++--- .../dataregion/tsfile/evolution/ColumnRename.java | 6 ++++- .../dataregion/tsfile/fileset/TsFileSet.java | 5 ++++ 4 files changed, 58 insertions(+), 6 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java index 46cc2e63adb..b9241b66182 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java @@ -20,6 +20,7 @@ package org.apache.iotdb.relational.it.db.it; import java.sql.SQLException; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolutionFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; @@ -293,7 +294,9 @@ public class IoTDBLoadTsFileIT { SchemaEvolutionFile schemaEvolutionFile = new SchemaEvolutionFile(sevoFile.getAbsolutePath()); SchemaEvolution schemaEvolution = new TableRename(SchemaConfig.TABLE_0, SchemaConfig.TABLE_1); schemaEvolutionFile.append(Collections.singletonList(schemaEvolution)); - // rename INT322INT32 + // rename INT322INT32 to INT322INT32_NEW + schemaEvolution = new ColumnRename(SchemaConfig.TABLE_1, "INT322INT32", "INT322INT32_NEW"); + schemaEvolutionFile.append(Collections.singletonList(schemaEvolution)); try (final Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); @@ -323,6 +326,24 @@ public class IoTDBLoadTsFileIT { } } + // cannot query using INT322INT32 + try (final ResultSet resultSet = + statement.executeQuery(String.format("select count(%s) from %s", "INT322INT32", SchemaConfig.TABLE_1))) { + fail(); + } catch (SQLException e) { + assertEquals("616: Column 'int322int32' cannot be resolved", e.getMessage()); + } + + // can query with INT322INT32_NEW + try (final ResultSet resultSet = + statement.executeQuery(String.format("select count(%s) from %s", "INT322INT32_NEW", SchemaConfig.TABLE_1))) { + if (resultSet.next()) { + Assert.assertEquals(lineCount, resultSet.getLong(1)); + } else { + Assert.fail("This ResultSet is empty."); + } + } + try (final ResultSet resultSet = statement.executeQuery("show tables")) { Assert.assertTrue(resultSet.next()); assertEquals(SchemaConfig.TABLE_1, resultSet.getString(1)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 3e3a4b53b97..e64350500a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -660,6 +660,7 @@ public class DataRegion implements IDataRegionForQuery { } // ensure that seq and unseq files in the same partition have the same TsFileSet Map<Long, List<TsFileSet>> recoveredPartitionTsFileSetMap = new HashMap<>(); + Map<Long, Long> partitionMinimalVersion = new HashMap<>(); for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) { Callable<Void> asyncRecoverTask = @@ -669,7 +670,8 @@ public class DataRegion implements IDataRegionForQuery { partitionFiles.getValue(), fileTimeIndexMap, true, - recoveredPartitionTsFileSetMap); + recoveredPartitionTsFileSetMap, + partitionMinimalVersion); if (asyncRecoverTask != null) { asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask); } @@ -683,7 +685,7 @@ public class DataRegion implements IDataRegionForQuery { partitionFiles.getValue(), fileTimeIndexMap, false, - recoveredPartitionTsFileSetMap); + recoveredPartitionTsFileSetMap, partitionMinimalVersion); if (asyncRecoverTask != null) { asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask); } @@ -698,6 +700,18 @@ public class DataRegion implements IDataRegionForQuery { Long.MAX_VALUE, lastFlushTimeMap.getMemSize(latestPartitionId))); } + + // remove empty file sets + for (Entry<Long, List<TsFileSet>> entry : recoveredPartitionTsFileSetMap.entrySet()) { + long partitionId = entry.getKey(); + // if no file in the partition, all filesets should be cleared + long minimumFileVersion = partitionMinimalVersion.getOrDefault(partitionId, Long.MAX_VALUE); + for (TsFileSet tsFileSet : entry.getValue()) { + if (tsFileSet.getEndVersion() < minimumFileVersion) { + tsFileSet.remove(); + } + } + } } // wait until all unsealed TsFiles have been recovered for (WALRecoverListener recoverListener : recoverListeners) { @@ -1063,16 +1077,24 @@ public class DataRegion implements IDataRegionForQuery { List<TsFileResource> resourceList, Map<TsFileID, FileTimeIndex> fileTimeIndexMap, boolean isSeq, - Map<Long, List<TsFileSet>> partitionTsFileSetMap) { + Map<Long, List<TsFileSet>> partitionTsFileSetMap, Map<Long, Long> partitionMinimalVersion) { List<TsFileResource> resourceListForAsyncRecover = new ArrayList<>(); List<TsFileResource> resourceListForSyncRecover = new ArrayList<>(); Callable<Void> asyncRecoverTask = null; + List<TsFileSet> tsFileSets = recoverTsFileSets(partitionId, partitionTsFileSetMap); for (TsFileResource tsFileResource : resourceList) { - List<TsFileSet> tsFileSets = recoverTsFileSets(partitionId, partitionTsFileSetMap); long fileVersion = tsFileResource.getTsFileID().fileVersion; + partitionMinimalVersion.compute(partitionId, (pid, oldVersion) -> { + if (oldVersion == null) { + return fileVersion; + } + return Math.min(oldVersion, fileVersion); + }); + int i = Collections.binarySearch(tsFileSets, TsFileSet.comparatorKey(fileVersion)); if (i < 0) { + // if the binary search does not find an exact match, -i indicates the closest one i = -i; } if (i < tsFileSets.size()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java index 9d13ce4f7e6..adb6e7e9359 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java @@ -40,10 +40,14 @@ public class ColumnRename implements SchemaEvolution { // for deserialization public ColumnRename() {} - public ColumnRename(String tableName, String nameBefore, String nameAfter, TSDataType dataType) { + public ColumnRename(String tableName, String nameBefore, String nameAfter) { this.tableName = tableName.toLowerCase(); this.nameBefore = nameBefore.toLowerCase(); this.nameAfter = nameAfter.toLowerCase(); + } + + public ColumnRename(String tableName, String nameBefore, String nameAfter, TSDataType dataType) { + this(tableName, nameBefore, nameAfter); this.dataType = dataType; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java index 52ec3ab6b8b..c2364b20b61 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java @@ -27,6 +27,7 @@ import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.tsfile.external.commons.io.FileUtils; /** TsFileSet represents a set of TsFiles in a time partition whose version <= endVersion. */ public class TsFileSet implements Comparable<TsFileSet> { @@ -127,4 +128,8 @@ public class TsFileSet implements Comparable<TsFileSet> { public String toString() { return "TsFileSet{" + "endVersion=" + endVersion + ", fileSetDir=" + fileSetDir + '}'; } + + public void remove() { + FileUtils.deleteQuietly(fileSetDir); + } }
