This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1508f25e51302af1cf02434a7ca6c8d61e3d4263
Author: k-jamroz <[email protected]>
AuthorDate: Thu Feb 18 17:01:40 2021 +0100

    CAMEL-16227: Netty with reuseChannel invokes wrong callback (#5101)
    
    Co-authored-by: Krzysztof Jamróz <[email protected]>
---
 .../camel/component/netty/NettyProducer.java       |  16 ++-
 .../netty/NettyReuseChannelCallbackTest.java       | 145 +++++++++++++++++++++
 2 files changed, 160 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 0b74d89..ee0e0f2 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.netty;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.util.Optional;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -36,6 +37,7 @@ import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.AttributeKey;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import org.apache.camel.AsyncCallback;
@@ -60,6 +62,9 @@ public class NettyProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NettyProducer.class);
 
+    private static final AttributeKey<NettyCamelStateCorrelationManager> 
CORRELATION_MANAGER_ATTR
+            = AttributeKey.valueOf("NettyCamelStateCorrelationManager");
+
     private ChannelGroup allChannels;
     private CamelContext context;
     private NettyConfiguration configuration;
@@ -269,6 +274,9 @@ public class NettyProducer extends DefaultAsyncProducer {
         // remember channel so we can reuse it
         final Channel channel = channelFuture.channel();
         if (getConfiguration().isReuseChannel() && 
exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) {
+            // remember correlation manager for this channel
+            // for use when sending subsequent messages reusing this channel
+            channel.attr(CORRELATION_MANAGER_ATTR).set(correlationManager);
             exchange.setProperty(NettyConstants.NETTY_CHANNEL, channel);
             // and defer closing the channel until we are done routing the 
exchange
             exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
@@ -298,6 +306,12 @@ public class NettyProducer extends DefaultAsyncProducer {
             });
         }
 
+        // Get appropriate correlation manager.
+        // If we reuse channel then get it from channel. 
CORRELATION_MANAGER_ATTR should be set at this point.
+        // Otherwise use correlation manager for this producer.
+        final NettyCamelStateCorrelationManager channelCorrelationManager
+                = 
Optional.ofNullable(channel.attr(CORRELATION_MANAGER_ATTR).get()).orElse(correlationManager);
+
         if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) 
!= null) {
             long timeoutInMs = 
exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
             ChannelHandler oldHandler = channel.pipeline().get("timeout");
@@ -321,7 +335,7 @@ public class NettyProducer extends DefaultAsyncProducer {
         }
 
         // setup state as attachment on the channel, so we can access the 
state later when needed
-        correlationManager.putState(channel, new 
NettyCamelState(producerCallback, exchange));
+        channelCorrelationManager.putState(channel, new 
NettyCamelState(producerCallback, exchange));
         // here we need to setup the remote address information here
         InetSocketAddress remoteAddress = null;
         if (!isTcp()) {
diff --git 
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelCallbackTest.java
 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelCallbackTest.java
new file mode 100644
index 0000000..412af2e
--- /dev/null
+++ 
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelCallbackTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.channel.Channel;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.event.ExchangeSentEvent;
+import org.apache.camel.spi.CamelEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeSendingEvent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Regression test for CAMEL-16227
+ */
+class NettyReuseChannelCallbackTest extends BaseNettyTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyReuseChannelCallbackTest.class);
+
+    private final List<Channel> channels = new ArrayList<>();
+
+    @Test
+    void testReuse() throws Exception {
+        final List<Endpoint> eventEndpoints = new ArrayList<>(4);
+        final List<Long> times = new ArrayList<>(2);
+
+        final EventNotifierSupport nettyEventRecorder = new 
EventNotifierSupport() {
+            @Override
+            public void notify(CamelEvent event) throws Exception {
+                if (event instanceof ExchangeSendingEvent) {
+                    LOG.info("Got event {}", event);
+                    add(((ExchangeSendingEvent) event).getEndpoint());
+                }
+                if (event instanceof ExchangeSentEvent) {
+                    LOG.info("Got event {}", event);
+                    add(((ExchangeSentEvent) event).getEndpoint());
+                    if (((ExchangeSentEvent) event).getEndpoint() instanceof 
NettyEndpoint) {
+                        times.add(((ExchangeSentEvent) event).getTimeTaken());
+                    }
+                }
+            }
+
+            private void add(Endpoint endpoint) {
+                if (endpoint instanceof NettyEndpoint) {
+                    eventEndpoints.add(endpoint);
+                }
+            }
+        };
+        nettyEventRecorder.start();
+        context.getManagementStrategy().addEventNotifier(nettyEventRecorder);
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Hello Hello World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", 
"Hello Hello World");
+
+        template.sendBody("direct:start", "World\n");
+
+        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+
+        assertTrue(notify.matchesWaitTime());
+
+        assertEquals(2, channels.size());
+        assertSame(channels.get(0), channels.get(1), "Should reuse channel");
+        assertFalse(channels.get(0).isOpen(), "And closed when routing done");
+        assertFalse(channels.get(1).isOpen(), "And closed when routing done");
+
+        assertEquals(4, eventEndpoints.size(), "Should get 4 events for netty 
endpoints");
+        assertSame(eventEndpoints.get(0), eventEndpoints.get(1), "Sending and 
sent event should contain the same endpoint");
+        assertSame(eventEndpoints.get(2), eventEndpoints.get(3), "Sending and 
sent event should contain the same endpoint");
+        assertEquals(2, times.size(), "Should get 2 ExchangeSent events");
+        // one side effect of mixing callbacks in wrong time taken reported in 
event
+        times.forEach(time -> assertTrue(time < 900));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Netty URIs are slightly different (different requestTimeout)
+                // so there is different NettyEndpoint instance for each of 
them.
+                // This makes distinguishing events easier in test.
+                // If they URIs would be the same there would be one 
NettyEndpoint instance
+                // but still 2 separate NettyProducer instances.
+                from("direct:start")
+                        
.to("netty:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true&requestTimeout=1000")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws 
Exception {
+                                Channel channel = 
exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+                                channels.add(channel);
+                                assertTrue(channel.isActive(), "Should be 
active");
+                            }
+                        })
+                        .to("mock:a")
+                        
.to("netty:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true&requestTimeout=2000")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws 
Exception {
+                                Channel channel = 
exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+                                channels.add(channel);
+                                assertTrue(channel.isActive(), "Should be 
active");
+                            }
+                        })
+                        .to("mock:b");
+
+                from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
+                        .transform(body().prepend("Hello "))
+                        .delay(500)
+                        .to("mock:result");
+            }
+        };
+    }
+}

Reply via email to