This is an automated email from the ASF dual-hosted git repository.
jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3bb525bc5ce fix: emit query/node/{bytes/time} metrics even on query
failure to data node (#19453)
3bb525bc5ce is described below
commit 3bb525bc5cef210d1c96fbc1762379034452cf4c
Author: jtuglu1 <[email protected]>
AuthorDate: Tue May 12 00:03:59 2026 -0700
fix: emit query/node/{bytes/time} metrics even on query failure to data
node (#19453)
Failed requests to data nodes (e.g. timeouts) do not emit
query/node/bytes, query/node/time, or any backpressure metrics, all of which are
useful for debugging query bottlenecks. Make sure these metrics are emitted
irrespective of whether the query was successful or not.
---
.../org/apache/druid/client/DirectDruidClient.java | 27 ++++++----
.../apache/druid/client/DirectDruidClientTest.java | 63 +++++++++++++++++++++-
2 files changed, 80 insertions(+), 10 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index ec4ab6f289a..3c746e41070 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -181,6 +181,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private final AtomicLong channelSuspendedTime = new AtomicLong(0);
private final BlockingQueue<InputStreamHolder> queue = new
LinkedBlockingQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
+ private final AtomicBoolean nodeMetricsEmitted = new
AtomicBoolean(false);
private final AtomicReference<String> fail = new AtomicReference<>();
private final AtomicReference<TrafficCop> trafficCopRef = new
AtomicReference<>();
@@ -359,15 +360,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
// Floating math; division by zero will yield Inf, not exception
totalByteCount.get() / (0.001 * nodeTimeMs)
);
- QueryMetrics<? super Query<T>> responseMetrics =
acquireResponseMetrics();
- responseMetrics.reportNodeTime(nodeTimeNs);
- responseMetrics.reportNodeBytes(totalByteCount.get());
-
- if (usingBackpressure) {
- responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
- }
-
- responseMetrics.emit(emitter);
+ emitNodeMetrics(nodeTimeNs);
synchronized (done) {
try {
// An empty byte array is put at the end to give the
SequenceInputStream.close() as something to close out
@@ -400,6 +393,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private void setupResponseReadFailure(String msg, Throwable th)
{
+ emitNodeMetrics(System.nanoTime() - requestStartTimeNs);
fail.set(msg);
queue.clear();
queue.offer(
@@ -422,6 +416,21 @@ public class DirectDruidClient<T> implements QueryRunner<T>
);
}
+ // Emit exactly once, regardless of whether we reach this via done()
or setupResponseReadFailure().
+ private void emitNodeMetrics(long nodeTimeNs)
+ {
+ if (!nodeMetricsEmitted.compareAndSet(false, true)) {
+ return;
+ }
+ QueryMetrics<? super Query<T>> responseMetrics =
acquireResponseMetrics();
+ responseMetrics.reportNodeTime(nodeTimeNs);
+ responseMetrics.reportNodeBytes(totalByteCount.get());
+ if (usingBackpressure) {
+ responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
+ }
+ responseMetrics.emit(emitter);
+ }
+
// Returns remaining timeout or throws exception if timeout already
elapsed.
private long checkQueryTimeout()
{
diff --git
a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
index 5452e7636bd..fa835b009fd 100644
--- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
@@ -29,8 +29,10 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.Druids;
import org.apache.druid.query.NestedDataTestUtils;
import org.apache.druid.query.QueryContexts;
@@ -322,6 +324,60 @@ public class DirectDruidClientTest
Assert.assertEquals(0, client.getNumOpenConnections());
}
+ @Test
+ public void testNodeMetricsEmittedOnSuccess()
+ {
+ StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted();
+ DirectDruidClient client =
makeDirectDruidClient(initHttpClientWithSuccessfulQuery(), stubEmitter);
+
+ client.run(getQueryPlus(), responseContext).toList();
+
+ Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time"));
+ Assert.assertEquals(1,
stubEmitter.getMetricEventCount("query/node/bytes"));
+ }
+
+ @Test
+ public void testNodeMetricsEmittedOnError()
+ {
+ // Only setupResponseReadFailure fires (checkQueryTimeout during
handleResponse) — done() is never called.
+ StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted();
+ final TestHttpClient testHttpClient = new TestHttpClient(objectMapper,
110);
+ DirectDruidClient client =
makeDirectDruidClient(initHttpClientFromExistingClient(testHttpClient, false),
stubEmitter);
+
+ final QueryPlus queryPlus = getQueryPlus(Map.of(
+ DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 50
+ ));
+
+ Assert.assertThrows(QueryTimeoutException.class, () ->
client.run(queryPlus, responseContext));
+
+ Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time"));
+ Assert.assertEquals(1,
stubEmitter.getMetricEventCount("query/node/bytes"));
+ }
+
+ @Test
+ public void testNodeMetricsEmittedExactlyOnceWhenDoneAndTimeoutBothFire()
throws InterruptedException
+ {
+ // done() fires synchronously during run(), then results.toList() calls
checkQueryTimeout() after the
+ // timeout has already expired, triggering setupResponseReadFailure(). The
compareAndSet guard must
+ // prevent the second emitNodeMetrics() call from emitting.
+ StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted();
+ DirectDruidClient client =
makeDirectDruidClient(initHttpClientWithSuccessfulQuery(), stubEmitter);
+
+ // Timeout far enough in the future that handleResponse + done() complete
during run(), but we sleep
+ // past it before consuming the sequence so that checkQueryTimeout() fires
during toList().
+ final QueryPlus queryPlus = getQueryPlus(Map.of(
+ DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500
+ ));
+
+ Sequence results = client.run(queryPlus, responseContext);
+ Thread.sleep(600);
+
+ Assert.assertThrows(QueryTimeoutException.class, results::toList);
+
+ Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time"));
+ Assert.assertEquals(1,
stubEmitter.getMetricEventCount("query/node/bytes"));
+ }
+
@Test
public void testResourceLimitExceededException()
{
@@ -346,6 +402,11 @@ public class DirectDruidClientTest
}
private DirectDruidClient makeDirectDruidClient(HttpClient httpClient)
+ {
+ return makeDirectDruidClient(httpClient, new NoopServiceEmitter());
+ }
+
+ private DirectDruidClient makeDirectDruidClient(HttpClient httpClient,
ServiceEmitter emitter)
{
return new DirectDruidClient(
conglomerateRule.getConglomerate(),
@@ -354,7 +415,7 @@ public class DirectDruidClientTest
httpClient,
"http",
hostName,
- new NoopServiceEmitter(),
+ emitter,
queryCancellationExecutor
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]