CAMEL-7909 camel-netty-http consumer needs to close the connection if the
response Connection header is "close"
Conflicts:
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9cf11bd5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9cf11bd5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9cf11bd5
Branch: refs/remotes/origin/camel-2.13.x
Commit: 9cf11bd58146abb4938d0b546d3308d6e086b6da
Parents: f0e3e20
Author: Willem Jiang <[email protected]>
Authored: Tue Oct 14 10:14:12 2014 +0800
Committer: Willem Jiang <[email protected]>
Committed: Fri Oct 17 11:35:34 2014 +0800
----------------------------------------------------------------------
.../netty/http/DefaultNettyHttpBinding.java | 9 +++++++--
.../http/handlers/HttpServerChannelHandler.java | 19 ++-----------------
.../camel/component/netty/NettyHelper.java | 2 +-
.../camel/component/netty/NettyProducer.java | 13 ++++++++++---
.../handlers/ServerResponseFutureListener.java | 5 +++++
5 files changed, 25 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index 23cae6c..fa8fa81 100644
---
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -34,6 +34,7 @@ import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConverter;
+import org.apache.camel.component.netty.NettyConstants;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
@@ -67,7 +68,7 @@ public class DefaultNettyHttpBinding implements
NettyHttpBinding, Cloneable {
public DefaultNettyHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
this.headerFilterStrategy = headerFilterStrategy;
}
-
+
public DefaultNettyHttpBinding copy() {
try {
return (DefaultNettyHttpBinding)this.clone();
@@ -384,7 +385,7 @@ public class DefaultNettyHttpBinding implements
NettyHttpBinding, Cloneable {
if (buffer.readerIndex() == buffer.writerIndex()) {
buffer.setIndex(0, buffer.writerIndex());
}
- // TODO How to enable the chunk transport
+ // TODO How to enable the chunk transport
int len = buffer.readableBytes();
// set content-length
response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, len);
@@ -411,6 +412,10 @@ public class DefaultNettyHttpBinding implements
NettyHttpBinding, Cloneable {
}
}
response.setHeader(HttpHeaders.Names.CONNECTION, connection);
+ // Just make sure we close the channel when the connection value is
close
+ if (connection.equalsIgnoreCase(HttpHeaders.Values.CLOSE)) {
+
message.setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);
+ }
LOG.trace("Connection: {}", connection);
return response;
http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
----------------------------------------------------------------------
diff --git
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
index 8894f6a..63edef7 100644
---
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
+++
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.netty.http.handlers;
-import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
@@ -26,7 +25,6 @@ import javax.security.auth.login.LoginException;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
-import org.apache.camel.component.netty.NettyConsumer;
import org.apache.camel.component.netty.NettyHelper;
import org.apache.camel.component.netty.handlers.ServerChannelHandler;
import org.apache.camel.component.netty.http.HttpPrincipal;
@@ -37,7 +35,6 @@ import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.ObjectHelper;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
@@ -49,8 +46,6 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.jboss.netty.handler.codec.http.HttpHeaders.is100ContinueExpected;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static
org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
import static
org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
@@ -58,6 +53,7 @@ import static
org.jboss.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAV
import static
org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
/**
* Netty HTTP {@link ServerChannelHandler} that handles the incoming HTTP
requests and routes
* the received message in Camel.
@@ -268,7 +264,7 @@ public class HttpServerChannelHandler extends
ServerChannelHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent
exceptionEvent) throws Exception {
-
+
// only close if we are still allowed to run
if (consumer.isRunAllowed()) {
@@ -282,17 +278,6 @@ public class HttpServerChannelHandler extends
ServerChannelHandler {
}
}
- @Override
- protected ChannelFutureListener createResponseFutureListener(NettyConsumer
consumer, Exchange exchange, SocketAddress remoteAddress) {
- // make sure to close channel if not keep-alive
- if (request != null && isKeepAlive(request)) {
- LOG.trace("Request has Connection: keep-alive so Channel is not
being closed");
- return null;
- } else {
- LOG.trace("Request is not Connection: close so Channel is being
closed");
- return ChannelFutureListener.CLOSE;
- }
- }
@Override
protected Object getResponseBody(Exchange exchange) throws Exception {
http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
----------------------------------------------------------------------
diff --git
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
index 05f3e4d..b9368fa 100644
---
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
+++
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyHelper.java
@@ -111,7 +111,7 @@ public final class NettyHelper {
public static void close(Channel channel) {
if (channel != null) {
LOG.trace("Closing channel: {}", channel);
- channel.close();
+ channel.close().syncUninterruptibly();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
----------------------------------------------------------------------
diff --git
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 50de736..87ad2be 100644
---
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -498,8 +498,11 @@ public class NettyProducer extends DefaultAsyncProducer {
public void done(boolean doneSync) {
// put back in pool
try {
- LOG.trace("Putting channel back to pool {}", channel);
- pool.returnObject(channel);
+ // Only put the connected channel back to the pool
+ if (channel.isConnected()) {
+ LOG.trace("Putting channel back to pool {}", channel);
+ pool.returnObject(channel);
+ }
} catch (Exception e) {
LOG.warn("Error returning channel to pool {}. This exception
will be ignored.", channel);
} finally {
@@ -525,7 +528,9 @@ public class NettyProducer extends DefaultAsyncProducer {
@Override
public void destroyObject(Channel channel) throws Exception {
LOG.trace("Destroying channel: {}", channel);
- NettyHelper.close(channel);
+ if (channel.isOpen()) {
+ NettyHelper.close(channel);
+ }
allChannels.remove(channel);
}
@@ -540,11 +545,13 @@ public class NettyProducer extends DefaultAsyncProducer {
@Override
public void activateObject(Channel channel) throws Exception {
// noop
+ LOG.trace("activateObject channel: {} -> {}", channel);
}
@Override
public void passivateObject(Channel channel) throws Exception {
// noop
+ LOG.trace("passivateObject channel: {} -> {}", channel);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9cf11bd5/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
----------------------------------------------------------------------
diff --git
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
index 619a62e..90dc9e6 100644
---
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
+++
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerResponseFutureListener.java
@@ -61,6 +61,11 @@ public class ServerResponseFutureListener implements
ChannelFutureListener {
} else {
close =
exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE,
Boolean.class);
}
+
+ // check the setting on the exchange property
+ if (close == null) {
+ close =
exchange.getProperty(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE,
Boolean.class);
+ }
// should we disconnect, the header can override the configuration
boolean disconnect = consumer.getConfiguration().isDisconnect();