This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4845cf938e423355ea96ed7a3e1942fcceac2de3
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jun 16 09:49:05 2021 +0200

    CAMEL-16718: camel-netty - If netty producer fails writing at a early phase 
then netty does not trigger exceptionCaught in the client handler, so we need 
to do something when operation is not success and then cause Camel to get the 
real caused exception and continue routing its callback, otherwise a thread may 
hang. Thanks to Morgan L for reporting and the analysis of the problem.
---
 .../apache/camel/component/netty/NettyCamelState.java | 19 +++++++++++++++++++
 .../apache/camel/component/netty/NettyProducer.java   | 16 +++++++++++++++-
 .../netty/handlers/ClientChannelHandler.java          | 14 ++++++++------
 3 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
index 389ac99..517fe4c 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.netty;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.AsyncCallback;
@@ -34,11 +35,13 @@ public final class NettyCamelState {
     private final AsyncCallback callback;
     // It is never a good idea to call the same callback twice
     private final AtomicBoolean callbackCalled;
+    private final AtomicBoolean exceptionCaught;
 
     public NettyCamelState(AsyncCallback callback, Exchange exchange) {
         this.callback = callback;
         this.exchange = exchange;
         this.callbackCalled = new AtomicBoolean();
+        this.exceptionCaught = new AtomicBoolean();
     }
 
     public AsyncCallback getCallback() {
@@ -59,4 +62,20 @@ public final class NettyCamelState {
     public Exchange getExchange() {
         return exchange;
     }
+
+    public void onExceptionCaught() {
+        exceptionCaught.set(true);
+    }
+
+    public void onExceptionCaughtOnce(boolean doneSync) {
+        // only trigger callback once if an exception has not already been 
caught
+        // (ClientChannelHandler#exceptionCaught vs 
NettyProducer#processWithConnectedChannel)
+        if (exceptionCaught.compareAndSet(false, true)) {
+            // set some general exception as Camel should know the netty write 
operation failed
+            if (exchange.getException() == null) {
+                exchange.setException(new IOException("Netty write operation 
failed"));
+            }
+            callbackDoneOnce(doneSync);
+        }
+    }
 }
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 764ea9d..e2e1e3b 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -341,7 +341,8 @@ public class NettyProducer extends DefaultAsyncProducer {
         }
 
         // setup state as attachment on the channel, so we can access the 
state later when needed
-        channelCorrelationManager.putState(channel, new 
NettyCamelState(producerCallback, exchange));
+        final NettyCamelState state = new NettyCamelState(producerCallback, 
exchange);
+        channelCorrelationManager.putState(channel, state);
         // here we need to setup the remote address information here
         InetSocketAddress remoteAddress = null;
         if (!isTcp()) {
@@ -353,7 +354,20 @@ public class NettyProducer extends DefaultAsyncProducer {
             public void operationComplete(ChannelFuture channelFuture) throws 
Exception {
                 LOG.trace("Operation complete {}", channelFuture);
                 if (!channelFuture.isSuccess()) {
+                    Throwable cause = null;
                     // no success then exit, (any exception has been handled 
by ClientChannelHandler#exceptionCaught)
+                    try {
+                        // need to get real caused exception from netty, which 
is not possible in a nice API
+                        // but we can try to get a result with a 0 timeout, 
then netty will throw the caused
+                        // exception wrapped in an outer exception
+                        channelFuture.get(0, TimeUnit.MILLISECONDS);
+                    } catch (Exception e) {
+                        cause = e.getCause();
+                    }
+                    if (cause != null) {
+                        exchange.setException(cause);
+                    }
+                    state.onExceptionCaughtOnce(false);
                     return;
                 }
 
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
index 0000818..57e7ad6 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
@@ -68,20 +68,19 @@ public class ClientChannelHandler extends 
SimpleChannelInboundHandler<Object> {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Exception caught at Channel: {}", ctx.channel(), cause);
         }
-
         if (exceptionHandled) {
             // ignore subsequent exceptions being thrown
             return;
         }
-
         exceptionHandled = true;
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Closing channel as an exception was thrown from Netty", 
cause);
-        }
+        Exchange exchange = null;
 
         NettyCamelState state = getState(ctx, cause);
-        Exchange exchange = state != null ? state.getExchange() : null;
+        if (state != null) {
+            state.onExceptionCaught();
+            exchange = state.getExchange();
+        }
 
         // the state may not be set
         if (exchange != null) {
@@ -94,6 +93,9 @@ public class ClientChannelHandler extends 
SimpleChannelInboundHandler<Object> {
             }
 
             // close channel in case an exception was thrown
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Closing channel as an exception was thrown from 
Netty", cause);
+            }
             NettyHelper.close(ctx.channel());
 
             // signal callback

Reply via email to