This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 38954aec67a [Flaky Test] Fix QueryRoutingTest port conflicts and race conditions (#18058)
38954aec67a is described below

commit 38954aec67a51eead03bb738fd5f7ee2fc0f1bff
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Apr 6 12:23:53 2026 -0700

    [Flaky Test] Fix QueryRoutingTest port conflicts and race conditions (#18058)
    
    * Fix flaky QueryRoutingTest by using dynamic port allocation
    
    QueryRoutingTest had 12 failures (4x each for testServerDown,
    testValidResponse, testSkipUnavailableServer) in the weekly CI
    digest (2026-03-22 to 2026-03-28).
    
    Root causes:
    - Hardcoded ports (12345/12346/12347) cause binding failures
      when tests run in parallel on shared CI infrastructure
    - No server startup synchronization — queries sent before
      server is ready to accept connections
    - Incomplete shutdown — ports not released fast enough between
      test methods
    
    Fixes:
    - Replace static TEST_PORT with dynamic allocation via
      ServerSocket(0) for OS-assigned free ports
    - Add startQueryServerWithWait() using TestUtils.waitForCondition
      to ensure server channel is active before proceeding
    - Improve @AfterMethod shutdown with try-catch and 100ms delay
      to ensure port release between tests
    - Convert static server instances to per-test instance variables
      for proper test isolation
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Fix compilation: testSkipUnavailableServer must declare throws Exception
    
    startQueryServerWithWait() throws Exception but testSkipUnavailableServer
    only declared throws IOException, InterruptedException. Java 11 rejects
    this as an unreported checked exception at compile time.
    
    Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
    
    * Address review: eliminate TOCTOU race and duplicate port risk
    
    - Use port 0 (OS-assigned) for QueryServer, then read actual bound port
      from channel.localAddress() after startup — eliminates the TOCTOU race
      where another process could grab the port between discovery and bind
    - In testSkipUnavailableServer, ensure port2 != port1 to prevent
      duplicate key issues in the routing table
    - Remove getAvailablePort() helper (no longer needed)
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
    
    * Address review: simplify helpers, remove sleep, deterministic unavailable port
    
    - Remove _testPort field; pass port directly to initializeTestFixtures()
    - Remove try-catch in shutdownServer() — shutDown() already throws on
      failure; no need to silently swallow exceptions
    - Remove Thread.sleep(100) in @AfterMethod — shutDown() blocks on
      channel.close().sync() so the port is released when it returns
    - Simplify startQueryServerWithWait() to startAndGetPort() — start()
      blocks on bind().sync() so no polling loop is needed
    - Use port 1 for the unavailable server in testSkipUnavailableServer
      instead of ServerSocket(0) — deterministic, no TOCTOU risk, gives
      immediate "connection refused"
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../pinot/core/transport/QueryRoutingTest.java     | 204 ++++++++++++---------
 1 file changed, 115 insertions(+), 89 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 57183839b12..7e9b952cf9d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.transport;
 
 import com.google.common.util.concurrent.Futures;
-import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -53,22 +53,17 @@ import static org.testng.Assert.*;
 
 
 public class QueryRoutingTest {
-  private static final int TEST_PORT = 12345;
-  private static final ServerInstance SERVER_INSTANCE = new 
ServerInstance("localhost", TEST_PORT);
-  private static final ServerRoutingInstance OFFLINE_SERVER_ROUTING_INSTANCE =
-      SERVER_INSTANCE.toServerRoutingInstance(TableType.OFFLINE, 
ServerInstance.RoutingType.NETTY);
-  private static final ServerRoutingInstance REALTIME_SERVER_ROUTING_INSTANCE =
-      SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME, 
ServerInstance.RoutingType.NETTY);
   private static final BrokerRequest BROKER_REQUEST =
       CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable");
-  private static final Map<ServerInstance, SegmentsToQuery> ROUTING_TABLE =
-      Collections.singletonMap(SERVER_INSTANCE,
-          new SegmentsToQuery(Collections.emptyList(), 
Collections.emptyList()));
 
   private QueryRouter _queryRouter;
   private ServerRoutingStatsManager _serverRoutingStatsManager;
   int _requestCount;
   private QueryServer _queryServer;
+  private ServerInstance _serverInstance;
+  private ServerRoutingInstance _offlineServerRoutingInstance;
+  private ServerRoutingInstance _realtimeServerRoutingInstance;
+  private Map<ServerInstance, SegmentsToQuery> _routingTable;
 
   @BeforeClass
   public void setUp() {
@@ -84,9 +79,13 @@ public class QueryRoutingTest {
 
   @AfterMethod
   void shutdownServer() {
-    if (_queryServer != null) {
-      _queryServer.shutDown();
-      _queryServer = null;
+    try {
+      if (_queryServer != null && _queryServer.getChannel() != null) {
+        // shutDown() blocks on channel.close().sync(), so port is released 
when this returns
+        _queryServer.shutDown();
+      }
+    } finally {
+      clearTestFixtures();
     }
   }
 
@@ -95,8 +94,26 @@ public class QueryRoutingTest {
     ServerMetrics.deregister();
   }
 
+  private void clearTestFixtures() {
+    _queryServer = null;
+    _serverInstance = null;
+    _offlineServerRoutingInstance = null;
+    _realtimeServerRoutingInstance = null;
+    _routingTable = null;
+  }
+
+  private void initializeTestFixtures(int port) {
+    _serverInstance = new ServerInstance("localhost", port);
+    _offlineServerRoutingInstance =
+        _serverInstance.toServerRoutingInstance(TableType.OFFLINE, 
ServerInstance.RoutingType.NETTY);
+    _realtimeServerRoutingInstance =
+        _serverInstance.toServerRoutingInstance(TableType.REALTIME, 
ServerInstance.RoutingType.NETTY);
+    _routingTable = Collections.singletonMap(_serverInstance,
+        new SegmentsToQuery(Collections.emptyList(), Collections.emptyList()));
+  }
+
   private QueryServer getQueryServer(int responseDelayMs, byte[] 
responseBytes) {
-    return getQueryServer(responseDelayMs, responseBytes, TEST_PORT);
+    return getQueryServer(responseDelayMs, responseBytes, 0);
   }
 
   private QueryServer getQueryServer(int responseDelayMs, byte[] 
responseBytes, int port) {
@@ -106,6 +123,16 @@ public class QueryRoutingTest {
     return new QueryServer(port, null, handler);
   }
 
+  /**
+   * Starts the query server and returns the actual bound port. Uses port 0 to 
let the OS assign a free port,
+   * avoiding TOCTOU race conditions. {@link QueryServer#start()} blocks on 
{@code bind().sync()}, so the
+   * server is ready to accept connections when this method returns.
+   */
+  private int startAndGetPort(QueryServer server) {
+    server.start();
+    return ((InetSocketAddress) server.getChannel().localAddress()).getPort();
+  }
+
   private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] 
responseBytes) {
     QueryScheduler queryScheduler = mock(QueryScheduler.class);
     when(queryScheduler.submit(any())).thenAnswer(invocation -> {
@@ -122,19 +149,19 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
-    String serverId = SERVER_INSTANCE.getInstanceId();
 
-    // Start the server
+    // Start the server on port 0 (OS-assigned) to avoid TOCTOU race, then 
initialize test fixtures from actual port
     _queryServer = getQueryServer(0, responseBytes);
-    _queryServer.start();
+    initializeTestFixtures(startAndGetPort(_queryServer));
+    String serverId = _serverInstance.getInstanceId();
 
     // OFFLINE only
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 600_000L);
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
_routingTable, null, null, 600_000L);
     Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
-    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
-    ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+    assertTrue(response.containsKey(_offlineServerRoutingInstance));
+    ServerResponse serverResponse = 
response.get(_offlineServerRoutingInstance);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
     // 2 requests - query submit and query response.
@@ -144,11 +171,11 @@ public class QueryRoutingTest {
 
     // REALTIME only
     asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", null, null, 
BROKER_REQUEST, ROUTING_TABLE, 1_000L);
+        _queryRouter.submitQuery(requestId, "testTable", null, null, 
BROKER_REQUEST, _routingTable, 1_000L);
     response = asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
-    assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
-    serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
+    assertTrue(response.containsKey(_realtimeServerRoutingInstance));
+    serverResponse = response.get(_realtimeServerRoutingInstance);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
     _requestCount += 2;
@@ -157,16 +184,16 @@ public class QueryRoutingTest {
 
     // Hybrid
     asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, BROKER_REQUEST, ROUTING_TABLE,
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
_routingTable, BROKER_REQUEST, _routingTable,
             1_000L);
     response = asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 2);
-    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
-    serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+    assertTrue(response.containsKey(_offlineServerRoutingInstance));
+    serverResponse = response.get(_offlineServerRoutingInstance);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
-    assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
-    serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
+    assertTrue(response.containsKey(_realtimeServerRoutingInstance));
+    serverResponse = response.get(_realtimeServerRoutingInstance);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
     _requestCount += 4;
@@ -178,19 +205,19 @@ public class QueryRoutingTest {
   public void testInvalidResponse()
       throws Exception {
     long requestId = 123;
-    String serverId = SERVER_INSTANCE.getInstanceId();
 
-    // Start the server
+    // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
     _queryServer = getQueryServer(0, new byte[0]);
-    _queryServer.start();
+    initializeTestFixtures(startAndGetPort(_queryServer));
+    String serverId = _serverInstance.getInstanceId();
 
     long startTimeMs = System.currentTimeMillis();
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
_routingTable, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
-    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
-    ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+    assertTrue(response.containsKey(_offlineServerRoutingInstance));
+    ServerResponse serverResponse = 
response.get(_offlineServerRoutingInstance);
     assertNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseDelayMs(), -1);
     assertEquals(serverResponse.getResponseSize(), 0);
@@ -210,18 +237,18 @@ public class QueryRoutingTest {
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     dataTable.addException(QueryErrorCode.SERVER_TABLE_MISSING, "Test error 
message");
     byte[] responseBytes = dataTable.toBytes();
-    String serverId = SERVER_INSTANCE.getInstanceId();
-    // Start the server
+    // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
     _queryServer = getQueryServer(0, responseBytes);
-    _queryServer.start();
+    initializeTestFixtures(startAndGetPort(_queryServer));
+    String serverId = _serverInstance.getInstanceId();
 
     // Send a query with ServerSide exception and check if the latency is set 
to timeout value.
     Double latencyBefore = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
_routingTable, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
-    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertTrue(response.containsKey(_offlineServerRoutingInstance));
 
     _requestCount += 2;
     waitForStatsUpdate(_requestCount);
@@ -249,20 +276,20 @@ public class QueryRoutingTest {
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     dataTable.addException(QueryErrorCode.QUERY_CANCELLATION, "Test error 
message");
     byte[] responseBytes = dataTable.toBytes();
-    String serverId = SERVER_INSTANCE.getInstanceId();
-    // Start the server
+    // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
     _queryServer = getQueryServer(0, responseBytes);
-    _queryServer.start();
+    initializeTestFixtures(startAndGetPort(_queryServer));
+    String serverId = _serverInstance.getInstanceId();
 
     // Send a query with client side errors.
     Double latencyBefore = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
 
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
_routingTable, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
-    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
-    ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+    assertTrue(response.containsKey(_offlineServerRoutingInstance));
+    ServerResponse serverResponse = 
response.get(_offlineServerRoutingInstance);
 
     _requestCount += 2;
     waitForStatsUpdate(_requestCount);
@@ -288,19 +315,19 @@ public class QueryRoutingTest {
     dataTable.addException(QueryErrorCode.QUERY_CANCELLATION, "Test 
cancellation error message");
     dataTable.addException(QueryErrorCode.SERVER_TABLE_MISSING, "Test table 
missing error message");
     byte[] responseBytes = dataTable.toBytes();
-    String serverId = SERVER_INSTANCE.getInstanceId();
-    // Start the server
+    // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
     _queryServer = getQueryServer(0, responseBytes);
-    _queryServer.start();
+    initializeTestFixtures(startAndGetPort(_queryServer));
+    String serverId = _serverInstance.getInstanceId();
 
     // Send a query with multiple exceptions. Make sure that the latency is 
set to timeout value even if a single
     //server-side exception is seen.
     Double latencyBefore = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
_routingTable, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
-    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertTrue(response.containsKey(_offlineServerRoutingInstance));
 
     _requestCount += 2;
     waitForStatsUpdate(_requestCount);
@@ -326,19 +353,19 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
-    String serverId = SERVER_INSTANCE.getInstanceId();
-    // Start the server
+    // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
     _queryServer = getQueryServer(0, responseBytes);
-    _queryServer.start();
+    initializeTestFixtures(startAndGetPort(_queryServer));
+    String serverId = _serverInstance.getInstanceId();
 
     // Send a valid query and get latency
     Double latencyBefore = 
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
_routingTable, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
-    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
-    ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+    assertTrue(response.containsKey(_offlineServerRoutingInstance));
+    ServerResponse serverResponse = 
response.get(_offlineServerRoutingInstance);
 
     _requestCount += 2;
     waitForStatsUpdate(_requestCount);
@@ -360,19 +387,19 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
-    String serverId = SERVER_INSTANCE.getInstanceId();
 
-    // Start the server
+    // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
     _queryServer = getQueryServer(0, responseBytes);
-    _queryServer.start();
+    initializeTestFixtures(startAndGetPort(_queryServer));
+    String serverId = _serverInstance.getInstanceId();
 
     long startTimeMs = System.currentTimeMillis();
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+        _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, 
_routingTable, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
-    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
-    ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+    assertTrue(response.containsKey(_offlineServerRoutingInstance));
+    ServerResponse serverResponse = 
response.get(_offlineServerRoutingInstance);
     assertNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseDelayMs(), -1);
     assertEquals(serverResponse.getResponseSize(), 0);
@@ -394,15 +421,15 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
-    String serverId = SERVER_INSTANCE.getInstanceId();
 
-    // Start the server
+    // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
     _queryServer = getQueryServer(500, responseBytes);
-    _queryServer.start();
+    initializeTestFixtures(startAndGetPort(_queryServer));
+    String serverId = _serverInstance.getInstanceId();
 
     long startTimeMs = System.currentTimeMillis();
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, timeoutMs);
+        _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, 
_routingTable, null, null, timeoutMs);
 
     // Shut down the server before getting the response
     _queryServer.shutDown();
@@ -413,8 +440,8 @@ public class QueryRoutingTest {
 
       Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
       assertEquals(response.size(), 1);
-      assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
-      ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+      assertTrue(response.containsKey(_offlineServerRoutingInstance));
+      ServerResponse serverResponse = 
response.get(_offlineServerRoutingInstance);
       assertNull(serverResponse.getDataTable());
       assertEquals(serverResponse.getResponseDelayMs(), -1);
       assertEquals(serverResponse.getResponseSize(), 0);
@@ -428,11 +455,11 @@ public class QueryRoutingTest {
       // Submit query after server is down
       startTimeMs = System.currentTimeMillis();
       asyncQueryResponse =
-          _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, timeoutMs);
+          _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, 
_routingTable, null, null, timeoutMs);
       response = asyncQueryResponse.getFinalResponses();
       assertEquals(response.size(), 1);
-      assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
-      serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+      assertTrue(response.containsKey(_offlineServerRoutingInstance));
+      serverResponse = response.get(_offlineServerRoutingInstance);
       assertNull(serverResponse.getDataTable());
       assertEquals(serverResponse.getSubmitDelayMs(), -1);
       assertEquals(serverResponse.getResponseDelayMs(), -1);
@@ -445,26 +472,13 @@ public class QueryRoutingTest {
       
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
     } finally {
       // To be sure we don't close it again on the @AfterMethod method
-      _queryServer = null;
+      clearTestFixtures();
     }
   }
 
   @Test
   public void testSkipUnavailableServer()
-      throws IOException, InterruptedException {
-    // Using a different port is a hack to avoid resource conflict with other 
tests, ideally _queryServer.shutdown()
-    // should ensure there is no possibility of resource conflict.
-    int port = 12346;
-    ServerInstance serverInstance1 = new ServerInstance("localhost", port);
-    ServerInstance serverInstance2 = new ServerInstance("localhost", port + 1);
-    ServerRoutingInstance serverRoutingInstance1 =
-        serverInstance1.toServerRoutingInstance(TableType.OFFLINE, 
ServerInstance.RoutingType.NETTY);
-    ServerRoutingInstance serverRoutingInstance2 =
-        serverInstance2.toServerRoutingInstance(TableType.OFFLINE, 
ServerInstance.RoutingType.NETTY);
-    Map<ServerInstance, SegmentsToQuery> routingTable =
-        Map.of(serverInstance1, new SegmentsToQuery(Collections.emptyList(), 
Collections.emptyList()),
-            serverInstance2, new SegmentsToQuery(Collections.emptyList(), 
Collections.emptyList()));
-
+      throws Exception {
     long requestId = 123;
     DataSchema dataSchema =
         new DataSchema(new String[]{"column1"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
@@ -477,9 +491,21 @@ public class QueryRoutingTest {
     dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] successResponseBytes = dataTableSuccess.toBytes();
 
-    // Only start a single QueryServer, on port from serverInstance1
-    _queryServer = getQueryServer(500, successResponseBytes, port);
-    _queryServer.start();
+    // Start server1 on port 0 (OS-assigned) to avoid TOCTOU race
+    _queryServer = getQueryServer(500, successResponseBytes, 0);
+    int port1 = startAndGetPort(_queryServer);
+
+    ServerInstance serverInstance1 = new ServerInstance("localhost", port1);
+    // For server2 (unavailable server), use a reserved .invalid hostname so 
connection setup fails
+    // deterministically without depending on a transiently free port.
+    ServerInstance serverInstance2 = new ServerInstance("unavailable.invalid", 
port1);
+    ServerRoutingInstance serverRoutingInstance1 =
+        serverInstance1.toServerRoutingInstance(TableType.OFFLINE, 
ServerInstance.RoutingType.NETTY);
+    ServerRoutingInstance serverRoutingInstance2 =
+        serverInstance2.toServerRoutingInstance(TableType.OFFLINE, 
ServerInstance.RoutingType.NETTY);
+    Map<ServerInstance, SegmentsToQuery> routingTable =
+        Map.of(serverInstance1, new SegmentsToQuery(Collections.emptyList(), 
Collections.emptyList()),
+            serverInstance2, new SegmentsToQuery(Collections.emptyList(), 
Collections.emptyList()));
 
     // Submit the query with skipUnavailableServers=true, the single started 
server should return a valid response
     BrokerRequest brokerRequest =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to