This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 86c201d0b97ba204461de565c1ef54516fc9ddc5
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Apr 9 17:26:09 2018 +0200

    CAMEL-12427: camel-netty4 - Add SPI to plug in custom correlation state for 
request/reply in producer
---
 .../component/netty4/http/NettyHttpEndpoint.java   |  2 +-
 .../src/main/docs/netty4-component.adoc            |  3 +-
 .../DefaultNettyCamelStateCorrelationManager.java  | 48 ++++++++++++++++
 .../netty4/NettyCamelStateCorrelationManager.java  | 65 +++++++++++++++++++++
 .../camel/component/netty4/NettyConfiguration.java | 17 ++++++
 .../camel/component/netty4/NettyProducer.java      | 45 +++++----------
 .../netty4/handlers/ClientChannelHandler.java      | 67 +++++++++++-----------
 .../springboot/NettyComponentConfiguration.java    | 22 +++++++
 8 files changed, 205 insertions(+), 64 deletions(-)

diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
index 320ca18..b499ff4 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 @UriEndpoint(firstVersion = "2.14.0", scheme = "netty4-http", extendsScheme = 
"netty4", title = "Netty4 HTTP",
         syntax = "netty4-http:protocol:host:port/path", consumerClass = 
NettyHttpConsumer.class, label = "http", lenientProperties = true,
         excludeProperties = 
"textline,delimiter,autoAppendDelimiter,decoderMaxLineLength,encoding,allowDefaultCodec,udpConnectionlessSending,networkInterface"
-                + 
",clientMode,reconnect,reconnectInterval,useByteBuf,udpByteArrayCodec,broadcast")
+                + 
",clientMode,reconnect,reconnectInterval,useByteBuf,udpByteArrayCodec,broadcast,correlationManager")
 public class NettyHttpEndpoint extends NettyEndpoint implements AsyncEndpoint, 
HeaderFilterStrategyAware {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NettyHttpEndpoint.class);
diff --git a/components/camel-netty4/src/main/docs/netty4-component.adoc 
b/components/camel-netty4/src/main/docs/netty4-component.adoc
index d1c909b..4d8c61e 100644
--- a/components/camel-netty4/src/main/docs/netty4-component.adoc
+++ b/components/camel-netty4/src/main/docs/netty4-component.adoc
@@ -100,7 +100,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (71 parameters):
+==== Query Parameters (72 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -133,6 +133,7 @@ with the following path and query parameters:
 | *connectTimeout* (producer) | Time to wait for a socket connection to be 
available. Value is in millis. | 10000 | int
 | *requestTimeout* (producer) | Allows to use a timeout for the Netty producer 
when calling a remote server. By default no timeout is in use. The value is in 
milli seconds, so eg 30000 is 30 seconds. The requestTimeout is using Netty's 
ReadTimeoutHandler to trigger the timeout. |  | long
 | *clientInitializerFactory* (producer) | To use a custom 
ClientInitializerFactory |  | ClientInitializer Factory
+| *correlationManager* (producer) | To use a custom correlation manager to 
manage how request and reply messages are mapped when using request/reply with 
the netty producer. This should only be used if you have a way to map requests 
together with replies, such as when there are correlation ids in both the request 
and reply messages. This can be used if you want to multiplex concurrent 
messages on the same channel (aka connection) in netty. When doing this you 
must have a way to correlate the [...]
 | *lazyChannelCreation* (producer) | Channels can be lazily created to avoid 
exceptions, if the remote server is not up and running when the Camel producer 
is started. | true | boolean
 | *producerPoolEnabled* (producer) | Whether producer pool is enabled or not. 
Important: Do not turn this off, as the pooling is needed for handling 
concurrency and reliable request/reply. | true | boolean
 | *producerPoolMaxActive* (producer) | Sets the cap on the number of objects 
that can be allocated by the pool (checked out to clients, or idle awaiting 
checkout) at a given time. Use a negative value for no limit. | -1 | int
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultNettyCamelStateCorrelationManager.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultNettyCamelStateCorrelationManager.java
new file mode 100644
index 0000000..2a909bb
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultNettyCamelStateCorrelationManager.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+public class DefaultNettyCamelStateCorrelationManager implements 
NettyCamelStateCorrelationManager {
+
+    private final Map<Channel, NettyCamelState> cache = new 
ConcurrentHashMap<Channel, NettyCamelState>();
+
+    @Override
+    public void putState(Channel channel, NettyCamelState state) {
+        cache.put(channel, state);
+    }
+
+    @Override
+    public void removeState(ChannelHandlerContext ctx, Channel channel) {
+        cache.remove(channel);
+    }
+
+    @Override
+    public NettyCamelState getState(ChannelHandlerContext ctx, Channel 
channel, Object msg) {
+        return cache.get(channel);
+    }
+
+    @Override
+    public NettyCamelState getState(ChannelHandlerContext ctx, Channel 
channel, Throwable cause) {
+        return cache.get(channel);
+    }
+}
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelStateCorrelationManager.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelStateCorrelationManager.java
new file mode 100644
index 0000000..cd76196
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelStateCorrelationManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+
+/**
+ * To manage and correlate state of {@link NettyCamelState} when doing 
request/reply via {@link NettyProducer}.
+ * <p/>
+ * This SPI allows custom implementations to correlate the request and replies.
+ */
+public interface NettyCamelStateCorrelationManager {
+
+    /**
+     * Puts the state.
+     * <p/>
+     * You can get access to the Camel message from the {@link 
NettyCamelState} instance.
+     *
+     * @param channel the channel
+     * @param state   the Camel state to be stored
+     */
+    void putState(Channel channel, NettyCamelState state);
+
+    /**
+     * Removes the state when the channel is inactive.
+     *
+     * @param ctx netty channel handler context
+     * @param channel the channel
+     */
+    void removeState(ChannelHandlerContext ctx, Channel channel);
+
+    /**
+     * Gets the state when a response message has been received.
+     *
+     * @param ctx netty channel handler context
+     * @param channel the channel
+     * @param msg the response message
+     */
+    NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, 
Object msg);
+
+    /**
+     * Gets the state when some error occurred.
+     *
+     * @param ctx netty channel handler context
+     * @param channel the channel
+     * @param cause the error
+     */
+    NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, 
Throwable cause);
+
+}
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
index c63ab65..98e9298 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
@@ -110,6 +110,8 @@ public class NettyConfiguration extends 
NettyServerBootstrapConfiguration implem
     private boolean udpByteArrayCodec;
     @UriParam(label = "common")
     private boolean reuseChannel;
