This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5af5e2b8000 Fix flaky datanode UTs under parallel surefire forks (#17712)
5af5e2b8000 is described below
commit 5af5e2b80006e2791f7fb42973a7a62d5356bdef
Author: Jackie Tien <[email protected]>
AuthorDate: Tue May 19 10:24:04 2026 +0800
Fix flaky datanode UTs under parallel surefire forks (#17712)
---
.../operator/AlignedSeriesScanOperatorTest.java | 9 ++++++
.../DataNodeInternalRPCServiceImplTest.java | 11 +++++--
.../apache/iotdb/db/utils/EnvironmentUtils.java | 36 +++++++++++++++++-----
3 files changed, 46 insertions(+), 10 deletions(-)
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
index 573107fe2d3..e6c567fac96 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -140,6 +140,9 @@ public class AlignedSeriesScanOperatorTest {
int count = 0;
while (seriesScanOperator.hasNext()) {
TsBlock tsBlock = seriesScanOperator.next();
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ continue;
+ }
assertEquals(6, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -420,6 +423,9 @@ public class AlignedSeriesScanOperatorTest {
int count = 0;
while (timeJoinOperator.isBlocked().isDone() &&
timeJoinOperator.hasNext()) {
TsBlock tsBlock = timeJoinOperator.next();
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ continue;
+ }
assertEquals(18, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -727,6 +733,9 @@ public class AlignedSeriesScanOperatorTest {
int count = 499;
while (timeJoinOperator.isBlocked().isDone() &&
timeJoinOperator.hasNext()) {
TsBlock tsBlock = timeJoinOperator.next();
+ if (tsBlock == null || tsBlock.isEmpty()) {
+ continue;
+ }
assertEquals(18, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index 15484df7e47..ca68525243d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -93,6 +93,12 @@ public class DataNodeInternalRPCServiceImplTest {
private static final File storageDir = new File("target" +
java.io.File.separator + "impl");
private static DataRegion dataRegion;
+ // Each parallel surefire fork binds its own consensus port. Reuses
+ // EnvironmentUtils.FORK_PORT_OFFSET so this binding and examinePorts() are
+ // guaranteed to look at the same port — fork-local leaks still fail, sibling
+ // forks no longer cause cross-fork false positives.
+ private static final int CONSENSUS_PORT = 6667 +
EnvironmentUtils.FORK_PORT_OFFSET;
+
@BeforeClass
public static void setUpBeforeClass() throws IOException, MetadataException,
ConsensusException {
// In standalone mode, we need to set dataNodeId to 0 for RaftPeerId in
RatisConsensus
@@ -109,7 +115,7 @@ public class DataNodeInternalRPCServiceImplTest {
ConsensusFactory.IOT_CONSENSUS,
ConsensusConfig.newBuilder()
.setThisNodeId(1)
- .setThisNode(new TEndPoint("0.0.0.0", 6667))
+ .setThisNode(new TEndPoint("0.0.0.0", CONSENSUS_PORT))
.setStorageDir(storageDir.getAbsolutePath())
.setConsensusGroupType(TConsensusGroupType.DataRegion)
.build(),
@@ -124,7 +130,8 @@ public class DataNodeInternalRPCServiceImplTest {
((IoTConsensus) DataRegionConsensusImpl.getInstance()).getImpl(new
DataRegionId(1)))) {
DataRegionConsensusImpl.getInstance()
.createLocalPeer(
- id, Collections.singletonList(new Peer(id, 1, new
TEndPoint("0.0.0.0", 6667))));
+ id,
+ Collections.singletonList(new Peer(id, 1, new
TEndPoint("0.0.0.0", CONSENSUS_PORT))));
}
DataRegionConsensusImpl.getInstance().start();
SchemaRegionConsensusImpl.getInstance().start();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 7aac119cd37..440e32df46e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -96,6 +96,17 @@ public class EnvironmentUtils {
public static boolean examinePorts =
Boolean.parseBoolean(System.getProperty("test.port.closed", "false"));
+ // Per-fork port namespace. Each surefire fork shifts every test-bound port
+ // (and the corresponding examinePorts() check) by FORK_PORT_OFFSET so that
+ // parallel forks live in disjoint port spaces. This keeps the leak-detection
+ // power of examinePorts() (a fork still sees its own un-closed ports) while
+ // removing the cross-fork false positive (one fork's bound port no longer
+ // looks like another fork's leak). surefire.forkNumber starts at 1; defaults
+ // to 1 outside surefire (IDE). Modulo 30 caps the offset so the highest
+ // checked port (31999 + 30*1000 = 61999) stays within range.
+ public static final int FORK_PORT_OFFSET =
+ ((Integer.parseInt(System.getProperty("surefire.forkNumber", "1")) - 1)
% 30 + 1) * 1000;
+
public static void cleanEnv() throws IOException, StorageEngineException {
// wait all compaction finished
CompactionTaskManager.getInstance().waitAllCompactionFinish();
@@ -174,11 +185,19 @@ public class EnvironmentUtils {
}
private static boolean examinePorts() {
- TTransport transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1",
6667, 100);
+ // All checks use this fork's port namespace (base + FORK_PORT_OFFSET) so
+ // that we only ever see ports bound by THIS JVM — sibling forks live in
+ // disjoint namespaces.
+ int rpcPort = 6667 + FORK_PORT_OFFSET;
+ int syncPort = 5555 + FORK_PORT_OFFSET;
+ int jmxPort = 31999 + FORK_PORT_OFFSET;
+ int metricPort = 9091 + FORK_PORT_OFFSET;
+
+ TTransport transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1",
rpcPort, 100);
if (transport != null && !transport.isOpen()) {
try {
transport.open();
- logger.error("stop daemon failed. 6667 can be connected now.");
+ logger.error("stop daemon failed. {} can be connected now.", rpcPort);
transport.close();
return false;
} catch (TTransportException e) {
@@ -186,11 +205,11 @@ public class EnvironmentUtils {
}
}
// try sync service
- transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 5555, 100);
+ transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", syncPort,
100);
if (transport != null && !transport.isOpen()) {
try {
transport.open();
- logger.error("stop Sync daemon failed. 5555 can be connected now.");
+ logger.error("stop Sync daemon failed. {} can be connected now.",
syncPort);
transport.close();
return false;
} catch (TTransportException e) {
@@ -199,9 +218,10 @@ public class EnvironmentUtils {
}
// try jmx connection
try {
- JMXServiceURL url = new
JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:31999/jmxrmi");
+ JMXServiceURL url =
+ new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" +
jmxPort + "/jmxrmi");
JMXConnector jmxConnector = JMXConnectorFactory.connect(url);
- logger.error("stop JMX failed. 31999 can be connected now.");
+ logger.error("stop JMX failed. {} can be connected now.", jmxPort);
jmxConnector.close();
return false;
} catch (IOException e) {
@@ -209,8 +229,8 @@ public class EnvironmentUtils {
}
// try MetricService
try (Socket socket = new Socket()) {
- socket.connect(new InetSocketAddress("127.0.0.1", 9091), 100);
- logger.error("stop MetricService failed. 9091 can be connected now.");
+ socket.connect(new InetSocketAddress("127.0.0.1", metricPort), 100);
+ logger.error("stop MetricService failed. {} can be connected now.",
metricPort);
return false;
} catch (Exception e) {
// do nothing