This is an automated email from the ASF dual-hosted git repository.

jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bb525bc5ce fix: emit query/node/{bytes/time} metrics even on query 
failure to data node (#19453)
3bb525bc5ce is described below

commit 3bb525bc5cef210d1c96fbc1762379034452cf4c
Author: jtuglu1 <[email protected]>
AuthorDate: Tue May 12 00:03:59 2026 -0700

    fix: emit query/node/{bytes/time} metrics even on query failure to data 
node (#19453)
    
    Failed requests to data nodes (e.g. timeouts) did not emit 
query/node/bytes, query/node/time, or any backpressure metrics, all of which 
are useful for debugging query bottlenecks. Emit these metrics regardless of 
whether the query succeeded.
---
 .../org/apache/druid/client/DirectDruidClient.java | 27 ++++++----
 .../apache/druid/client/DirectDruidClientTest.java | 63 +++++++++++++++++++++-
 2 files changed, 80 insertions(+), 10 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java 
b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index ec4ab6f289a..3c746e41070 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -181,6 +181,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
         private final AtomicLong channelSuspendedTime = new AtomicLong(0);
         private final BlockingQueue<InputStreamHolder> queue = new 
LinkedBlockingQueue<>();
         private final AtomicBoolean done = new AtomicBoolean(false);
+        private final AtomicBoolean nodeMetricsEmitted = new 
AtomicBoolean(false);
         private final AtomicReference<String> fail = new AtomicReference<>();
         private final AtomicReference<TrafficCop> trafficCopRef = new 
AtomicReference<>();
 
@@ -359,15 +360,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
               // Floating math; division by zero will yield Inf, not exception
               totalByteCount.get() / (0.001 * nodeTimeMs)
           );
-          QueryMetrics<? super Query<T>> responseMetrics = 
acquireResponseMetrics();
-          responseMetrics.reportNodeTime(nodeTimeNs);
-          responseMetrics.reportNodeBytes(totalByteCount.get());
-
-          if (usingBackpressure) {
-            responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
-          }
-
-          responseMetrics.emit(emitter);
+          emitNodeMetrics(nodeTimeNs);
           synchronized (done) {
             try {
               // An empty byte array is put at the end to give the 
SequenceInputStream.close() as something to close out
@@ -400,6 +393,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
 
         private void setupResponseReadFailure(String msg, Throwable th)
         {
+          emitNodeMetrics(System.nanoTime() - requestStartTimeNs);
           fail.set(msg);
           queue.clear();
           queue.offer(
@@ -422,6 +416,21 @@ public class DirectDruidClient<T> implements QueryRunner<T>
           );
         }
 
+        // Emit exactly once, regardless of whether we reach this via done() 
or setupResponseReadFailure().
+        private void emitNodeMetrics(long nodeTimeNs)
+        {
+          if (!nodeMetricsEmitted.compareAndSet(false, true)) {
+            return;
+          }
+          QueryMetrics<? super Query<T>> responseMetrics = 
acquireResponseMetrics();
+          responseMetrics.reportNodeTime(nodeTimeNs);
+          responseMetrics.reportNodeBytes(totalByteCount.get());
+          if (usingBackpressure) {
+            responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
+          }
+          responseMetrics.emit(emitter);
+        }
+
         // Returns remaining timeout or throws exception if timeout already 
elapsed.
         private long checkQueryTimeout()
         {
diff --git 
a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java 
b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
index 5452e7636bd..fa835b009fd 100644
--- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
@@ -29,8 +29,10 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.NestedDataTestUtils;
 import org.apache.druid.query.QueryContexts;
@@ -322,6 +324,60 @@ public class DirectDruidClientTest
     Assert.assertEquals(0, client.getNumOpenConnections());
   }
 
+  @Test
+  public void testNodeMetricsEmittedOnSuccess()
+  {
+    StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted();
+    DirectDruidClient client = 
makeDirectDruidClient(initHttpClientWithSuccessfulQuery(), stubEmitter);
+
+    client.run(getQueryPlus(), responseContext).toList();
+
+    Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time"));
+    Assert.assertEquals(1, 
stubEmitter.getMetricEventCount("query/node/bytes"));
+  }
+
+  @Test
+  public void testNodeMetricsEmittedOnError()
+  {
+    // Only setupResponseReadFailure fires (checkQueryTimeout during 
handleResponse) — done() is never called.
+    StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted();
+    final TestHttpClient testHttpClient = new TestHttpClient(objectMapper, 
110);
+    DirectDruidClient client = 
makeDirectDruidClient(initHttpClientFromExistingClient(testHttpClient, false), 
stubEmitter);
+
+    final QueryPlus queryPlus = getQueryPlus(Map.of(
+        DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 50
+    ));
+
+    Assert.assertThrows(QueryTimeoutException.class, () -> 
client.run(queryPlus, responseContext));
+
+    Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time"));
+    Assert.assertEquals(1, 
stubEmitter.getMetricEventCount("query/node/bytes"));
+  }
+
+  @Test
+  public void testNodeMetricsEmittedExactlyOnceWhenDoneAndTimeoutBothFire() 
throws InterruptedException
+  {
+    // done() fires synchronously during run(), then results.toList() calls 
checkQueryTimeout() after the
+    // timeout has already expired, triggering setupResponseReadFailure(). The 
compareAndSet guard must
+    // prevent the second emitNodeMetrics() call from emitting.
+    StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted();
+    DirectDruidClient client = 
makeDirectDruidClient(initHttpClientWithSuccessfulQuery(), stubEmitter);
+
+    // Timeout far enough in the future that handleResponse + done() complete 
during run(), but we sleep
+    // past it before consuming the sequence so that checkQueryTimeout() fires 
during toList().
+    final QueryPlus queryPlus = getQueryPlus(Map.of(
+        DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500
+    ));
+
+    Sequence results = client.run(queryPlus, responseContext);
+    Thread.sleep(600);
+
+    Assert.assertThrows(QueryTimeoutException.class, results::toList);
+
+    Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time"));
+    Assert.assertEquals(1, 
stubEmitter.getMetricEventCount("query/node/bytes"));
+  }
+
   @Test
   public void testResourceLimitExceededException()
   {
@@ -346,6 +402,11 @@ public class DirectDruidClientTest
   }
 
   private DirectDruidClient makeDirectDruidClient(HttpClient httpClient)
+  {
+    return makeDirectDruidClient(httpClient, new NoopServiceEmitter());
+  }
+
+  private DirectDruidClient makeDirectDruidClient(HttpClient httpClient, 
ServiceEmitter emitter)
   {
     return new DirectDruidClient(
         conglomerateRule.getConglomerate(),
@@ -354,7 +415,7 @@ public class DirectDruidClientTest
         httpClient,
         "http",
         hostName,
-        new NoopServiceEmitter(),
+        emitter,
         queryCancellationExecutor
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to