This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixIT0512 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 83148342d4f15900f7d08db17cc842685c4d6454 Author: shuwenwei <[email protected]> AuthorDate: Wed May 13 09:58:09 2026 +0800 Fix flaky table aggregation partition IT --- ...ableAggregationQueryWithNetworkPartitionIT.java | 60 ++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java index bb00451ea8f..3e8bc96f1db 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java @@ -38,11 +38,19 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; @@ -55,6 +63,7 @@ public class IoTDBTableAggregationQueryWithNetworkPartitionIT { private static final int testReplicationFactor = 3; private static final long testTimePartitionInterval = 604800000; private static final int testDataRegionGroupPerDatabase = 4; + private static final String[] TARGET_TIME_PARTITIONS = new String[] {"0", "-1"}; protected static final String DATABASE_NAME = "test"; protected static final String[] createSqls = new String[] { @@ -80,6 +89,57 @@ public class IoTDBTableAggregationQueryWithNetworkPartitionIT { .setEnableTopologyProbing(true); EnvFactory.getEnv().initClusterEnvironment(1, 3); prepareTableData(createSqls); + waitTsFilesOnDataNodes(); + } + + private static void waitTsFilesOnDataNodes() + throws IoTDBConnectionException, + StatementExecutionException, + IOException, + InterruptedException { + for (int i = 0; i < 30; i++) { + boolean allReady = true; + for (DataNodeWrapper dataNode : EnvFactory.getEnv().getDataNodeWrapperList()) { + if (!hasTsFilesInTimePartitions(dataNode, DATABASE_NAME, TARGET_TIME_PARTITIONS)) { + allReady = false; + break; + } + } + if (allReady) { + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + session.executeNonQueryStatement("flush"); + } + return; + } + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + } + Assert.fail("Data is not synchronized to all DataNodes"); + } + + private static boolean hasTsFilesInTimePartitions( + DataNodeWrapper dataNode, String database, String[] timePartitions) throws IOException { + Path dataDir = Paths.get(dataNode.getDataNodeDir(), "data"); + if (!Files.exists(dataDir)) { + return false; + } + + Set<String> existingTimePartitions = new HashSet<>(); + try (Stream<Path> paths = Files.walk(dataDir)) { + paths + .filter(path -> path.getFileName().toString().endsWith(".tsfile")) + .filter(path -> path.toString().contains(File.separator + database + File.separator)) + .filter(path -> path.getParent() != null) + .forEach(path -> existingTimePartitions.add(path.getParent().getFileName().toString())); + } + + for (String timePartition : timePartitions) { + if (!existingTimePartitions.contains(timePartition)) { + return false; + } + } + return true; } @AfterClass
