This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1f981bb6e0b Fix extra router statusCode metric edge cases (#18699)
1f981bb6e0b is described below
commit 1f981bb6e0b0148579662ca08c7102ef6fd646a5
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Oct 28 11:13:00 2025 -0700
Fix extra router statusCode metric edge cases (#18699)
The `statusCode` dimension on the `query/time` metric is misreported if
connection exceptions occur while the router and broker are communicating (e.g.
there is no HTTP status code yet – Jetty defaults this to 0) or when a client
manually closes the connection while a downstream query is still in-flight. We
cannot perfectly map every error in this case (nor do I think we should), but
it's better than reporting incorrect codes. Since we need to report a metric
event anyway, maintaining a f [...]
---
.../druid/server/AsyncQueryForwardingServlet.java | 33 +++++-
.../server/AsyncQueryForwardingServletTest.java | 127 +++++++++++++++++++++
2 files changed, 156 insertions(+), 4 deletions(-)
diff --git
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index 3eedf19a4b2..85abd832fef 100644
---
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -107,6 +107,8 @@ public class AsyncQueryForwardingServlet extends
AsyncProxyServlet implements Qu
private static final String PROPERTY_SQL_ENABLE_DEFAULT = "false";
private static final long CANCELLATION_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(5);
+ // Jetty-specific default (un-assigned) status code
+ private static final int UNASSIGNED_DEFAULT_STATUS_CODE = 0;
private final AtomicLong successfulQueryCount = new AtomicLong();
private final AtomicLong failedQueryCount = new AtomicLong();
@@ -760,8 +762,8 @@ public class AsyncQueryForwardingServlet extends
AsyncProxyServlet implements Qu
return;
}
- final int statusCode = result.getResponse().getStatus();
- boolean success = result.isSucceeded() && statusCode ==
Status.OK.getStatusCode();
+ final boolean success = result.isSucceeded() &&
result.getResponse().getStatus() == Status.OK.getStatusCode();
+ final int statusCode = determineStatusCode(success,
result.getResponse().getStatus());
if (success) {
successfulQueryCount.incrementAndGet();
} else {
@@ -770,6 +772,7 @@ public class AsyncQueryForwardingServlet extends
AsyncProxyServlet implements Qu
// As router is simply a proxy, we don't make an effort to construct the
error code from the exception ourselves.
// We rely on broker to set this for us if the error occurs downstream.
+ // Otherwise, if there's a router/client error, we log this as an
unknown error.
emitQueryTime(requestTimeNs, success, sqlQueryId, queryId, statusCode);
AuthenticationResult authenticationResult =
AuthorizationUtils.authenticationResultFromRequest(req);
@@ -857,8 +860,10 @@ public class AsyncQueryForwardingServlet extends
AsyncProxyServlet implements Qu
failedQueryCount.incrementAndGet();
// As router is simply a proxy, we don't make an effort to construct the
error code from the exception ourselves.
- // We rely on broker to set this for us if the error occurs downstream.
- emitQueryTime(requestTimeNs, false, sqlQueryId, queryId,
response.getStatus());
+ // We rely on broker to set this for us if the error occurs downstream.
+ // Otherwise, if there's a router/client error, we log this as an
unknown error.
+ final int statusCode = determineStatusCode(false, response.getStatus());
+ emitQueryTime(requestTimeNs, false, sqlQueryId, queryId, statusCode);
AuthenticationResult authenticationResult =
AuthorizationUtils.authenticationResultFromRequest(req);
//noinspection VariableNotUsedInsideIf
@@ -960,4 +965,24 @@ public class AsyncQueryForwardingServlet extends
AsyncProxyServlet implements Qu
queryMetrics.reportQueryTime(requestTimeNs).emit(emitter);
}
}
+
+ /**
+ * Helper method to assign reasonable status codes in ambiguous cases like
client/broker connection errors.
+ *
+ * @param success Whether the query was successful
+ * @param statusCode Status code reported by the broker (or {@value
UNASSIGNED_DEFAULT_STATUS_CODE})
+ */
+ private static int determineStatusCode(boolean success, int statusCode)
+ {
+ if (success) {
+ if (statusCode == UNASSIGNED_DEFAULT_STATUS_CODE) {
+ statusCode = Status.OK.getStatusCode();
+ }
+ } else {
+ if (statusCode == UNASSIGNED_DEFAULT_STATUS_CODE || statusCode ==
Status.OK.getStatusCode()) {
+ statusCode = Status.INTERNAL_SERVER_ERROR.getStatusCode();
+ }
+ }
+ return statusCode;
+ }
}
diff --git
a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index bd5487c6474..372ad96174c 100644
---
a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -92,6 +92,7 @@ import org.eclipse.jetty.ee8.servlet.ServletHolder;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.junit.Assert;
@@ -530,6 +531,132 @@ public class AsyncQueryForwardingServletTest extends
BaseJettyTest
Assert.assertEquals("false",
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get("success"));
}
+ @Test
+ public void testOnCompleteWithClosedException()
+ {
+ final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource("foo")
+ .intervals("2000/P1D")
+ .granularity(Granularities.ALL)
+ .context(ImmutableMap.of("queryId",
"closed-test"))
+ .build();
+
+ final HttpServletRequest requestMock =
Mockito.mock(HttpServletRequest.class);
+
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.avaticaQuery")).thenReturn(null);
+
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.query")).thenReturn(query);
+
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.sqlQuery")).thenReturn(null);
+ Mockito.when(requestMock.getRemoteAddr()).thenReturn("127.0.0.1");
+
Mockito.when(requestMock.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+ .thenReturn(new AuthenticationResult("testUser", "basic", "basic",
null));
+
+ final Request proxyRequestMock = Mockito.mock(Request.class);
+ final Response responseMock = Mockito.mock(Response.class);
+ Mockito.when(responseMock.getStatus()).thenReturn(200); // Status OK
+ Mockito.when(responseMock.getHeaders()).thenReturn(HttpFields.build());
+ Mockito.when(responseMock.getRequest()).thenReturn(proxyRequestMock);
+
+ // Result where connection is closed prematurely
+ final Result result = new Result(proxyRequestMock, responseMock)
+ {
+ @Override
+ public boolean isSucceeded()
+ {
+ return false;
+ }
+
+ @Override
+ public Throwable getFailure()
+ {
+ return new EofException("Stream closed");
+ }
+ };
+
+ final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("",
"");
+ final AsyncQueryForwardingServlet servlet = new
AsyncQueryForwardingServlet(
+ new MapQueryToolChestWarehouse(ImmutableMap.of()),
+ TestHelper.makeJsonMapper(),
+ TestHelper.makeSmileMapper(),
+ null,
+ null,
+ null,
+ stubServiceEmitter,
+ NoopRequestLogger.instance(),
+ new DefaultGenericQueryMetricsFactory(),
+ new AuthenticatorMapper(ImmutableMap.of()),
+ new Properties(),
+ new ServerConfig()
+ );
+
+ try {
+ servlet.newProxyResponseListener(requestMock, null).onComplete(result);
+ }
+ catch (NullPointerException ignored) {
+ }
+
+ stubServiceEmitter.verifyEmitted("query/time", 1);
+ Assert.assertEquals("closed-test",
stubServiceEmitter.getEvents().get(0).toMap().get("id"));
+ Assert.assertEquals(
+ 500,
+
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get(DruidMetrics.STATUS_CODE)
+ );
+ Assert.assertEquals("false",
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get("success"));
+ }
+
+ @Test
+ public void testOnFailureWithExceptionAndUnassignedStatusCode()
+ {
+ final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource("foo")
+ .intervals("2000/P1D")
+ .granularity(Granularities.ALL)
+ .context(ImmutableMap.of("queryId",
"zero-status-test"))
+ .build();
+
+ final HttpServletRequest requestMock =
Mockito.mock(HttpServletRequest.class);
+
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.avaticaQuery")).thenReturn(null);
+
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.query")).thenReturn(query);
+
Mockito.when(requestMock.getAttribute("org.apache.druid.proxy.sqlQuery")).thenReturn(null);
+ Mockito.when(requestMock.getRemoteAddr()).thenReturn("127.0.0.1");
+
Mockito.when(requestMock.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+ .thenReturn(new AuthenticationResult("testUser", "basic", "basic",
null));
+
+ final Response responseMock = Mockito.mock(Response.class);
+ Mockito.when(responseMock.getStatus()).thenReturn(0); // Test unassigned
http status code case from server
+ Mockito.when(responseMock.getHeaders()).thenReturn(HttpFields.build());
+
+ final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("",
"");
+ final AsyncQueryForwardingServlet servlet = new
AsyncQueryForwardingServlet(
+ new MapQueryToolChestWarehouse(ImmutableMap.of()),
+ TestHelper.makeJsonMapper(),
+ TestHelper.makeSmileMapper(),
+ null,
+ null,
+ null,
+ stubServiceEmitter,
+ NoopRequestLogger.instance(),
+ new DefaultGenericQueryMetricsFactory(),
+ new AuthenticatorMapper(ImmutableMap.of()),
+ new Properties(),
+ new ServerConfig()
+ );
+
+ try {
+ servlet.newProxyResponseListener(requestMock, null)
+ .onFailure(responseMock, new IOException("Connection reset by
peer"));
+ }
+ catch (NullPointerException ignored) {
+ }
+
+ stubServiceEmitter.verifyEmitted("query/time", 1);
+ Assert.assertEquals("zero-status-test",
stubServiceEmitter.getEvents().get(0).toMap().get("id"));
+ Assert.assertEquals(
+ 500, // Should default to 500 when status is 0
+
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get(DruidMetrics.STATUS_CODE)
+ );
+ Assert.assertEquals("false",
stubServiceEmitter.getMetricEvents("query/time").get(0).toMap().get("success"));
+ }
+
+
@Test
public void testNoParseExceptionOnGroupByWithFilteredAggregationOnLookups()
throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]