This is an automated email from the ASF dual-hosted git repository.
vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 61aa6ce468 Reposition query submission spot for adaptive server
selection (#13327)
61aa6ce468 is described below
commit 61aa6ce468888c1c472a3818c8116cabe7566656
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Sat Jun 8 08:55:28 2024 -0700
Reposition query submission spot for adaptive server selection (#13327)
* Refactor ADSS query submission stats to avoid missing servers
* Address review comments.
---
.../AdaptiveServerSelectorTest.java | 20 ++++++++++----------
.../pinot/core/transport/AsyncQueryResponse.java | 6 +++++-
.../org/apache/pinot/core/transport/QueryRouter.java | 4 ----
.../routing/stats/ServerRoutingStatsManager.java | 4 ++--
.../routing/stats/ServerRoutingStatsManagerTest.java | 6 +++---
5 files changed, 20 insertions(+), 20 deletions(-)
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
index 926de0699d..427065b76e 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
@@ -153,7 +153,7 @@ public class AdaptiveServerSelectorTest {
}
for (int ii = 0; ii < 10; ii++) {
for (String server : _servers) {
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server);
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server);
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
}
}
@@ -187,7 +187,7 @@ public class AdaptiveServerSelectorTest {
for (int ii = 0; ii < _servers.size(); ii++) {
for (int jj = 0; jj < ii; jj++) {
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1,
_servers.get(ii));
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1,
_servers.get(ii));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
}
}
@@ -232,15 +232,15 @@ public class AdaptiveServerSelectorTest {
numInflightReqMap.put("server2", 11);
numInflightReqMap.put("server3", 15);
numInflightReqMap.put("server4", 13);
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1,
_servers.get(0));
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1,
_servers.get(0));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1,
_servers.get(0));
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1,
_servers.get(0));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1,
_servers.get(2));
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1,
_servers.get(2));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1,
_servers.get(2));
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1,
_servers.get(2));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1,
_servers.get(2));
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1,
_servers.get(2));
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
serverRankingWithVal = selector.fetchAllServerRankingsWithScores();
@@ -290,7 +290,7 @@ public class AdaptiveServerSelectorTest {
// Route the request to the best server.
selectedServer = serverRankingWithVal.get(0).getLeft();
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1,
selectedServer);
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1,
selectedServer);
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
int numReq = numInflightReqMap.get(selectedServer) + 1;
numInflightReqMap.put(selectedServer, numReq);
@@ -484,7 +484,7 @@ public class AdaptiveServerSelectorTest {
// TEST 2: Populate all servers with equal numInFlightRequests and
latencies.
for (int ii = 0; ii < 10; ii++) {
for (String server : _servers) {
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server);
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server);
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
}
}
@@ -571,7 +571,7 @@ public class AdaptiveServerSelectorTest {
// Route the request to the best server.
selectedServer = serverRankingWithVal.get(0).getLeft();
- serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1,
selectedServer);
+ serverRoutingStatsManager.recordStatsForQuerySubmission(-1,
selectedServer);
waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
if (rand.nextBoolean()) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index f03509fb54..7bcc90b50d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -56,13 +56,17 @@ public class AsyncQueryResponse implements QueryResponse {
_requestId = requestId;
int numServersQueried = serversQueried.size();
_responseMap = new
ConcurrentHashMap<>(HashUtil.getHashMapCapacity(numServersQueried));
+ _serverRoutingStatsManager = serverRoutingStatsManager;
for (ServerRoutingInstance serverRoutingInstance : serversQueried) {
+ // Record stats related to query submission just before sending the
request. Otherwise, if the response is
+ // received immediately, there's a possibility of updating query
response stats before updating query
+ // submission stats.
+ _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId,
serverRoutingInstance.getInstanceId());
_responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs));
}
_countDownLatch = new CountDownLatch(numServersQueried);
_timeoutMs = timeoutMs;
_maxEndTimeMs = startTimeMs + timeoutMs;
- _serverRoutingStatsManager = serverRoutingStatsManager;
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 086211ad5f..e5b96840e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -126,10 +126,6 @@ public class QueryRouter {
ServerRoutingInstance serverRoutingInstance = entry.getKey();
ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ?
_serverChannelsTls : _serverChannels;
try {
- // Record stats related to query submission just before sending the
request. Otherwise, if the response is
- // received immediately, there's a possibility of updating query
response stats before updating query
- // submission stats.
- _serverRoutingStatsManager.recordStatsAfterQuerySubmission(requestId,
serverRoutingInstance.getInstanceId());
serverChannels.sendRequest(rawTableName, asyncQueryResponse,
serverRoutingInstance, entry.getValue(),
timeoutMs);
asyncQueryResponse.markRequestSubmitted(serverRoutingInstance);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
index 1057fc084f..a21906d23b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
@@ -135,9 +135,9 @@ public class ServerRoutingStatsManager {
}
/**
- * Called when a query is submitted to a server. Updates stats corresponding
to query submission.
+ * Called just before submitting a query to a server. Updates stats
corresponding to query submission.
*/
- public void recordStatsAfterQuerySubmission(long requestId, String
serverInstanceId) {
+ public void recordStatsForQuerySubmission(long requestId, String
serverInstanceId) {
if (!_isEnabled) {
return;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
index 84983f87ec..1ef3022a09 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
@@ -131,7 +131,7 @@ public class ServerRoutingStatsManagerTest {
int requestId = 0;
// Submit stats for server1.
- manager.recordStatsAfterQuerySubmission(requestId++, "server1");
+ manager.recordStatsForQuerySubmission(requestId++, "server1");
waitForStatsUpdate(manager, requestId);
List<Pair<String, Integer>> numInFlightReqList =
manager.fetchNumInFlightRequestsForAllServers();
@@ -156,7 +156,7 @@ public class ServerRoutingStatsManagerTest {
assertEquals(score, 0.0);
// Submit more stats for server 1.
- manager.recordStatsAfterQuerySubmission(requestId++, "server1");
+ manager.recordStatsForQuerySubmission(requestId++, "server1");
waitForStatsUpdate(manager, requestId);
numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers();
@@ -181,7 +181,7 @@ public class ServerRoutingStatsManagerTest {
assertEquals(score, 0.0);
// Add a new server server2.
- manager.recordStatsAfterQuerySubmission(requestId++, "server2");
+ manager.recordStatsForQuerySubmission(requestId++, "server2");
waitForStatsUpdate(manager, requestId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]