This is an automated email from the ASF dual-hosted git repository.
vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3452ef9fa9 Improve Adaptive Server Selection to penalize servers returning server side exceptions (#14029)
3452ef9fa9 is described below
commit 3452ef9fa9035134ad90d9da8a4f72acd5081c7f
Author: Kirupha Balasubramanian <[email protected]>
AuthorDate: Mon Sep 30 11:03:21 2024 -0700
Improve Adaptive Server Selection to penalize servers returning server side exceptions (#14029)
* PINOT-19249 ADSS penalize server with hardware issues WIP push
PINOT-19249 Adding more unit test cases
PINOT-19249 Addressing comments and changing code to cover edge cases
PINOT-19249 Code changes based on comments
PINOT-19249 Fixing test cases for ADSS penalizing servers with exceptions
* Fixing linter
* Empty-Commit
* Review comments - Adding more comments and delta to assertEquals
---
.../pinot/core/transport/AsyncQueryResponse.java | 37 ++++-
.../pinot/core/transport/QueryRoutingTest.java | 176 +++++++++++++++++++++
2 files changed, 208 insertions(+), 5 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index 2ec90ab3b9..a9546cc054 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.HashUtil;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
@@ -96,11 +97,14 @@ public class AsyncQueryResponse implements QueryResponse {
// servers even if the query times out or if servers have not responded.
for (Map.Entry<ServerRoutingInstance, ServerResponse> entry :
_responseMap.entrySet()) {
ServerResponse response = entry.getValue();
-
- // ServerResponse returns -1 if responseDelayMs is not set. This
indicates that a response was not received
- // from the server. Hence we set the latency to the timeout value.
- long latency =
- (response != null && response.getResponseDelayMs() >= 0) ?
response.getResponseDelayMs() : _timeoutMs;
+ long latency;
+
+ // If server has not responded or if the server response has
exceptions, the latency is set to timeout
+ if (hasServerNotResponded(response) ||
hasServerReturnedExceptions(response)) {
+ latency = _timeoutMs;
+ } else {
+ latency = response.getResponseDelayMs();
+ }
_serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId,
entry.getKey().getInstanceId(), latency);
}
@@ -108,6 +112,29 @@ public class AsyncQueryResponse implements QueryResponse {
}
}
+ private boolean hasServerReturnedExceptions(ServerResponse response) {
+ if (response.getDataTable() != null &&
response.getDataTable().getExceptions().size() > 0) {
+ DataTable dataTable = response.getDataTable();
+ Map<Integer, String> exceptions = dataTable.getExceptions();
+
+ // If Server response has exceptions in Datatable set the latency for
timeout value.
+ for (Map.Entry<Integer, String> exception : exceptions.entrySet()) {
+ // Check if the exceptions received are server side exceptions
+ if (!QueryException.isClientError(exception.getKey())) {
+ return true;
+ }
+ }
+ return false;
+ }
+ return false;
+ }
+
+ private boolean hasServerNotResponded(ServerResponse response) {
+ // ServerResponse returns -1 if responseDelayMs is not set. This indicates
that a response was not received
+ // from the server. Hence we set the latency to the timeout value.
+ return response == null || response.getResponseDelayMs() < 0;
+ }
+
@Override
public String getServerStats() {
StringBuilder stringBuilder = new StringBuilder(
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index cec413e424..1b32149d06 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -27,9 +27,11 @@ import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
@@ -201,6 +203,180 @@ public class QueryRoutingTest {
queryServer.shutDown();
}
+ @Test
+ public void testLatencyForQueryServerException()
+ throws Exception {
+ long requestId = 123;
+ DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+ dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
+ Exception exception = new UnsupportedOperationException("Caught
exception.");
+ ProcessingException processingException =
+ QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
exception);
+ dataTable.addException(processingException);
+ byte[] responseBytes = dataTable.toBytes();
+ String serverId = SERVER_INSTANCE.getInstanceId();
+ // Start the server
+ QueryServer queryServer = getQueryServer(0, responseBytes);
+ queryServer.start();
+
+ // Send a query with ServerSide exception and check if the latency is set
to timeout value.
+ Double latencyBefore =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+ AsyncQueryResponse asyncQueryResponse =
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+ Double latencyAfter =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+ if (latencyBefore == null) {
+ // This means that no queries were run before this test. So we can just
make sure that latencyAfter is equal to
+ //666.334.
+ // This corresponds to the EWMA value when a latency timeout value of
1000 is set. Latency set to timeout value
+ //when server side exception occurs.
+ double serverEWMALatency = 666.334;
+ // Leaving an error budget of 2%
+ double delta = 13.32;
+ assertEquals(latencyAfter, serverEWMALatency, delta);
+ } else {
+ assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be
greater than " + latencyBefore);
+ }
+
+ // Shut down the server
+ queryServer.shutDown();
+ }
+
+ @Test
+ public void testLatencyForClientException()
+ throws Exception {
+ long requestId = 123;
+ DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+ dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
+ Exception exception = new UnsupportedOperationException("Caught
exception.");
+ ProcessingException processingException =
+ QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
exception);
+ dataTable.addException(processingException);
+ byte[] responseBytes = dataTable.toBytes();
+ String serverId = SERVER_INSTANCE.getInstanceId();
+ // Start the server
+ QueryServer queryServer = getQueryServer(0, responseBytes);
+ queryServer.start();
+
+ // Send a query with client side errors.
+ Double latencyBefore =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+ AsyncQueryResponse asyncQueryResponse =
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ ServerResponse serverResponse =
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+
+ Double latencyAfter =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+ if (latencyBefore == null) {
+ // Latency for the server with client side exception is assigned as
serverResponse.getResponseDelayMs() and the
+ //calculated
+ // EWMLatency for the server will be less than
serverResponse.getResponseDelayMs()
+ assertTrue(latencyAfter <= serverResponse.getResponseDelayMs());
+ } else {
+ assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be
lesser than " + latencyBefore);
+ }
+
+ // Shut down the server
+ queryServer.shutDown();
+ }
+
+ @Test
+ public void testLatencyForMultipleExceptions()
+ throws Exception {
+ long requestId = 123;
+ DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+ dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
+ Exception exception = new UnsupportedOperationException("Caught
exception.");
+ ProcessingException processingException =
+ QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
exception);
+ ProcessingException processingServerException =
+ QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
exception);
+ dataTable.addException(processingServerException);
+ dataTable.addException(processingException);
+ byte[] responseBytes = dataTable.toBytes();
+ String serverId = SERVER_INSTANCE.getInstanceId();
+ // Start the server
+ QueryServer queryServer = getQueryServer(0, responseBytes);
+ queryServer.start();
+
+ // Send a query with multiple exceptions. Make sure that the latency is
set to timeout value even if a single
+ //server-side exception is seen.
+ Double latencyBefore =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+ AsyncQueryResponse asyncQueryResponse =
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+ Double latencyAfter =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+ if (latencyBefore == null) {
+ // This means that no queries where run before this test. So we can just
make sure that latencyAfter is equal
+ //to 666.334.
+ // This corresponds to the EWMA value when a latency timeout value of
1000 is set.
+ double serverEWMALatency = 666.334;
+ // Leaving an error budget of 2%
+ double delta = 13.32;
+ assertEquals(latencyAfter, serverEWMALatency, delta);
+ } else {
+ assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be
greater than " + latencyBefore);
+ }
+
+ // Shut down the server
+ queryServer.shutDown();
+ }
+
+ @Test
+ public void testLatencyForNoException()
+ throws Exception {
+ long requestId = 123;
+ DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+ dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
+ byte[] responseBytes = dataTable.toBytes();
+ String serverId = SERVER_INSTANCE.getInstanceId();
+ // Start the server
+ QueryServer queryServer = getQueryServer(0, responseBytes);
+ queryServer.start();
+
+ // Send a valid query and get latency
+ Double latencyBefore =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+ AsyncQueryResponse asyncQueryResponse =
+ _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST,
ROUTING_TABLE, null, null, 1_000L);
+ Map<ServerRoutingInstance, ServerResponse> response =
asyncQueryResponse.getFinalResponses();
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ ServerResponse serverResponse =
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
+
+ _requestCount += 2;
+ waitForStatsUpdate(_requestCount);
+ Double latencyAfter =
_serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
+
+ if (latencyBefore == null) {
+ // Latency for the server with no exceptions is assigned as
serverResponse.getResponseDelayMs() and the calculated
+ // EWMLatency for the server will be less than
serverResponse.getResponseDelayMs()
+ assertTrue(latencyAfter <= serverResponse.getResponseDelayMs());
+ } else {
+ assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be
lesser than " + latencyBefore);
+ }
+
+ // Shut down the server
+ queryServer.shutDown();
+ }
+
@Test
public void testNonMatchingRequestId()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org