This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fix-region-migrate-deletion-multidir in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 513e324c2e0c4becf64a10d77f861ba335cbac1f Author: Caideyipi <[email protected]> AuthorDate: Tue Jun 30 12:14:18 2026 +0800 Fix deletion visibility in region migration IT --- ...TDBRegionMigrateWithDeletionMultiDataDirIT.java | 58 +++---- ...gionMigrateWithDeletionMultiDataDirTableIT.java | 58 +++---- .../daily/iotv1/RegionMigrateFileAssertions.java | 167 +++++++++++++++++++++ 3 files changed, 203 insertions(+), 80 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java index 8214bf621cc..eabb11b3155 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirIT.java @@ -37,13 +37,18 @@ import org.junit.runner.RunWith; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes; import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.MULTI_DATA_DIRS; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitModsVisibleOnReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitRegionReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileResourceVisibleOnReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileVisibleOnReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.getReplicaDataNodeIds; /** * Tree-model coverage for IoTConsensus region migration over multiple data dirs: a deletion (mods) @@ -56,9 +61,6 @@ import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperatio @Category({ClusterIT.class}) public class IoTDBRegionMigrateWithDeletionMultiDataDirIT { - private static final String MULTI_DATA_DIRS = - "data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2"; - @Before public void setUp() throws Exception { EnvFactory.getEnv() @@ -83,13 +85,21 @@ public class IoTDBRegionMigrateWithDeletionMultiDataDirIT { statement.execute( "INSERT INTO root.db.d1(timestamp, s1) VALUES (100, 100), (200, 200), (300, 300)"); statement.execute("FLUSH"); - statement.execute("DELETE FROM root.db.d1.s1 WHERE time <= 200"); - statement.execute("FLUSH"); Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader = getDataRegionMapWithLeader(statement); int dataRegionIdForTest = dataRegionMapWithLeader.keySet().stream().max(Integer::compareTo).orElseThrow(); + Set<Integer> initialReplicaDataNodeIds = + getReplicaDataNodeIds(statement, dataRegionIdForTest); + + awaitTsFileVisibleOnReplicas("root.db", dataRegionIdForTest, initialReplicaDataNodeIds); + awaitTsFileResourceVisibleOnReplicas( + statement, "root.db", dataRegionIdForTest, initialReplicaDataNodeIds); + + statement.execute("DELETE FROM root.db.d1.s1 WHERE time <= 200"); + statement.execute("FLUSH"); + awaitModsVisibleOnReplicas("root.db", dataRegionIdForTest, initialReplicaDataNodeIds); assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1); Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest); @@ -107,25 +117,8 @@ public class IoTDBRegionMigrateWithDeletionMultiDataDirIT { String.format( "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId)); - final int finalDestDataNodeId = destDataNodeId; - Awaitility.await() - .atMost(10, TimeUnit.MINUTES) - .pollDelay(1, TimeUnit.SECONDS) - .pollInterval(2, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { - boolean migrated = false; - while (showRegions.next()) { - if (showRegions.getInt("RegionId") == dataRegionIdForTest - && showRegions.getInt("DataNodeId") == finalDestDataNodeId) { - migrated = true; - break; - } - } - Assert.assertTrue(migrated); - } - }); + awaitRegionReplicas(statement, dataRegionIdForTest, Set.of(followerId, destDataNodeId)); + awaitModsVisibleOnReplicas("root.db", dataRegionIdForTest, Set.of(destDataNodeId)); assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1); } @@ -164,19 +157,4 @@ public class IoTDBRegionMigrateWithDeletionMultiDataDirIT { } } } - - private Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId) - throws Exception { - Set<Integer> replicaDataNodeIds = new HashSet<>(); - try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { - while (showRegions.next()) { - if ("DataRegion".equals(showRegions.getString("Type")) - && showRegions.getInt("RegionId") == dataRegionId) { - replicaDataNodeIds.add(showRegions.getInt("DataNodeId")); - } - } - } - Assert.assertFalse(replicaDataNodeIds.isEmpty()); - return replicaDataNodeIds; - } } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java index 59d5a18ccf4..ee71d48fc66 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithDeletionMultiDataDirTableIT.java @@ -39,13 +39,18 @@ import org.junit.runner.RunWith; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes; import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.MULTI_DATA_DIRS; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitModsVisibleOnReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitRegionReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileResourceVisibleOnReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileVisibleOnReplicas; +import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.getReplicaDataNodeIds; /** * Table-model twin of {@link IoTDBRegionMigrateWithDeletionMultiDataDirIT}: a deletion (mods) must @@ -56,9 +61,6 @@ import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperatio @Category({TableClusterIT.class}) public class IoTDBRegionMigrateWithDeletionMultiDataDirTableIT { - private static final String MULTI_DATA_DIRS = - "data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2"; - @Before public void setUp() throws Exception { EnvFactory.getEnv() @@ -84,13 +86,21 @@ public class IoTDBRegionMigrateWithDeletionMultiDataDirTableIT { statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)"); statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100), (200, 200), (300, 300)"); statement.execute("FLUSH"); - statement.execute("DELETE FROM t1 WHERE time <= 200"); - statement.execute("FLUSH"); Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader = getDataRegionMapWithLeader(statement); int dataRegionIdForTest = dataRegionMapWithLeader.keySet().stream().max(Integer::compareTo).orElseThrow(); + Set<Integer> initialReplicaDataNodeIds = + getReplicaDataNodeIds(statement, dataRegionIdForTest); + + awaitTsFileVisibleOnReplicas("test", dataRegionIdForTest, initialReplicaDataNodeIds); + awaitTsFileResourceVisibleOnReplicas( + statement, "test", dataRegionIdForTest, initialReplicaDataNodeIds); + + statement.execute("DELETE FROM t1 WHERE time <= 200"); + statement.execute("FLUSH"); + awaitModsVisibleOnReplicas("test", dataRegionIdForTest, initialReplicaDataNodeIds); assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1); Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest); @@ -108,25 +118,8 @@ public class IoTDBRegionMigrateWithDeletionMultiDataDirTableIT { String.format( "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId)); - final int finalDestDataNodeId = destDataNodeId; - Awaitility.await() - .atMost(10, TimeUnit.MINUTES) - .pollDelay(1, TimeUnit.SECONDS) - .pollInterval(2, TimeUnit.SECONDS) - .untilAsserted( - () -> { - try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { - boolean migrated = false; - while (showRegions.next()) { - if (showRegions.getInt("RegionId") == dataRegionIdForTest - && showRegions.getInt("DataNodeId") == finalDestDataNodeId) { - migrated = true; - break; - } - } - Assert.assertTrue(migrated); - } - }); + awaitRegionReplicas(statement, dataRegionIdForTest, Set.of(followerId, destDataNodeId)); + awaitModsVisibleOnReplicas("test", dataRegionIdForTest, Set.of(destDataNodeId)); assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1); } @@ -167,19 +160,4 @@ public class IoTDBRegionMigrateWithDeletionMultiDataDirTableIT { } } } - - private Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId) - throws Exception { - Set<Integer> replicaDataNodeIds = new HashSet<>(); - try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { - while (showRegions.next()) { - if ("DataRegion".equals(showRegions.getString("Type")) - && showRegions.getInt("RegionId") == dataRegionId) { - replicaDataNodeIds.add(showRegions.getInt("DataNodeId")); - } - } - } - Assert.assertFalse(replicaDataNodeIds.isEmpty()); - return replicaDataNodeIds; - } } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/RegionMigrateFileAssertions.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/RegionMigrateFileAssertions.java new file mode 100644 index 00000000000..1cdf5981d34 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/RegionMigrateFileAssertions.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; + +import org.awaitility.Awaitility; +import org.junit.Assert; + +import java.io.File; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +final class RegionMigrateFileAssertions { + + static final String MULTI_DATA_DIRS = + "data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2"; + + private static final String SEQUENCE_FOLDER = "sequence"; + private static final String TSFILE_SUFFIX = ".tsfile"; + private static final String TSFILE_RESOURCE_SUFFIX = ".tsfile.resource"; + private static final String MODS_SUFFIX = ".mods2"; + + private RegionMigrateFileAssertions() {} + + static void awaitTsFileVisibleOnReplicas( + String database, int dataRegionId, Set<Integer> dataNodeIds) { + awaitFileVisibleOnReplicas(database, dataRegionId, dataNodeIds, TSFILE_SUFFIX); + } + + static void awaitTsFileResourceVisibleOnReplicas( + String database, int dataRegionId, Set<Integer> dataNodeIds) { + awaitFileVisibleOnReplicas(database, dataRegionId, dataNodeIds, TSFILE_RESOURCE_SUFFIX); + } + + static void awaitTsFileResourceVisibleOnReplicas( + Statement statement, String database, int dataRegionId, Set<Integer> dataNodeIds) { + awaitFileVisibleOnReplicas( + statement, database, dataRegionId, dataNodeIds, TSFILE_RESOURCE_SUFFIX); + } + + static void awaitModsVisibleOnReplicas( + String database, int dataRegionId, Set<Integer> dataNodeIds) { + awaitFileVisibleOnReplicas(database, dataRegionId, dataNodeIds, MODS_SUFFIX); + } + + static void awaitRegionReplicas( + Statement statement, int dataRegionId, Set<Integer> expectedReplicaDataNodeIds) { + Awaitility.await() + .atMost(10, TimeUnit.MINUTES) + .pollDelay(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .untilAsserted( + () -> + Assert.assertEquals( + expectedReplicaDataNodeIds, getReplicaDataNodeIds(statement, dataRegionId))); + } + + static Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId) + throws Exception { + Set<Integer> replicaDataNodeIds = new HashSet<>(); + try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) { + while (showRegions.next()) { + if ("DataRegion".equals(showRegions.getString("Type")) + && showRegions.getInt("RegionId") == dataRegionId) { + replicaDataNodeIds.add(showRegions.getInt("DataNodeId")); + } + } + } + Assert.assertFalse(replicaDataNodeIds.isEmpty()); + return replicaDataNodeIds; + } + + private static void awaitFileVisibleOnReplicas( + String database, int dataRegionId, Set<Integer> dataNodeIds, String suffix) { + awaitFileVisibleOnReplicas(null, database, dataRegionId, dataNodeIds, suffix); + } + + private static void awaitFileVisibleOnReplicas( + Statement flushStatement, + String database, + int dataRegionId, + Set<Integer> dataNodeIds, + String suffix) { + for (int dataNodeId : dataNodeIds) { + DataNodeWrapper dataNodeWrapper = + EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).orElseThrow(); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollDelay(500, TimeUnit.MILLISECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted( + () -> { + if (flushStatement != null) { + flushStatement.execute("FLUSH"); + } + Assert.assertTrue( + String.format( + "Expected file with suffix %s for database %s region %d on DataNode %d", + suffix, database, dataRegionId, dataNodeId), + containsSequenceFileWithSuffix( + dataNodeWrapper, database, dataRegionId, suffix)); + }); + } + } + + private static boolean containsSequenceFileWithSuffix( + DataNodeWrapper dataNodeWrapper, String database, int dataRegionId, String suffix) { + for (String dataDir : MULTI_DATA_DIRS.split(",")) { + File regionDir = + new File( + dataNodeWrapper.getNodePath(), + dataDir + + File.separator + + SEQUENCE_FOLDER + + File.separator + + database + + File.separator + + dataRegionId); + if (containsFileWithSuffix(regionDir, suffix)) { + return true; + } + } + return false; + } + + private static boolean containsFileWithSuffix(File file, String suffix) { + if (!file.exists()) { + return false; + } + if (file.isFile()) { + // IoTConsensus followers can create an open zero-byte TsFile before a later FLUSH closes it. + return file.getName().endsWith(suffix) && (TSFILE_SUFFIX.equals(suffix) || file.length() > 0); + } + File[] children = file.listFiles(); + if (children == null) { + return false; + } + for (File child : children) { + if (containsFileWithSuffix(child, suffix)) { + return true; + } + } + return false; + } +}
