Repository: camel Updated Branches: refs/heads/camel-2.13.x f0e3e2015 -> f0ddb5ef7 refs/heads/camel-2.14.x 6eb1275ab -> 7f730a86b
CAMEL-7909 camel-netty-http consumer need to close the connection if the response connection header is close Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/14ba1b38 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/14ba1b38 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/14ba1b38 Branch: refs/heads/camel-2.14.x Commit: 14ba1b38f468d12d2d6425b3e92421b47df08155 Parents: 6eb1275 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue Oct 14 10:14:12 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Fri Oct 17 11:25:22 2014 +0800 ---------------------------------------------------------------------- .../netty/http/DefaultNettyHttpBinding.java | 5 +++++ .../http/handlers/HttpServerChannelHandler.java | 18 ++---------------- .../apache/camel/component/netty/NettyHelper.java | 2 +- .../camel/component/netty/NettyProducer.java | 13 ++++++++++--- .../handlers/ServerResponseFutureListener.java | 5 +++++ 5 files changed, 23 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java index eed39b0..102fb91 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java @@ -34,6 +34,7 @@ import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.RuntimeCamelException; import org.apache.camel.TypeConverter; +import org.apache.camel.component.netty.NettyConstants; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.IOHelper; @@ -411,6 +412,10 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { } } response.headers().set(HttpHeaders.Names.CONNECTION, connection); + // Just make sure we close the channel when the connection value is close + if (connection.equalsIgnoreCase(HttpHeaders.Values.CLOSE)) { + message.setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true); + } LOG.trace("Connection: {}", connection); return response; http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java index da17d25..cb92d44 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.netty.http.handlers; -import java.net.SocketAddress; import java.net.URI; import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; @@ -27,7 +26,6 @@ import javax.security.auth.login.LoginException; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; -import org.apache.camel.component.netty.NettyConsumer; import org.apache.camel.component.netty.NettyHelper; import org.apache.camel.component.netty.handlers.ServerChannelHandler; import org.apache.camel.component.netty.http.HttpPrincipal; @@ -38,7 +36,6 @@ import org.apache.camel.util.CamelLogger; import org.apache.camel.util.ObjectHelper; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; @@ -50,7 +47,6 @@ import org.jboss.netty.handler.codec.http.HttpResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; @@ -58,6 +54,7 @@ import static org.jboss.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAV import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + /** * Netty HTTP {@link ServerChannelHandler} that handles the incoming HTTP requests and routes * the received message in Camel. @@ -303,18 +300,7 @@ public class HttpServerChannelHandler extends ServerChannelHandler { } } } - - @Override - protected ChannelFutureListener createResponseFutureListener(NettyConsumer consumer, Exchange exchange, SocketAddress remoteAddress) { - // make sure to close channel if not keep-alive - if (request != null && isKeepAlive(request)) { - LOG.trace("Request has Connection: keep-alive so Channel is not being closed"); - return null; - } else { - LOG.trace("Request is not Connection: close so Channel is being closed"); - return ChannelFutureListener.CLOSE; - } - } + @Override protected Object getResponseBody(Exchange exchange) throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java index 05f3e4d..b9368fa 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java @@ -111,7 +111,7 @@ public final class NettyHelper { public static void close(Channel channel) { if (channel != null) { LOG.trace("Closing channel: {}", channel); - channel.close(); + channel.close().syncUninterruptibly(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index 50de736..87ad2be 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -498,8 +498,11 @@ public class NettyProducer extends DefaultAsyncProducer { public void done(boolean doneSync) { // put back in pool try { - LOG.trace("Putting channel back to pool {}", channel); - pool.returnObject(channel); + // Only put the connected channel back to the pool + if (channel.isConnected()) { + LOG.trace("Putting channel back to pool {}", channel); + pool.returnObject(channel); + } } catch (Exception e) { LOG.warn("Error returning channel to pool {}. This exception will be ignored.", channel); } finally { @@ -525,7 +528,9 @@ public class NettyProducer extends DefaultAsyncProducer { @Override public void destroyObject(Channel channel) throws Exception { LOG.trace("Destroying channel: {}", channel); - NettyHelper.close(channel); + if (channel.isOpen()) { + NettyHelper.close(channel); + } allChannels.remove(channel); } @@ -540,11 +545,13 @@ public class NettyProducer extends DefaultAsyncProducer { @Override public void activateObject(Channel channel) throws Exception { // noop + LOG.trace("activateObject channel: {} -> {}", channel); } @Override public void passivateObject(Channel channel) throws Exception { // noop + LOG.trace("passivateObject channel: {} -> {}", channel); } } http://git-wip-us.apache.org/repos/asf/camel/blob/14ba1b38/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java index 619a62e..90dc9e6 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java @@ -61,6 +61,11 @@ public class ServerResponseFutureListener implements ChannelFutureListener { } else { close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); } + + // check the setting on the exchange property + if (close == null) { + close = exchange.getProperty(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } // should we disconnect, the header can override the configuration boolean disconnect = consumer.getConfiguration().isDisconnect();