This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 38954aec67a [Flaky Test] Fix QueryRoutingTest port conflicts and race
conditions (#18058)
38954aec67a is described below
commit 38954aec67a51eead03bb738fd5f7ee2fc0f1bff
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Apr 6 12:23:53 2026 -0700
[Flaky Test] Fix QueryRoutingTest port conflicts and race conditions
(#18058)
* Fix flaky QueryRoutingTest by using dynamic port allocation
QueryRoutingTest had 12 failures (4x each for testServerDown,
testValidResponse, testSkipUnavailableServer) in the weekly CI
digest (2026-03-22 to 2026-03-28).
Root causes:
- Hardcoded ports (12345/12346/12347) cause binding failures
when tests run in parallel on shared CI infrastructure
- No server startup synchronization — queries sent before
server is ready to accept connections
- Incomplete shutdown — ports not released fast enough between
test methods
Fixes:
- Replace static TEST_PORT with dynamic allocation via
ServerSocket(0) for OS-assigned free ports
- Add startQueryServerWithWait() using TestUtils.waitForCondition
to ensure server channel is active before proceeding
- Improve @AfterMethod shutdown with try-catch and 100ms delay
to ensure port release between tests (both the catch and the delay were
removed in a later commit of this series; see below)
- Convert static server instances to per-test instance variables
for proper test isolation
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Fix compilation: testSkipUnavailableServer must declare throws Exception
startQueryServerWithWait() throws Exception but testSkipUnavailableServer
only declared throws IOException, InterruptedException. javac (observed on
the Java 11 CI build, but this is a compile-time error on every Java version)
rejects this as an unreported checked exception.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* Address review: eliminate TOCTOU race and duplicate port risk
- Use port 0 (OS-assigned) for QueryServer, then read actual bound port
from channel.localAddress() after startup — eliminates the TOCTOU race
where another process could grab the port between discovery and bind
- In testSkipUnavailableServer, ensure port2 != port1 so the two server
instances are distinct keys in the routing table (Map.of() throws
IllegalArgumentException on duplicate keys)
- Remove getAvailablePort() helper (no longer needed)
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
* Address review: simplify helpers, remove sleep, deterministic unavailable
port
- Remove _testPort field; pass port directly to initializeTestFixtures()
- Remove the catch in shutdownServer() — shutDown() already throws on
failure, so there is no need to silently swallow exceptions; a try/finally
is kept so the test fixtures are still cleared if shutDown() throws
- Remove Thread.sleep(100) in @AfterMethod — shutDown() blocks on
channel.close().sync() so the port is released when it returns
- Simplify startQueryServerWithWait() to startAndGetPort() — start()
blocks on bind().sync() so no polling loop is needed
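- Use a deterministic unavailable server in testSkipUnavailableServer
instead of ServerSocket(0), which removes the TOCTOU risk. An earlier
revision used port 1 (immediate "connection refused"); the final code in
the diff below instead uses the reserved hostname "unavailable.invalid",
which can never resolve, so connection setup fails at name resolution
without depending on any port being free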
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../pinot/core/transport/QueryRoutingTest.java | 204 ++++++++++++---------
1 file changed, 115 insertions(+), 89 deletions(-)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 57183839b12..7e9b952cf9d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.transport;
import com.google.common.util.concurrent.Futures;
-import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -53,22 +53,17 @@ import static org.testng.Assert.*;
public class QueryRoutingTest {
- private static final int TEST_PORT = 12345;
- private static final ServerInstance SERVER_INSTANCE = new
ServerInstance("localhost", TEST_PORT);
- private static final ServerRoutingInstance OFFLINE_SERVER_ROUTING_INSTANCE =
- SERVER_INSTANCE.toServerRoutingInstance(TableType.OFFLINE,
ServerInstance.RoutingType.NETTY);
- private static final ServerRoutingInstance REALTIME_SERVER_ROUTING_INSTANCE =
- SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME,
ServerInstance.RoutingType.NETTY);
private static final BrokerRequest BROKER_REQUEST =
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable");
- private static final Map<ServerInstance, SegmentsToQuery> ROUTING_TABLE =
- Collections.singletonMap(SERVER_INSTANCE,
- new SegmentsToQuery(Collections.emptyList(),
Collections.emptyList()));
private QueryRouter _queryRouter;
private ServerRoutingStatsManager _serverRoutingStatsManager;
int _requestCount;
private QueryServer _queryServer;
+ private ServerInstance _serverInstance;
+ private ServerRoutingInstance _offlineServerRoutingInstance;
+ private ServerRoutingInstance _realtimeServerRoutingInstance;
+ private Map<ServerInstance, SegmentsToQuery> _routingTable;
@BeforeClass
public void setUp() {
@@ -84,9 +79,13 @@ public class QueryRoutingTest {
@AfterMethod
void shutdownServer() {
- if (_queryServer != null) {
- _queryServer.shutDown();
- _queryServer = null;
+ try {
+ if (_queryServer != null && _queryServer.getChannel() != null) {
+ // shutDown() blocks on channel.close().sync(), so port is released
when this returns
+ _queryServer.shutDown();
+ }
+ } finally {
+ clearTestFixtures();
}
}
@@ -95,8 +94,26 @@ public class QueryRoutingTest {
ServerMetrics.deregister();
}
+ private void clearTestFixtures() {
+ _queryServer = null;
+ _serverInstance = null;
+ _offlineServerRoutingInstance = null;
+ _realtimeServerRoutingInstance = null;
+ _routingTable = null;
+ }
+
+ private void initializeTestFixtures(int port) {
+ _serverInstance = new ServerInstance("localhost", port);
+ _offlineServerRoutingInstance =
+ _serverInstance.toServerRoutingInstance(TableType.OFFLINE,
ServerInstance.RoutingType.NETTY);
+ _realtimeServerRoutingInstance =
+ _serverInstance.toServerRoutingInstance(TableType.REALTIME,
ServerInstance.RoutingType.NETTY);
+ _routingTable = Collections.singletonMap(_serverInstance,
+ new SegmentsToQuery(Collections.emptyList(), Collections.emptyList()));
+ }
+
private QueryServer getQueryServer(int responseDelayMs, byte[]
responseBytes) {
- return getQueryServer(responseDelayMs, responseBytes, TEST_PORT);
+ return getQueryServer(responseDelayMs, responseBytes, 0);
}
private QueryServer getQueryServer(int responseDelayMs, byte[]
responseBytes, int port) {
@@ -106,6 +123,16 @@ public class QueryRoutingTest {
return new QueryServer(port, null, handler);
}
+ /**
+ * Starts the query server and returns the actual bound port. Uses port 0 to
let the OS assign a free port,
+ * avoiding TOCTOU race conditions. {@link QueryServer#start()} blocks on
{@code bind().sync()}, so the
+ * server is ready to accept connections when this method returns.
+ */
+ private int startAndGetPort(QueryServer server) {
+ server.start();
+ return ((InetSocketAddress) server.getChannel().localAddress()).getPort();
+ }
+
private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[]
responseBytes) {
QueryScheduler queryScheduler = mock(QueryScheduler.class);
when(queryScheduler.submit(any())).thenAnswer(invocation -> {
@@ -122,19 +149,19 @@ public class QueryRoutingTest {
DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
- String serverId = SERVER_INSTANCE.getInstanceId();
- // Start the server
+ // Start the server on port 0 (OS-assigned) to avoid TOCTOU race, then
initialize test fixtures from actual port
_queryServer = getQueryServer(0, responseBytes);
- _queryServer.start();
+ initializeTestFixtures(startAndGetPort(_queryServer));
+ String serverId = _serverInstance.getInstanceId();
// OFFLINE only
AsyncQueryResponse asyncQueryResponse =
- _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 600_000L);
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
_routingTable, null, null, 600_000L);
Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
- ServerResponse serverResponse =
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
+ ServerResponse serverResponse =
response.get(_offlineServerRoutingInstance);
assertNotNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseSize(), responseBytes.length);
// 2 requests - query submit and query response.
@@ -144,11 +171,11 @@ public class QueryRoutingTest {
// REALTIME only
asyncQueryResponse =
- _queryRouter.submitQuery(requestId, "testTable", null, null,
BROKER_REQUEST, ROUTING_TABLE, 1_000L);
+ _queryRouter.submitQuery(requestId, "testTable", null, null,
BROKER_REQUEST, _routingTable, 1_000L);
response = asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
- serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_realtimeServerRoutingInstance));
+ serverResponse = response.get(_realtimeServerRoutingInstance);
assertNotNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseSize(), responseBytes.length);
_requestCount += 2;
@@ -157,16 +184,16 @@ public class QueryRoutingTest {
// Hybrid
asyncQueryResponse =
- _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, BROKER_REQUEST, ROUTING_TABLE,
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
_routingTable, BROKER_REQUEST, _routingTable,
1_000L);
response = asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 2);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
- serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
+ serverResponse = response.get(_offlineServerRoutingInstance);
assertNotNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseSize(), responseBytes.length);
- assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
- serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_realtimeServerRoutingInstance));
+ serverResponse = response.get(_realtimeServerRoutingInstance);
assertNotNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseSize(), responseBytes.length);
_requestCount += 4;
@@ -178,19 +205,19 @@ public class QueryRoutingTest {
public void testInvalidResponse()
throws Exception {
long requestId = 123;
- String serverId = SERVER_INSTANCE.getInstanceId();
- // Start the server
+ // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
_queryServer = getQueryServer(0, new byte[0]);
- _queryServer.start();
+ initializeTestFixtures(startAndGetPort(_queryServer));
+ String serverId = _serverInstance.getInstanceId();
long startTimeMs = System.currentTimeMillis();
AsyncQueryResponse asyncQueryResponse =
- _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
_routingTable, null, null, 1_000L);
Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
- ServerResponse serverResponse =
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
+ ServerResponse serverResponse =
response.get(_offlineServerRoutingInstance);
assertNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseDelayMs(), -1);
assertEquals(serverResponse.getResponseSize(), 0);
@@ -210,18 +237,18 @@ public class QueryRoutingTest {
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
dataTable.addException(QueryErrorCode.SERVER_TABLE_MISSING, "Test error
message");
byte[] responseBytes = dataTable.toBytes();
- String serverId = SERVER_INSTANCE.getInstanceId();
- // Start the server
+ // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
_queryServer = getQueryServer(0, responseBytes);
- _queryServer.start();
+ initializeTestFixtures(startAndGetPort(_queryServer));
+ String serverId = _serverInstance.getInstanceId();
// Send a query with ServerSide exception and check if the latency is set
to timeout value.
Double latencyBefore =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
AsyncQueryResponse asyncQueryResponse =
- _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
_routingTable, null, null, 1_000L);
Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
_requestCount += 2;
waitForStatsUpdate(_requestCount);
@@ -249,20 +276,20 @@ public class QueryRoutingTest {
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
dataTable.addException(QueryErrorCode.QUERY_CANCELLATION, "Test error
message");
byte[] responseBytes = dataTable.toBytes();
- String serverId = SERVER_INSTANCE.getInstanceId();
- // Start the server
+ // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
_queryServer = getQueryServer(0, responseBytes);
- _queryServer.start();
+ initializeTestFixtures(startAndGetPort(_queryServer));
+ String serverId = _serverInstance.getInstanceId();
// Send a query with client side errors.
Double latencyBefore =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
AsyncQueryResponse asyncQueryResponse =
- _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
_routingTable, null, null, 1_000L);
Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
- ServerResponse serverResponse =
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
+ ServerResponse serverResponse =
response.get(_offlineServerRoutingInstance);
_requestCount += 2;
waitForStatsUpdate(_requestCount);
@@ -288,19 +315,19 @@ public class QueryRoutingTest {
dataTable.addException(QueryErrorCode.QUERY_CANCELLATION, "Test
cancellation error message");
dataTable.addException(QueryErrorCode.SERVER_TABLE_MISSING, "Test table
missing error message");
byte[] responseBytes = dataTable.toBytes();
- String serverId = SERVER_INSTANCE.getInstanceId();
- // Start the server
+ // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
_queryServer = getQueryServer(0, responseBytes);
- _queryServer.start();
+ initializeTestFixtures(startAndGetPort(_queryServer));
+ String serverId = _serverInstance.getInstanceId();
// Send a query with multiple exceptions. Make sure that the latency is
set to timeout value even if a single
//server-side exception is seen.
Double latencyBefore =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
AsyncQueryResponse asyncQueryResponse =
- _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
_routingTable, null, null, 1_000L);
Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
_requestCount += 2;
waitForStatsUpdate(_requestCount);
@@ -326,19 +353,19 @@ public class QueryRoutingTest {
DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
- String serverId = SERVER_INSTANCE.getInstanceId();
- // Start the server
+ // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
_queryServer = getQueryServer(0, responseBytes);
- _queryServer.start();
+ initializeTestFixtures(startAndGetPort(_queryServer));
+ String serverId = _serverInstance.getInstanceId();
// Send a valid query and get latency
Double latencyBefore =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
AsyncQueryResponse asyncQueryResponse =
- _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
_routingTable, null, null, 1_000L);
Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
- ServerResponse serverResponse =
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
+ ServerResponse serverResponse =
response.get(_offlineServerRoutingInstance);
_requestCount += 2;
waitForStatsUpdate(_requestCount);
@@ -360,19 +387,19 @@ public class QueryRoutingTest {
DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
- String serverId = SERVER_INSTANCE.getInstanceId();
- // Start the server
+ // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
_queryServer = getQueryServer(0, responseBytes);
- _queryServer.start();
+ initializeTestFixtures(startAndGetPort(_queryServer));
+ String serverId = _serverInstance.getInstanceId();
long startTimeMs = System.currentTimeMillis();
AsyncQueryResponse asyncQueryResponse =
- _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST,
_routingTable, null, null, 1_000L);
Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
- ServerResponse serverResponse =
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
+ ServerResponse serverResponse =
response.get(_offlineServerRoutingInstance);
assertNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseDelayMs(), -1);
assertEquals(serverResponse.getResponseSize(), 0);
@@ -394,15 +421,15 @@ public class QueryRoutingTest {
DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
- String serverId = SERVER_INSTANCE.getInstanceId();
- // Start the server
+ // Start the server on port 0 (OS-assigned) to avoid TOCTOU race
_queryServer = getQueryServer(500, responseBytes);
- _queryServer.start();
+ initializeTestFixtures(startAndGetPort(_queryServer));
+ String serverId = _serverInstance.getInstanceId();
long startTimeMs = System.currentTimeMillis();
AsyncQueryResponse asyncQueryResponse =
- _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, timeoutMs);
+ _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST,
_routingTable, null, null, timeoutMs);
// Shut down the server before getting the response
_queryServer.shutDown();
@@ -413,8 +440,8 @@ public class QueryRoutingTest {
Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
- ServerResponse serverResponse =
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
+ ServerResponse serverResponse =
response.get(_offlineServerRoutingInstance);
assertNull(serverResponse.getDataTable());
assertEquals(serverResponse.getResponseDelayMs(), -1);
assertEquals(serverResponse.getResponseSize(), 0);
@@ -428,11 +455,11 @@ public class QueryRoutingTest {
// Submit query after server is down
startTimeMs = System.currentTimeMillis();
asyncQueryResponse =
- _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, timeoutMs);
+ _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST,
_routingTable, null, null, timeoutMs);
response = asyncQueryResponse.getFinalResponses();
assertEquals(response.size(), 1);
- assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
- serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+ assertTrue(response.containsKey(_offlineServerRoutingInstance));
+ serverResponse = response.get(_offlineServerRoutingInstance);
assertNull(serverResponse.getDataTable());
assertEquals(serverResponse.getSubmitDelayMs(), -1);
assertEquals(serverResponse.getResponseDelayMs(), -1);
@@ -445,26 +472,13 @@ public class QueryRoutingTest {
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
0);
} finally {
// To be sure we don't close it again on the @AfterMethod method
- _queryServer = null;
+ clearTestFixtures();
}
}
@Test
public void testSkipUnavailableServer()
- throws IOException, InterruptedException {
- // Using a different port is a hack to avoid resource conflict with other
tests, ideally _queryServer.shutdown()
- // should ensure there is no possibility of resource conflict.
- int port = 12346;
- ServerInstance serverInstance1 = new ServerInstance("localhost", port);
- ServerInstance serverInstance2 = new ServerInstance("localhost", port + 1);
- ServerRoutingInstance serverRoutingInstance1 =
- serverInstance1.toServerRoutingInstance(TableType.OFFLINE,
ServerInstance.RoutingType.NETTY);
- ServerRoutingInstance serverRoutingInstance2 =
- serverInstance2.toServerRoutingInstance(TableType.OFFLINE,
ServerInstance.RoutingType.NETTY);
- Map<ServerInstance, SegmentsToQuery> routingTable =
- Map.of(serverInstance1, new SegmentsToQuery(Collections.emptyList(),
Collections.emptyList()),
- serverInstance2, new SegmentsToQuery(Collections.emptyList(),
Collections.emptyList()));
-
+ throws Exception {
long requestId = 123;
DataSchema dataSchema =
new DataSchema(new String[]{"column1"}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
@@ -477,9 +491,21 @@ public class QueryRoutingTest {
dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] successResponseBytes = dataTableSuccess.toBytes();
- // Only start a single QueryServer, on port from serverInstance1
- _queryServer = getQueryServer(500, successResponseBytes, port);
- _queryServer.start();
+ // Start server1 on port 0 (OS-assigned) to avoid TOCTOU race
+ _queryServer = getQueryServer(500, successResponseBytes, 0);
+ int port1 = startAndGetPort(_queryServer);
+
+ ServerInstance serverInstance1 = new ServerInstance("localhost", port1);
+ // For server2 (unavailable server), use a reserved .invalid hostname so
connection setup fails
+ // deterministically without depending on a transiently free port.
+ ServerInstance serverInstance2 = new ServerInstance("unavailable.invalid",
port1);
+ ServerRoutingInstance serverRoutingInstance1 =
+ serverInstance1.toServerRoutingInstance(TableType.OFFLINE,
ServerInstance.RoutingType.NETTY);
+ ServerRoutingInstance serverRoutingInstance2 =
+ serverInstance2.toServerRoutingInstance(TableType.OFFLINE,
ServerInstance.RoutingType.NETTY);
+ Map<ServerInstance, SegmentsToQuery> routingTable =
+ Map.of(serverInstance1, new SegmentsToQuery(Collections.emptyList(),
Collections.emptyList()),
+ serverInstance2, new SegmentsToQuery(Collections.emptyList(),
Collections.emptyList()));
// Submit the query with skipUnavailableServers=true, the single started
server should return a valid response
BrokerRequest brokerRequest =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]