This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f5175b97aad Fix flaky region-migration/cluster ITs and enable IoTV2
daily migration tests (#17924)
f5175b97aad is described below
commit f5175b97aad34ca3cb75b1fe24ae8812da877ad2
Author: Yongzao <[email protected]>
AuthorDate: Mon Jun 15 11:44:35 2026 +0800
Fix flaky region-migration/cluster ITs and enable IoTV2 daily migration
tests (#17924)
---
...IoTDBRegionOperationReliabilityITFramework.java | 31 +++++++++++++
...BRegionMigrateDataNodeCrashForIoTV2BatchIT.java | 19 ++++----
...RegionMigrateDataNodeCrashForIoTV2StreamIT.java | 19 ++++----
...IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java | 6 +--
...DBRegionMigrateConfigNodeCrashIoTV2BatchIT.java | 9 ++--
...oTDBRegionMigrateClusterCrashIoTV2StreamIT.java | 9 ++--
...BRegionMigrateConfigNodeCrashIoTV2StreamIT.java | 9 ++--
.../iotdb/db/it/IoTDBCustomizedClusterIT.java | 54 +++++++++++++++-------
.../IoTDBIoTConsensusV23C3DBasicITBase.java | 48 +++++++++++++++++--
9 files changed, 145 insertions(+), 59 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
index 9b9c4ad41a1..4ab85264fc8 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
@@ -355,6 +355,17 @@ public class IoTDBRegionOperationReliabilityITFramework {
Assert.fail();
}
+ // The kill point is detected by a background thread tailing the node log, so the migration
+ // result (observed by awaitUntilSuccess above) can become visible before that thread has read
+ // and processed the kill-point line of the last migration phase (e.g.
+ // RemoveRegionLocationCache). Give that thread a short grace period to catch up before
+ // asserting, otherwise checkKillPointsAllTriggered may fail spuriously with "Some kill points
+ // was not triggered". This is best-effort: the authoritative assertion remains
+ // checkKillPointsAllTriggered, which still fails the test if a kill point genuinely never
+ // triggers (e.g. the badKillPoint test).
+ graceWaitForKillPointsTriggered(configNodeKeywords);
+ graceWaitForKillPointsTriggered(dataNodeKeywords);
+
// make sure all kill points have been triggered
checkKillPointsAllTriggered(configNodeKeywords);
checkKillPointsAllTriggered(dataNodeKeywords);
@@ -520,6 +531,26 @@ public class IoTDBRegionOperationReliabilityITFramework {
Awaitility.await().atMost(2, TimeUnit.MINUTES).until(killPoints::isEmpty);
}
+ /**
+ * Best-effort wait for all kill points to be triggered. The kill point is detected by a
+ * background thread tailing the node log, so there can be a short lag between the migration
+ * result becoming visible and that thread processing the kill-point line. This gives it a brief
+ * grace period to catch up. Unlike {@link #awaitKillPointsTriggered}, it never throws: the
+ * authoritative check is {@link #checkKillPointsAllTriggered}, so a kill point that genuinely
+ * never triggers (e.g. the badKillPoint test) is still caught there as an AssertionError rather
+ * than masked here.
+ */
+ private static void graceWaitForKillPointsTriggered(KeySetView<String, Boolean> killPoints) {
+ if (killPoints.isEmpty()) {
+ return;
+ }
+ try {
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(killPoints::isEmpty);
+ } catch (ConditionTimeoutException ignored) {
+ // Fall through to checkKillPointsAllTriggered, which makes the real assertion.
+ }
+ }
+
private static String buildRegionMigrateCommand(int who, int from, int to) {
String result = String.format(REGION_MIGRATE_COMMAND_FORMAT, who, from, to);
LOGGER.info(result);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java
index ad973814498..ea9a40e4699 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java
@@ -25,11 +25,10 @@ import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliab
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.DailyIT;
+import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
-
@Category({DailyIT.class})
@RunWith(IoTDBTestRunner.class)
public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT
@@ -41,7 +40,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT
private final int configNodeNum = 1;
private final int dataNodeNum = 3;
- // @Test
+ @Test
public void coordinatorCrashDuringAddPeerTransition() throws Exception {
failTest(
2,
@@ -53,7 +52,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT
KillNode.COORDINATOR_DATANODE);
}
- // @Test
+ @Test
public void coordinatorCrashDuringAddPeerDone() throws Exception {
failTest(
2,
@@ -69,9 +68,13 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT
// region Original DataNode crash tests
- // @Test
+ @Test
public void originalCrashDuringAddPeerDone() throws Exception {
- failTest(
+ // Once the add-peer phase is done, the new peer already holds the data, so the migration is
+ // designed to tolerate the original (source) DataNode crashing afterwards: it completes
+ // successfully and merely leaves the region files on the dead node to be cleaned up later.
+ // Hence this is a successTest, not a failTest.
+ successTest(
2,
2,
1,
@@ -85,7 +88,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT
// region Destination DataNode crash tests
- // @Test
+ @Test
public void destinationCrashDuringCreateLocalPeer() throws Exception {
failTest(
2,
@@ -97,7 +100,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT
KillNode.DESTINATION_DATANODE);
}
- // @Test
+ @Test
public void destinationCrashDuringAddPeerDone() throws Exception {
failTest(
2,
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java
index eeca6dacc19..a8aa62f81b0 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java
@@ -28,11 +28,10 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.DailyIT;
import org.junit.Before;
+import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
-
@Category({DailyIT.class})
@RunWith(IoTDBTestRunner.class)
public class IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT
@@ -54,7 +53,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT
.setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE);
}
- // @Test
+ @Test
public void coordinatorCrashDuringAddPeerTransition() throws Exception {
failTest(
2,
@@ -66,7 +65,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT
KillNode.COORDINATOR_DATANODE);
}
- // @Test
+ @Test
public void coordinatorCrashDuringAddPeerDone() throws Exception {
failTest(
2,
@@ -82,9 +81,13 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT
// region Original DataNode crash tests
- // @Test
+ @Test
public void originalCrashDuringAddPeerDone() throws Exception {
- failTest(
+ // Once the add-peer phase is done, the new peer already holds the data, so the migration is
+ // designed to tolerate the original (source) DataNode crashing afterwards: it completes
+ // successfully and merely leaves the region files on the dead node to be cleaned up later.
+ // Hence this is a successTest, not a failTest.
+ successTest(
2,
2,
1,
@@ -98,7 +101,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT
// region Destination DataNode crash tests
- // @Test
+ @Test
public void destinationCrashDuringCreateLocalPeer() throws Exception {
failTest(
2,
@@ -110,7 +113,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT
KillNode.DESTINATION_DATANODE);
}
- // @Test
+ @Test
public void destinationCrashDuringAddPeerDone() throws Exception {
failTest(
2,
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java
index a276acc4d01..6a74d99c096 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java
@@ -39,8 +39,7 @@ public class IoTDBRegionMigrateClusterCrashIoTV2BatchIT
killClusterTest(buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), true);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void clusterCrash2() throws Exception {
killClusterTest(buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), false);
}
@@ -60,8 +59,7 @@ public class IoTDBRegionMigrateClusterCrashIoTV2BatchIT
killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), true);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void clusterCrash7() throws Exception {
killClusterTest(buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), true);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java
index bc4f477b6bd..853d8349666 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java
@@ -66,8 +66,7 @@ public class IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT
KillNode.CONFIG_NODE);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void testCnCrashDuringDoAddPeer() throws Exception {
successTest(
1,
@@ -127,8 +126,7 @@ public class IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT
KillNode.CONFIG_NODE);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception {
successTest(
1,
@@ -140,8 +138,7 @@ public class IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT
KillNode.CONFIG_NODE);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void cnCrashTest() throws Exception {
ConcurrentHashMap.KeySetView<String, Boolean> killConfigNodeKeywords = noKillPoints();
killConfigNodeKeywords.addAll(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java
index 384f5e61dd7..5f0f2fe3cee 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java
@@ -47,14 +47,12 @@ public class IoTDBRegionMigrateClusterCrashIoTV2StreamIT
.setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void clusterCrash1() throws Exception {
killClusterTest(buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), true);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void clusterCrash2() throws Exception {
killClusterTest(buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), false);
}
@@ -74,8 +72,7 @@ public class IoTDBRegionMigrateClusterCrashIoTV2StreamIT
killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), true);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void clusterCrash7() throws Exception {
killClusterTest(buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), true);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java
index 39b5953de4a..f29482811f0 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java
@@ -80,8 +80,7 @@ public class IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT
KillNode.CONFIG_NODE);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void testCnCrashDuringDoAddPeer() throws Exception {
successTest(
1,
@@ -141,8 +140,7 @@ public class IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT
KillNode.CONFIG_NODE);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception {
successTest(
1,
@@ -154,8 +152,7 @@ public class IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT
KillNode.CONFIG_NODE);
}
- // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein
- // @Test
+ @Test
public void cnCrashTest() throws Exception {
ConcurrentHashMap.KeySetView<String, Boolean> killConfigNodeKeywords = noKillPoints();
killConfigNodeKeywords.addAll(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
index 74391b99bb3..42d91eb7409 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
@@ -25,11 +25,13 @@ import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.DailyIT;
+import org.apache.iotdb.itbase.exception.InconsistentDataException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
+import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -64,22 +66,42 @@ public class IoTDBCustomizedClusterIT {
testRepeatedlyRestartWholeCluster(
(s, i, env) -> {
if (i != 0) {
- ResultSet resultSet = s.executeQuery("SELECT last s1 FROM root.**");
- ResultSetMetaData metaData = resultSet.getMetaData();
- assertEquals(4, metaData.getColumnCount());
- int cnt = 0;
- while (resultSet.next()) {
- cnt++;
- StringBuilder result = new StringBuilder();
- for (int j = 0; j < metaData.getColumnCount(); j++) {
- result
- .append(metaData.getColumnName(j + 1))
- .append(":")
- .append(resultSet.getString(j + 1))
- .append(",");
- }
- System.out.println(result);
- }
+ // This query is fanned out to every DataNode and the results are compared across
+ // replicas. Right after a restart the last cache on each coordinator is reloaded
+ // lazily, so the cross-replica comparison may transiently observe an inconsistent
+ // result until the cluster converges. ORDER BY TIMESERIES makes the row order
+ // deterministic across coordinators (the root cause of the observed flakiness), and the
+ // retry tolerates the brief convergence window (e.g. a replica that has not finished
+ // recovering yet) without masking a genuine, persistent inconsistency.
+ //
+ // ignoreExceptionsMatching(InconsistentDataException) is required: a mismatch surfaces
+ // as InconsistentDataException (a RuntimeException) thrown from getString(), and
+ // untilAsserted() only retries on AssertionError by default, so without it the retry
+ // would not actually cover this failure. We match only InconsistentDataException so a
+ // genuine error (e.g. a real SQLException) still fails fast instead of being retried.
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .pollInterval(2, TimeUnit.SECONDS)
+ .ignoreExceptionsMatching(e -> e instanceof InconsistentDataException)
+ .untilAsserted(
+ () -> {
+ try (ResultSet resultSet =
+ s.executeQuery("SELECT last s1 FROM root.** ORDER BY TIMESERIES ASC")) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ assertEquals(4, metaData.getColumnCount());
+ while (resultSet.next()) {
+ StringBuilder result = new StringBuilder();
+ for (int j = 0; j < metaData.getColumnCount(); j++) {
+ result
+ .append(metaData.getColumnName(j + 1))
+ .append(":")
+ .append(resultSet.getString(j + 1))
+ .append(",");
+ }
+ System.out.println(result);
+ }
+ }
+ });
}
s.execute("INSERT INTO root.db1.d1 (time, s1) VALUES (1, 1)");
s.execute("INSERT INTO root.db2.d1 (time, s1) VALUES (1, 1)");
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
index ec04aab39bd..e07e6e4ebc1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java
@@ -342,11 +342,12 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
// Restart the stopped node before moving to the next iteration
LOGGER.info("Restarting {}", stoppedDesc);
stoppedNode.start();
- // Wait for the restarted node to rejoin
- Awaitility.await()
- .atMost(120, TimeUnit.SECONDS)
- .pollInterval(2, TimeUnit.SECONDS)
- .until(stoppedNode::isAlive);
+ // Wait for the restarted node to actually be able to serve queries again, not just for
+ // its process to be up. The next loop iteration will treat this node as a surviving node
+ // and connect to it, so if we only waited for isAlive() (process started) the node might
+ // still be in startup (RPC port not yet open / not registered), causing a spurious
+ // "Connection refused" failure.
+ waitUntilDataNodeQueryable(stoppedNode, stoppedDesc);
}
}
@@ -356,6 +357,43 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
}
}
+ /**
+ * Wait until the given DataNode can actually serve queries again after a restart. A node's
+ * process being alive ({@link DataNodeWrapper#isAlive()}) does not mean its client RPC service is
+ * open and it has rejoined the cluster, so we poll a real connection plus a trivial query until
+ * it succeeds.
+ */
+ private void waitUntilDataNodeQueryable(DataNodeWrapper node, String nodeDesc) {
+ Awaitility.await()
+ .atMost(120, TimeUnit.SECONDS)
+ .pollDelay(1, TimeUnit.SECONDS)
+ .pollInterval(2, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ if (!node.isAlive()) {
+ return false;
+ }
+ try (Connection conn =
+ EnvFactory.getEnv()
+ .getConnection(
+ node,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ BaseEnv.TREE_SQL_DIALECT);
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(SHOW_TIMESERIES_D1)) {
+ // Drain the result set to make sure the query fully executes.
+ while (rs.next()) {
+ // no-op
+ }
+ return true;
+ } catch (Exception e) {
+ LOGGER.info("{} not queryable yet, retrying: {}", nodeDesc, e.getMessage());
+ return false;
+ }
+ });
+ }
+
/**
* Verify that after deleting root.sg.d1.speed, only temperature and power timeseries remain, and
* that data queries do not return the deleted timeseries.