This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.7.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1508f25e51302af1cf02434a7ca6c8d61e3d4263 Author: k-jamroz <[email protected]> AuthorDate: Thu Feb 18 17:01:40 2021 +0100 CAMEL-16227: Netty with reuseChannel invokes wrong callback (#5101) Co-authored-by: Krzysztof Jamróz <[email protected]> --- .../camel/component/netty/NettyProducer.java | 16 ++- .../netty/NettyReuseChannelCallbackTest.java | 145 +++++++++++++++++++++ 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index 0b74d89..ee0e0f2 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.netty; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.util.Optional; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -36,6 +37,7 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.camel.AsyncCallback; @@ -60,6 +62,9 @@ public class NettyProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(NettyProducer.class); + private static final AttributeKey<NettyCamelStateCorrelationManager> CORRELATION_MANAGER_ATTR + = AttributeKey.valueOf("NettyCamelStateCorrelationManager"); + private ChannelGroup allChannels; private CamelContext context; private NettyConfiguration configuration; @@ -269,6 +274,9 @@ public class NettyProducer extends DefaultAsyncProducer { // remember channel so we can reuse it final Channel channel = channelFuture.channel(); if (getConfiguration().isReuseChannel() && exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) { + // remember correlation manager for this channel + // for use when sending subsequent messages reusing this channel + channel.attr(CORRELATION_MANAGER_ATTR).set(correlationManager); exchange.setProperty(NettyConstants.NETTY_CHANNEL, channel); // and defer closing the channel until we are done routing the exchange exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @@ -298,6 +306,12 @@ public class NettyProducer extends DefaultAsyncProducer { }); } + // Get appropriate correlation manager. + // If we reuse channel then get it from channel. CORRELATION_MANAGER_ATTR should be set at this point. + // Otherwise use correlation manager for this producer. + final NettyCamelStateCorrelationManager channelCorrelationManager + = Optional.ofNullable(channel.attr(CORRELATION_MANAGER_ATTR).get()).orElse(correlationManager); + if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) { long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class); ChannelHandler oldHandler = channel.pipeline().get("timeout"); @@ -321,7 +335,7 @@ public class NettyProducer extends DefaultAsyncProducer { } // setup state as attachment on the channel, so we can access the state later when needed - correlationManager.putState(channel, new NettyCamelState(producerCallback, exchange)); + channelCorrelationManager.putState(channel, new NettyCamelState(producerCallback, exchange)); // here we need to setup the remote address information here InetSocketAddress remoteAddress = null; if (!isTcp()) { diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelCallbackTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelCallbackTest.java new file mode 100644 index 0000000..412af2e --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelCallbackTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import io.netty.channel.Channel; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.event.ExchangeSentEvent; +import org.apache.camel.spi.CamelEvent; +import org.apache.camel.spi.CamelEvent.ExchangeSendingEvent; +import org.apache.camel.support.EventNotifierSupport; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Regression test for CAMEL-16227 + */ +class NettyReuseChannelCallbackTest extends BaseNettyTest { + + private static final Logger LOG = LoggerFactory.getLogger(NettyReuseChannelCallbackTest.class); + + private final List<Channel> channels = new ArrayList<>(); + + @Test + void testReuse() throws Exception { + final List<Endpoint> eventEndpoints = new ArrayList<>(4); + final List<Long> times = new ArrayList<>(2); + + final EventNotifierSupport nettyEventRecorder = new EventNotifierSupport() { + @Override + public void notify(CamelEvent event) throws Exception { + if (event instanceof ExchangeSendingEvent) { + LOG.info("Got event {}", event); + add(((ExchangeSendingEvent) event).getEndpoint()); + } + if (event instanceof ExchangeSentEvent) { + LOG.info("Got event {}", event); + add(((ExchangeSentEvent) event).getEndpoint()); + if (((ExchangeSentEvent) event).getEndpoint() instanceof NettyEndpoint) { + times.add(((ExchangeSentEvent) event).getTimeTaken()); + } + } + } + + private void add(Endpoint endpoint) { + if (endpoint instanceof NettyEndpoint) { + eventEndpoints.add(endpoint); + } + } + }; + nettyEventRecorder.start(); + context.getManagementStrategy().addEventNotifier(nettyEventRecorder); + + NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create(); + + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("Hello Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Hello Hello World"); + + template.sendBody("direct:start", "World\n"); + + assertMockEndpointsSatisfied(10, TimeUnit.SECONDS); + + assertTrue(notify.matchesWaitTime()); + + assertEquals(2, channels.size()); + assertSame(channels.get(0), channels.get(1), "Should reuse channel"); + assertFalse(channels.get(0).isOpen(), "And closed when routing done"); + assertFalse(channels.get(1).isOpen(), "And closed when routing done"); + + assertEquals(4, eventEndpoints.size(), "Should get 4 events for netty endpoints"); + assertSame(eventEndpoints.get(0), eventEndpoints.get(1), "Sending and sent event should contain the same endpoint"); + assertSame(eventEndpoints.get(2), eventEndpoints.get(3), "Sending and sent event should contain the same endpoint"); + assertEquals(2, times.size(), "Should get 2 ExchangeSent events"); + // one side effect of mixing callbacks in wrong time taken reported in event + times.forEach(time -> assertTrue(time < 900)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // Netty URIs are slightly different (different requestTimeout) + // so there is different NettyEndpoint instance for each of them. + // This makes distinguishing events easier in test. + // If they URIs would be the same there would be one NettyEndpoint instance + // but still 2 separate NettyProducer instances. + from("direct:start") + .to("netty:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true&requestTimeout=1000") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class); + channels.add(channel); + assertTrue(channel.isActive(), "Should be active"); + } + }) + .to("mock:a") + .to("netty:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true&requestTimeout=2000") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class); + channels.add(channel); + assertTrue(channel.isActive(), "Should be active"); + } + }) + .to("mock:b"); + + from("netty:tcp://localhost:{{port}}?textline=true&sync=true") + .transform(body().prepend("Hello ")) + .delay(500) + .to("mock:result"); + } + }; + } +}
