This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5af5e2b8000 Fix flaky datanode UTs under parallel surefire forks (#17712)
5af5e2b8000 is described below

commit 5af5e2b80006e2791f7fb42973a7a62d5356bdef
Author: Jackie Tien <[email protected]>
AuthorDate: Tue May 19 10:24:04 2026 +0800

    Fix flaky datanode UTs under parallel surefire forks (#17712)
---
 .../operator/AlignedSeriesScanOperatorTest.java    |  9 ++++++
 .../DataNodeInternalRPCServiceImplTest.java        | 11 +++++--
 .../apache/iotdb/db/utils/EnvironmentUtils.java    | 36 +++++++++++++++++-----
 3 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
index 573107fe2d3..e6c567fac96 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -140,6 +140,9 @@ public class AlignedSeriesScanOperatorTest {
       int count = 0;
       while (seriesScanOperator.hasNext()) {
         TsBlock tsBlock = seriesScanOperator.next();
+        if (tsBlock == null || tsBlock.isEmpty()) {
+          continue;
+        }
         assertEquals(6, tsBlock.getValueColumnCount());
         assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
         assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -420,6 +423,9 @@ public class AlignedSeriesScanOperatorTest {
       int count = 0;
       while (timeJoinOperator.isBlocked().isDone() && 
timeJoinOperator.hasNext()) {
         TsBlock tsBlock = timeJoinOperator.next();
+        if (tsBlock == null || tsBlock.isEmpty()) {
+          continue;
+        }
         assertEquals(18, tsBlock.getValueColumnCount());
         assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
         assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -727,6 +733,9 @@ public class AlignedSeriesScanOperatorTest {
       int count = 499;
       while (timeJoinOperator.isBlocked().isDone() && 
timeJoinOperator.hasNext()) {
         TsBlock tsBlock = timeJoinOperator.next();
+        if (tsBlock == null || tsBlock.isEmpty()) {
+          continue;
+        }
         assertEquals(18, tsBlock.getValueColumnCount());
         assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
         assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index 15484df7e47..ca68525243d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -93,6 +93,12 @@ public class DataNodeInternalRPCServiceImplTest {
   private static final File storageDir = new File("target" + 
java.io.File.separator + "impl");
   private static DataRegion dataRegion;
 
+  // Each parallel surefire fork binds its own consensus port. Reuses
+  // EnvironmentUtils.FORK_PORT_OFFSET so this binding and examinePorts() are
+  // guaranteed to look at the same port — fork-local leaks still fail, sibling
+  // forks no longer cause cross-fork false positives.
+  private static final int CONSENSUS_PORT = 6667 + 
EnvironmentUtils.FORK_PORT_OFFSET;
+
   @BeforeClass
   public static void setUpBeforeClass() throws IOException, MetadataException, 
ConsensusException {
     // In standalone mode, we need to set dataNodeId to 0 for RaftPeerId in 
RatisConsensus
@@ -109,7 +115,7 @@ public class DataNodeInternalRPCServiceImplTest {
                 ConsensusFactory.IOT_CONSENSUS,
                 ConsensusConfig.newBuilder()
                     .setThisNodeId(1)
-                    .setThisNode(new TEndPoint("0.0.0.0", 6667))
+                    .setThisNode(new TEndPoint("0.0.0.0", CONSENSUS_PORT))
                     .setStorageDir(storageDir.getAbsolutePath())
                     .setConsensusGroupType(TConsensusGroupType.DataRegion)
                     .build(),
@@ -124,7 +130,8 @@ public class DataNodeInternalRPCServiceImplTest {
         ((IoTConsensus) DataRegionConsensusImpl.getInstance()).getImpl(new 
DataRegionId(1)))) {
       DataRegionConsensusImpl.getInstance()
           .createLocalPeer(
-              id, Collections.singletonList(new Peer(id, 1, new 
TEndPoint("0.0.0.0", 6667))));
+              id,
+              Collections.singletonList(new Peer(id, 1, new 
TEndPoint("0.0.0.0", CONSENSUS_PORT))));
     }
     DataRegionConsensusImpl.getInstance().start();
     SchemaRegionConsensusImpl.getInstance().start();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 7aac119cd37..440e32df46e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -96,6 +96,17 @@ public class EnvironmentUtils {
   public static boolean examinePorts =
       Boolean.parseBoolean(System.getProperty("test.port.closed", "false"));
 
+  // Per-fork port namespace. Each surefire fork shifts every test-bound port
+  // (and the corresponding examinePorts() check) by FORK_PORT_OFFSET so that
+  // parallel forks live in disjoint port spaces. This keeps the leak-detection
+  // power of examinePorts() (a fork still sees its own un-closed ports) while
+  // removing the cross-fork false positive (one fork's bound port no longer
+  // looks like another fork's leak). surefire.forkNumber starts at 1; defaults
+  // to 1 outside surefire (IDE). Modulo 30 caps the offset so the highest
+  // checked port (31999 + 30*1000 = 61999) stays within range.
+  public static final int FORK_PORT_OFFSET =
+      ((Integer.parseInt(System.getProperty("surefire.forkNumber", "1")) - 1) 
% 30 + 1) * 1000;
+
   public static void cleanEnv() throws IOException, StorageEngineException {
     // wait all compaction finished
     CompactionTaskManager.getInstance().waitAllCompactionFinish();
@@ -174,11 +185,19 @@ public class EnvironmentUtils {
   }
 
   private static boolean examinePorts() {
-    TTransport transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 
6667, 100);
+    // All checks use this fork's port namespace (base + FORK_PORT_OFFSET) so
+    // that we only ever see ports bound by THIS JVM — sibling forks live in
+    // disjoint namespaces.
+    int rpcPort = 6667 + FORK_PORT_OFFSET;
+    int syncPort = 5555 + FORK_PORT_OFFSET;
+    int jmxPort = 31999 + FORK_PORT_OFFSET;
+    int metricPort = 9091 + FORK_PORT_OFFSET;
+
+    TTransport transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 
rpcPort, 100);
     if (transport != null && !transport.isOpen()) {
       try {
         transport.open();
-        logger.error("stop daemon failed. 6667 can be connected now.");
+        logger.error("stop daemon failed. {} can be connected now.", rpcPort);
         transport.close();
         return false;
       } catch (TTransportException e) {
@@ -186,11 +205,11 @@ public class EnvironmentUtils {
       }
     }
     // try sync service
-    transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 5555, 100);
+    transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", syncPort, 
100);
     if (transport != null && !transport.isOpen()) {
       try {
         transport.open();
-        logger.error("stop Sync daemon failed. 5555 can be connected now.");
+        logger.error("stop Sync daemon failed. {} can be connected now.", 
syncPort);
         transport.close();
         return false;
       } catch (TTransportException e) {
@@ -199,9 +218,10 @@ public class EnvironmentUtils {
     }
     // try jmx connection
     try {
-      JMXServiceURL url = new 
JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:31999/jmxrmi");
+      JMXServiceURL url =
+          new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + 
jmxPort + "/jmxrmi");
       JMXConnector jmxConnector = JMXConnectorFactory.connect(url);
-      logger.error("stop JMX failed. 31999 can be connected now.");
+      logger.error("stop JMX failed. {} can be connected now.", jmxPort);
       jmxConnector.close();
       return false;
     } catch (IOException e) {
@@ -209,8 +229,8 @@ public class EnvironmentUtils {
     }
     // try MetricService
     try (Socket socket = new Socket()) {
-      socket.connect(new InetSocketAddress("127.0.0.1", 9091), 100);
-      logger.error("stop MetricService failed. 9091 can be connected now.");
+      socket.connect(new InetSocketAddress("127.0.0.1", metricPort), 100);
+      logger.error("stop MetricService failed. {} can be connected now.", 
metricPort);
       return false;
     } catch (Exception e) {
       // do nothing

Reply via email to