+    @UriParam(label = "producer,advanced")
+    private NettyCamelStateCorrelationManager correlationManager;
 
     /**
      * Returns a copy of this configuration
@@ -655,6 +657,21 @@ public class NettyConfiguration extends 
NettyServerBootstrapConfiguration implem
         this.reuseChannel = reuseChannel;
     }
 
+    public NettyCamelStateCorrelationManager getCorrelationManager() {
+        return correlationManager;
+    }
+
+    /**
+     * To use a custom correlation manager to manage how request and reply 
messages are mapped when using request/reply with the netty producer.
+     * This should only be used if you have a way to map requests together 
with replies, such as when there are correlation ids in both the request
+     * and reply messages. This can be used if you want to multiplex 
concurrent messages on the same channel (aka connection) in netty. When doing
+     * this you must have a way to correlate the request and reply messages so 
you can store the right reply on the inflight Camel Exchange before
+     * it continues to be routed.
+     */
+    public void setCorrelationManager(NettyCamelStateCorrelationManager 
correlationManager) {
+        this.correlationManager = correlationManager;
+    }
+
     private static <T> void addToHandlersList(List<T> configured, List<T> 
handlers, Class<T> handlerType) {
         if (handlers != null) {
             for (T handler : handlers) {
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index dcbcc03..203f9c0 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -18,8 +18,6 @@ package org.apache.camel.component.netty4;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -64,7 +62,7 @@ public class NettyProducer extends DefaultAsyncProducer {
     private CamelLogger noReplyLogger;
     private EventLoopGroup workerGroup;
     private ObjectPool<ChannelFuture> pool;
-    private Map<Channel, NettyCamelState> nettyCamelStatesMap = new 
ConcurrentHashMap<Channel, NettyCamelState>();
+    private NettyCamelStateCorrelationManager correlationManager;
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration 
configuration) {
         super(nettyEndpoint);
@@ -87,6 +85,10 @@ public class NettyProducer extends DefaultAsyncProducer {
         return context;
     }
 
+    public NettyCamelStateCorrelationManager getCorrelationManager() {
+        return correlationManager;
+    }
+
     protected boolean isTcp() {
         return configuration.getProtocol().equalsIgnoreCase("tcp");
     }
@@ -94,6 +96,13 @@ public class NettyProducer extends DefaultAsyncProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+
+        if (configuration.getCorrelationManager() != null) {
+            correlationManager = configuration.getCorrelationManager();
+        } else {
+            correlationManager = new 
DefaultNettyCamelStateCorrelationManager();
+        }
+
         if (configuration.getWorkerGroup() == null) {
             // create new pool which we should shutdown when stopping as its 
not shared
             workerGroup = new NettyWorkerPoolBuilder()
@@ -301,7 +310,7 @@ public class NettyProducer extends DefaultAsyncProducer {
         }
 
         // setup state as attachment on the channel, so we can access the 
state later when needed
-        putState(channel, new NettyCamelState(producerCallback, exchange));
+        correlationManager.putState(channel, new 
NettyCamelState(producerCallback, exchange));
         // here we need to setup the remote address information here
         InetSocketAddress remoteAddress = null;
         if (!isTcp()) {
@@ -372,28 +381,6 @@ public class NettyProducer extends DefaultAsyncProducer {
         return body;
     }
 
-    /**
-     * To get the {@link NettyCamelState} from the given channel.
-     */
-    public NettyCamelState getState(Channel channel) {
-        return nettyCamelStatesMap.get(channel);
-    }
-
-    /**
-     * To remove the {@link NettyCamelState} stored on the channel,
-     * when no longer needed
-     */
-    public void removeState(Channel channel) {
-        nettyCamelStatesMap.remove(channel);
-    }
-
-    /**
-     * Put the {@link NettyCamelState} into the map use the given channel as 
the key
-     */
-    public void putState(Channel channel, NettyCamelState state) {
-        nettyCamelStatesMap.put(channel, state);
-    }
-
     protected EventLoopGroup getWorkerGroup() {
         // prefer using explicit configured thread pools
         EventLoopGroup wg = configuration.getWorkerGroup();
@@ -420,7 +407,7 @@ public class NettyProducer extends DefaultAsyncProducer {
             clientBootstrap.option(ChannelOption.SO_REUSEADDR, 
configuration.isReuseAddress());
             clientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
configuration.getConnectTimeout());
 
-            //TODO need to check it later
+            //TODO need to check it later;
             // set any additional netty options
             /*
             if (configuration.getOptions() != null) {
@@ -526,7 +513,6 @@ public class NettyProducer extends DefaultAsyncProducer {
         this.configuration = configuration;
     }
 
-
     public ChannelGroup getAllChannels() {
         return allChannels;
     }
@@ -656,8 +642,7 @@ public class NettyProducer extends DefaultAsyncProducer {
     }
 
     /**
-     * This class is used to release body in case when some error occured and 
body was not handed over
-     * to netty
+     * This class is used to release body in case when some error occurred and 
body was not handed over to netty
      */
     private static final class BodyReleaseCallback implements AsyncCallback {
         private volatile Object body;
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index b9a2a17..77f498e 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -75,8 +75,9 @@ public class ClientChannelHandler extends 
SimpleChannelInboundHandler<Object> {
             LOG.debug("Closing channel as an exception was thrown from Netty", 
cause);
         }
 
-        Exchange exchange = getExchange(ctx);
-        AsyncCallback callback = getAsyncCallback(ctx);
+        NettyCamelState state = getState(ctx, cause);
+        Exchange exchange = state != null ? state.getExchange() : null;
+        AsyncCallback callback = state != null ? state.getCallback() : null;
 
         // the state may not be set
         if (exchange != null && callback != null) {
@@ -102,35 +103,38 @@ public class ClientChannelHandler extends 
SimpleChannelInboundHandler<Object> {
             LOG.trace("Channel closed: {}", ctx.channel());
         }
 
-        Exchange exchange = getExchange(ctx);
-        AsyncCallback callback = getAsyncCallback(ctx);
+        NettyCamelState state = getState(ctx, null);
+        Exchange exchange = state != null ? state.getExchange() : null;
+        AsyncCallback callback = state != null ? state.getCallback() : null;
 
         // remove state
-        producer.removeState(ctx.channel());
+        producer.getCorrelationManager().removeState(ctx, ctx.channel());
 
         // to keep track of open sockets
         producer.getAllChannels().remove(ctx.channel());
 
-        // this channel is maybe closing graceful and the exchange is already 
done
-        // and if so we should not trigger an exception
-        boolean doneUoW = exchange.getUnitOfWork() == null;
-
-        NettyConfiguration configuration = producer.getConfiguration();
-        if (configuration.isSync() && !doneUoW && !messageReceived && 
!exceptionHandled) {
-            // To avoid call the callback.done twice
-            exceptionHandled = true;
-            // session was closed but no message received. This could be 
because the remote server had an internal error
-            // and could not return a response. We should count down to stop 
waiting for a response            
-            String address = configuration != null ? 
configuration.getAddress() : "";
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Channel closed but no message received from 
address: {}", address);
-            }
-            // don't fail the exchange if we actually specify to disconnect
-            if (!configuration.isDisconnect()) {
-                exchange.setException(new CamelExchangeException("No response 
received from remote server: " + address, exchange));
+        if (exchange != null) {
+            // this channel is maybe closing graceful and the exchange is 
already done
+            // and if so we should not trigger an exception
+            boolean doneUoW = exchange.getUnitOfWork() == null;
+
+            NettyConfiguration configuration = producer.getConfiguration();
+            if (configuration.isSync() && !doneUoW && !messageReceived && 
!exceptionHandled) {
+                // To avoid call the callback.done twice
+                exceptionHandled = true;
+                // session was closed but no message received. This could be 
because the remote server had an internal error
+                // and could not return a response. We should count down to 
stop waiting for a response
+                String address = configuration.getAddress();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Channel closed but no message received from 
address: {}", address);
+                }
+                // don't fail the exchange if we actually specify to disconnect
+                if (!configuration.isDisconnect()) {
+                    exchange.setException(new CamelExchangeException("No 
response received from remote server: " + address, exchange));
+                }
+                // signal callback
+                callback.done(false);
             }
-            // signal callback
-            callback.done(false);
         }
         
         // make sure the event can be processed by other handlers
@@ -151,12 +155,13 @@ public class ClientChannelHandler extends 
SimpleChannelInboundHandler<Object> {
             ctx.pipeline().remove(handler);
         }
 
-        Exchange exchange = getExchange(ctx);
+        NettyCamelState state = getState(ctx, msg);
+        Exchange exchange = state != null ? state.getExchange() : null;
         if (exchange == null) {
             // we just ignore the received message as the channel is closed
             return;
         }
-        AsyncCallback callback = getAsyncCallback(ctx);
+        AsyncCallback callback = state.getCallback();
 
         Message message;
         try {
@@ -246,14 +251,12 @@ public class ClientChannelHandler extends 
SimpleChannelInboundHandler<Object> {
         }
     }
 
-    private Exchange getExchange(ChannelHandlerContext ctx) {
-        NettyCamelState state = producer.getState(ctx.channel());
-        return state != null ? state.getExchange() : null;
+    private NettyCamelState getState(ChannelHandlerContext ctx, Object msg) {
+        return producer.getCorrelationManager().getState(ctx, ctx.channel(), 
msg);
     }
 
-    private AsyncCallback getAsyncCallback(ChannelHandlerContext ctx) {
-        NettyCamelState state = producer.getState(ctx.channel());
-        return state != null ? state.getCallback() : null;
+    private NettyCamelState getState(ChannelHandlerContext ctx, Throwable 
cause) {
+        return producer.getCorrelationManager().getState(ctx, ctx.channel(), 
cause);
     }
 
 }
diff --git 
a/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
index 35704d8..0125ee4 100644
--- 
a/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-netty4-starter/src/main/java/org/apache/camel/component/netty4/springboot/NettyComponentConfiguration.java
@@ -27,6 +27,7 @@ import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.EventExecutorGroup;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.component.netty4.ClientInitializerFactory;
+import org.apache.camel.component.netty4.NettyCamelStateCorrelationManager;
 import org.apache.camel.component.netty4.NettyComponent;
 import org.apache.camel.component.netty4.NettyServerBootstrapFactory;
 import org.apache.camel.component.netty4.ServerInitializerFactory;
@@ -314,6 +315,18 @@ public class NettyComponentConfiguration
          */
         private Boolean reuseChannel = false;
         /**
+         * To use a custom correlation manager to manage how request and reply
+         * messages are mapped when using request/reply with the netty 
producer.
+         * This should only be used if you have a way to map requests together
+         * with replies, such as when there are correlation ids in both the request
+         * and reply messages. This can be used if you want to multiplex
+         * concurrent messages on the same channel (aka connection) in netty.
+         * When doing this you must have a way to correlate the request and
+         * reply messages so you can store the right reply on the inflight 
Camel
+         * Exchange before it continues to be routed.
+         */
+        private NettyCamelStateCorrelationManager correlationManager;
+        /**
          * The protocol to use which can be tcp or udp.
          */
         private String protocol;
@@ -781,6 +794,15 @@ public class NettyComponentConfiguration
             this.reuseChannel = reuseChannel;
         }
 
+        public NettyCamelStateCorrelationManager getCorrelationManager() {
+            return correlationManager;
+        }
+
+        public void setCorrelationManager(
+                NettyCamelStateCorrelationManager correlationManager) {
+            this.correlationManager = correlationManager;
+        }
+
         public String getProtocol() {
             return protocol;
         }

-- 
To stop receiving notification emails like this one, please contact
davscl...@apache.org.

Reply via email to