This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a894f8  Added backpressure metric (#6335)
5a894f8 is described below

commit 5a894f830b6d0dd7630bdab87a9fe0fd0237c704
Author: Shiv Toolsidass <[email protected]>
AuthorDate: Sat Sep 29 14:24:04 2018 -0700

    Added backpressure metric (#6335)
    
    * Added backpressure metric
    
    * Updated channelReadable to AtomicBoolean and fixed broken test
    
    * Moved backpressure metric logic to NettyHttpClient
    
    * Fix placement of calculating backPressureDuration
---
 .../apache/druid/java/util/http/client/NettyHttpClient.java   |  6 ++++++
 .../java/util/http/client/response/HttpResponseHandler.java   |  3 ++-
 .../main/java/org/apache/druid/query/DefaultQueryMetrics.java |  6 ++++++
 .../src/main/java/org/apache/druid/query/QueryMetrics.java    |  5 +++++
 .../apache/druid/query/search/DefaultSearchQueryMetrics.java  |  6 ++++++
 .../apache/druid/query/select/DefaultSelectQueryMetrics.java  |  6 ++++++
 .../java/org/apache/druid/query/DefaultQueryMetricsTest.java  |  5 +++++
 .../main/java/org/apache/druid/client/DirectDruidClient.java  | 11 +++++++++--
 8 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
index d15e1e6..4603a33 100644
--- a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
+++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
@@ -72,6 +72,7 @@ public class NettyHttpClient extends AbstractHttpClient
   private final ResourcePool<String, ChannelFuture> pool;
   private final HttpClientConfig.CompressionCodec compressionCodec;
   private final Duration defaultReadTimeout;
+  private long backPressureStartTimeNs;
 
   NettyHttpClient(
       ResourcePool<String, ChannelFuture> pool,
@@ -212,9 +213,13 @@ public class NettyHttpClient extends AbstractHttpClient
                     if (suspendWatermark >= 0 && resumeWatermark >= 
suspendWatermark) {
                       suspendWatermark = -1;
                       channel.setReadable(true);
+                      long backPressureDuration = System.nanoTime() - 
backPressureStartTimeNs;
                       log.debug("[%s] Resumed reads from channel (chunkNum = 
%,d).", requestDesc, resumeChunkNum);
+                      return backPressureDuration;
                     }
                   }
+
+                  return 0; //If we didn't resume, don't know if backpressure 
was happening
                 };
                 response = handler.handleResponse(httpResponse, trafficCop);
                 if (response.isFinished()) {
@@ -271,6 +276,7 @@ public class NettyHttpClient extends AbstractHttpClient
                 suspendWatermark = Math.max(suspendWatermark, currentChunkNum);
                 if (suspendWatermark > resumeWatermark) {
                   channel.setReadable(false);
+                  backPressureStartTimeNs = System.nanoTime();
                   log.debug("[%s] Suspended reads from channel (chunkNum = 
%,d).", requestDesc, currentChunkNum);
                 }
               }
diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
index 02d6caa..03e54e7 100644
--- a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
+++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java
@@ -91,7 +91,8 @@ public interface HttpResponseHandler<IntermediateType, FinalType>
      * Call this to resume reading after you have suspended it.
      *
      * @param chunkNum chunk number corresponding to the handleChunk() or 
handleResponse() call from which you
+     * @return time that backpressure was applied (channel was closed for 
reads)
      */
-    void resume(long chunkNum);
+    long resume(long chunkNum);
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
index 6f8d3c6..b332fec 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
@@ -244,6 +244,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
   }
 
   @Override
+  public QueryMetrics<QueryType> reportBackPressureTime(long timeNs)
+  {
+    return reportMillisTimeMetric("query/node/backpressure", timeNs);
+  }
+
+  @Override
   public QueryMetrics<QueryType> reportNodeTime(long timeNs)
   {
     return reportMillisTimeMetric("query/node/time", timeNs);
diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
index b8d2b55..c5e32b9 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
@@ -279,6 +279,11 @@ public interface QueryMetrics<QueryType extends Query<?>>
   QueryMetrics<QueryType> reportNodeTimeToFirstByte(long timeNs);
 
   /**
+   * Registers "time that channel is unreadable (backpressure)" metric.
+   */
+  QueryMetrics<QueryType> reportBackPressureTime(long timeNs);
+
+  /**
    * Registers "node time" metric.
    */
   QueryMetrics<QueryType> reportNodeTime(long timeNs);
diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
index 96b126a..a44b004 100644
--- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
@@ -209,6 +209,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
   }
 
   @Override
+  public QueryMetrics reportBackPressureTime(long timeNs)
+  {
+    return delegateQueryMetrics.reportBackPressureTime(timeNs);
+  }
+
+  @Override
   public QueryMetrics reportNodeTime(long timeNs)
   {
     return delegateQueryMetrics.reportNodeTime(timeNs);
diff --git a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
index bf59412..d2b6b60 100644
--- a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java
@@ -208,6 +208,12 @@ public class DefaultSelectQueryMetrics implements SelectQueryMetrics
   }
 
   @Override
+  public QueryMetrics reportBackPressureTime(long timeNs)
+  {
+    return delegateQueryMetrics.reportBackPressureTime(timeNs);
+  }
+
+  @Override
   public QueryMetrics reportNodeTime(long timeNs)
   {
     return delegateQueryMetrics.reportNodeTime(timeNs);
diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index ac022ca..16312c6 100644
--- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -152,5 +152,10 @@ public class DefaultQueryMetricsTest
     actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
     Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
     Assert.assertEquals(10L, actualEvent.get("value"));
+
+    queryMetrics.reportBackPressureTime(11000001).emit(serviceEmitter);
+    actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
+    Assert.assertEquals("query/node/backpressure", actualEvent.get("metric"));
+    Assert.assertEquals(11L, actualEvent.get("value"));
   }
 }
diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index 60dfd7a..39396b8 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -203,6 +203,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
       {
         private final AtomicLong totalByteCount = new AtomicLong(0);
         private final AtomicLong queuedByteCount = new AtomicLong(0);
+        private final AtomicLong channelSuspendedTime = new AtomicLong(0);
         private final BlockingQueue<InputStreamHolder> queue = new 
LinkedBlockingQueue<>();
         private final AtomicBoolean done = new AtomicBoolean(false);
         private final AtomicReference<String> fail = new AtomicReference<>();
@@ -244,8 +245,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
 
           final long currentQueuedByteCount = 
queuedByteCount.addAndGet(-holder.getLength());
           if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) {
-            Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, 
how can this be?")
-                         .resume(holder.getChunkNum());
+            long backPressureTime = 
Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this 
be?")
+                                                 .resume(holder.getChunkNum());
+            channelSuspendedTime.addAndGet(backPressureTime);
           }
 
           return holder.getStream();
@@ -382,6 +384,11 @@ public class DirectDruidClient<T> implements QueryRunner<T>
           QueryMetrics<? super Query<T>> responseMetrics = 
acquireResponseMetrics();
           responseMetrics.reportNodeTime(nodeTimeNs);
           responseMetrics.reportNodeBytes(totalByteCount.get());
+
+          if (usingBackpressure) {
+            responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
+          }
+
           responseMetrics.emit(emitter);
           synchronized (done) {
             try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to