This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 38af5f7b57 NettyHttpClient: Cleaner state transitions for handlers. 
(#12889)
38af5f7b57 is described below

commit 38af5f7b57769e026f028a56bb9be8003e01ddc5
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Aug 11 09:31:37 2022 -0700

    NettyHttpClient: Cleaner state transitions for handlers. (#12889)
    
    The Netty pipeline set up by the client can deliver multiple exceptions,
    and can deliver chunks even after delivering exceptions. This makes it
    difficult to implement HttpResponseHandlers. Looking at existing handler
    implementations, I do not see attempts to handle this case, so it's also
    a potential source of bugs.
    
    This patch updates the client to track whether an exception was
    encountered, and if so, to not call any additional methods on the handler
    after exceptionCaught. It also harmonizes exception handling between
    exceptionCaught and channelDisconnected.
---
 .../java/util/http/client/NettyHttpClient.java     | 97 +++++++++++++---------
 .../util/http/client/pool/ResourceContainer.java   |  1 -
 .../java/util/http/client/pool/ResourcePool.java   |  6 --
 3 files changed, 57 insertions(+), 47 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
 
b/core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
index f873da994a..3ab3719180 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java
@@ -59,8 +59,10 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
+ * Implementation of {@link HttpClient} built using Netty.
  */
 public class NettyHttpClient extends AbstractHttpClient
 {
@@ -169,6 +171,11 @@ public class NettyHttpClient extends AbstractHttpClient
     final long readTimeout = getReadTimeout(requestReadTimeout);
     final SettableFuture<Final> retVal = SettableFuture.create();
 
+    // Pipeline can hand us chunks even after exceptionCaught is called. This 
has the potential to confuse
+    // HttpResponseHandler implementations, which expect exceptionCaught to be 
the final method called. So, we
+    // use this boolean to ensure that handlers do not see any chunks after 
exceptionCaught fires.
+    final AtomicBoolean didEncounterException = new AtomicBoolean();
+
     if (readTimeout > 0) {
       channel.getPipeline().addLast(
           READ_TIMEOUT_HANDLER_NAME,
@@ -202,6 +209,11 @@ public class NettyHttpClient extends AbstractHttpClient
               Object msg = e.getMessage();
 
               if (msg instanceof HttpResponse) {
+                if (didEncounterException.get()) {
+                  // Don't process HttpResponse after encountering an 
exception.
+                  return;
+                }
+
                 HttpResponse httpResponse = (HttpResponse) msg;
                 if (log.isDebugEnabled()) {
                   log.debug("[%s] Got response: %s", requestDesc, 
httpResponse.getStatus());
@@ -234,6 +246,11 @@ public class NettyHttpClient extends AbstractHttpClient
                   finishRequest();
                 }
               } else if (msg instanceof HttpChunk) {
+                if (didEncounterException.get()) {
+                  // Don't process HttpChunk after encountering an exception.
+                  return;
+                }
+
                 HttpChunk httpChunk = (HttpChunk) msg;
                 if (log.isDebugEnabled()) {
                   log.debug(
@@ -307,62 +324,62 @@ public class NettyHttpClient extends AbstractHttpClient
 
           @Override
           public void exceptionCaught(ChannelHandlerContext context, 
ExceptionEvent event)
+          {
+            handleExceptionAndCloseChannel(event.getCause(), false);
+          }
+
+          @Override
+          public void channelDisconnected(ChannelHandlerContext context, 
ChannelStateEvent event)
+          {
+            handleExceptionAndCloseChannel(new ChannelException("Channel 
disconnected"), true);
+          }
+
+          /**
+           * Handle an exception by logging it, possibly calling {@link 
SettableFuture#setException} on {@code retVal},
+           * possibly calling {@link HttpResponseHandler#exceptionCaught}, and 
possibly closing the channel.
+           *
+           * No actions will be taken (other than logging) if an exception has 
already been handled for this request.
+           *
+           * @param t exception
+           * @param closeIfNotOpen Call {@link Channel#close()} even if {@link 
Channel#isOpen()} returns false.
+           *                       Provided to retain existing behavior of two 
different chunks of code that were
+           *                       merged into this single method.
+           */
+          private void handleExceptionAndCloseChannel(final Throwable t, final 
boolean closeIfNotOpen)
           {
             if (log.isDebugEnabled()) {
-              final Throwable cause = event.getCause();
-              if (cause == null) {
-                log.debug("[%s] Caught exception", requestDesc);
-              } else {
-                log.debug(cause, "[%s] Caught exception", requestDesc);
-              }
+              log.debug(t, "[%s] Caught exception", requestDesc);
             }
 
-            // Ignore return value of setException, since exceptionCaught can 
be called multiple times and we
-            // only want to report the first one.
-            if (event.getCause() instanceof ReadTimeoutException) {
-              // ReadTimeoutException thrown by ReadTimeoutHandler is a 
singleton with a misleading stack trace.
-              // No point including it: instead, we replace it with a fresh 
exception.
-              retVal.setException(new 
ReadTimeoutException(StringUtils.format("[%s] Read timed out", requestDesc)));
-            } else {
-              retVal.setException(event.getCause());
+            // Only process the first exception encountered.
+            if (!didEncounterException.compareAndSet(false, true)) {
+              return;
+            }
+
+            if (!retVal.isDone()) {
+              if (t instanceof ReadTimeoutException) {
+                // ReadTimeoutException thrown by ReadTimeoutHandler is a 
singleton with a misleading stack trace.
+                // No point including it: instead, we replace it with a fresh 
exception.
+                retVal.setException(new 
ReadTimeoutException(StringUtils.format("[%s] Read timed out", requestDesc)));
+              } else {
+                retVal.setException(t);
+              }
             }
 
             // response is non-null if we received initial chunk and then 
exception occurs
             if (response != null) {
-              handler.exceptionCaught(response, event.getCause());
+              handler.exceptionCaught(response, t);
             }
             try {
-              if (channel.isOpen()) {
+              if (closeIfNotOpen || channel.isOpen()) {
                 channel.close();
               }
             }
             catch (Exception e) {
-              log.warn(e, "Error while closing channel");
+              log.warn(e, "[%s] Error while closing channel", requestDesc);
             }
             finally {
-              if (channelResourceContainer.isPresent()) {
-                // exceptionCaught can be called multiple times: we only want 
to return the channel if it hasn't
-                // already been returned.
-                channelResourceContainer.returnResource();
-              }
-            }
-          }
-
-          @Override
-          public void channelDisconnected(ChannelHandlerContext context, 
ChannelStateEvent event)
-          {
-            if (log.isDebugEnabled()) {
-              log.debug("[%s] Channel disconnected", requestDesc);
-            }
-            // response is non-null if we received initial chunk and then 
exception occurs
-            if (response != null) {
-              handler.exceptionCaught(response, new ChannelException("Channel 
disconnected"));
-            }
-            channel.close();
-            channelResourceContainer.returnResource();
-            if (!retVal.isDone()) {
-              log.warn("[%s] Channel disconnected before response complete", 
requestDesc);
-              retVal.setException(new ChannelException("Channel 
disconnected"));
+              channelResourceContainer.returnResource();
             }
           }
 
diff --git 
a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourceContainer.java
 
b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourceContainer.java
index 7f18fdd18a..37fc23ea5c 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourceContainer.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourceContainer.java
@@ -24,6 +24,5 @@ package org.apache.druid.java.util.http.client.pool;
 public interface ResourceContainer<ResourceType>
 {
   ResourceType get();
-  boolean isPresent();
   void returnResource();
 }
diff --git 
a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
 
b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
index 6ed025c768..2476e4c9eb 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
@@ -119,12 +119,6 @@ public class ResourcePool<K, V> implements Closeable
         return value;
       }
 
-      @Override
-      public boolean isPresent()
-      {
-        return !returned.get();
-      }
-
       @Override
       public void returnResource()
       {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to