This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4845cf938e423355ea96ed7a3e1942fcceac2de3 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Jun 16 09:49:05 2021 +0200 CAMEL-16718: camel-netty - If netty producer fails writing at an early phase then netty does not trigger exceptionCaught in the client handler, so we need to do something when the operation is not successful and then cause Camel to get the real caused exception and continue routing its callback, otherwise a thread may hang. Thanks to Morgan L for reporting and the analysis of the problem. --- .../apache/camel/component/netty/NettyCamelState.java | 19 +++++++++++++++++++ .../apache/camel/component/netty/NettyProducer.java | 16 +++++++++++++++- .../netty/handlers/ClientChannelHandler.java | 14 ++++++++------ 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java index 389ac99..517fe4c 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.netty; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.AsyncCallback; @@ -34,11 +35,13 @@ public final class NettyCamelState { private final AsyncCallback callback; // It is never a good idea to call the same callback twice private final AtomicBoolean callbackCalled; + private final AtomicBoolean exceptionCaught; public NettyCamelState(AsyncCallback callback, Exchange exchange) { this.callback = callback; this.exchange = exchange; this.callbackCalled = new AtomicBoolean(); + this.exceptionCaught = new AtomicBoolean(); } public AsyncCallback getCallback() { @@ -59,4 +62,20 @@ public final class NettyCamelState { public Exchange getExchange() { return exchange; } + + public 
void onExceptionCaught() { + exceptionCaught.set(true); + } + + public void onExceptionCaughtOnce(boolean doneSync) { + // only trigger callback once if an exception has not already been caught + // (ClientChannelHandler#exceptionCaught vs NettyProducer#processWithConnectedChannel) + if (exceptionCaught.compareAndSet(false, true)) { + // set some general exception as Camel should know the netty write operation failed + if (exchange.getException() == null) { + exchange.setException(new IOException("Netty write operation failed")); + } + callbackDoneOnce(doneSync); + } + } } diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index 764ea9d..e2e1e3b 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -341,7 +341,8 @@ public class NettyProducer extends DefaultAsyncProducer { } // setup state as attachment on the channel, so we can access the state later when needed - channelCorrelationManager.putState(channel, new NettyCamelState(producerCallback, exchange)); + final NettyCamelState state = new NettyCamelState(producerCallback, exchange); + channelCorrelationManager.putState(channel, state); // here we need to setup the remote address information here InetSocketAddress remoteAddress = null; if (!isTcp()) { @@ -353,7 +354,20 @@ public class NettyProducer extends DefaultAsyncProducer { public void operationComplete(ChannelFuture channelFuture) throws Exception { LOG.trace("Operation complete {}", channelFuture); if (!channelFuture.isSuccess()) { + Throwable cause = null; // no success then exit, (any exception has been handled by ClientChannelHandler#exceptionCaught) + try { + // need to get real caused exception from netty, which is not possible in a nice API + // but we can try to get 
a result with a 0 timeout, then netty will throw the caused + // exception wrapped in an outer exception + channelFuture.get(0, TimeUnit.MILLISECONDS); + } catch (Exception e) { + cause = e.getCause(); + } + if (cause != null) { + exchange.setException(cause); + } + state.onExceptionCaughtOnce(false); return; } diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java index 0000818..57e7ad6 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java @@ -68,20 +68,19 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { if (LOG.isTraceEnabled()) { LOG.trace("Exception caught at Channel: {}", ctx.channel(), cause); } - if (exceptionHandled) { // ignore subsequent exceptions being thrown return; } - exceptionHandled = true; - if (LOG.isDebugEnabled()) { - LOG.debug("Closing channel as an exception was thrown from Netty", cause); - } + Exchange exchange = null; NettyCamelState state = getState(ctx, cause); - Exchange exchange = state != null ? state.getExchange() : null; + if (state != null) { + state.onExceptionCaught(); + exchange = state.getExchange(); + } // the state may not be set if (exchange != null) { @@ -94,6 +93,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { } // close channel in case an exception was thrown + if (LOG.isDebugEnabled()) { + LOG.debug("Closing channel as an exception was thrown from Netty", cause); + } NettyHelper.close(ctx.channel()); // signal callback
