This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.7.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 864c65b8b455b46f6271c5093f6c2dcf635a71db Author: k-jamroz <[email protected]> AuthorDate: Thu Feb 18 17:01:28 2021 +0100 CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093) * CAMEL-9527: Should not log stacktrace when client has received reply - regression test * CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork Co-authored-by: Krzysztof Jamróz <[email protected]> --- components/camel-netty/pom.xml | 5 ++ .../camel/component/netty/NettyCamelState.java | 16 +++++ .../netty/handlers/ClientChannelHandler.java | 22 +++--- .../camel/component/netty/BaseNettyTest.java | 2 +- .../netty/EnrichWithoutRestResponseTest.java | 81 ++++++++++++++++++++++ .../netty/ErrorDuringGracefullShutdownTest.java | 62 +++++++++++++++++ .../camel/component/netty/LogCaptureAppender.java | 16 ++++- .../camel/component/netty/LogCaptureTest.java | 6 +- .../src/test/resources/log4j2.properties | 2 + 9 files changed, 195 insertions(+), 17 deletions(-) diff --git a/components/camel-netty/pom.xml b/components/camel-netty/pom.xml index 6e5fbf7..7583921 100644 --- a/components/camel-netty/pom.xml +++ b/components/camel-netty/pom.xml @@ -100,6 +100,11 @@ <artifactId>junit-jupiter</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> <!-- logging --> <dependency> <groupId>org.apache.logging.log4j</groupId> diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java index 074dab5..389ac99 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.netty; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; @@ -30,16 +32,30 @@ public final class NettyCamelState { private final Exchange exchange; private final AsyncCallback callback; + // It is never a good idea to call the same callback twice + private final AtomicBoolean callbackCalled; public NettyCamelState(AsyncCallback callback, Exchange exchange) { this.callback = callback; this.exchange = exchange; + this.callbackCalled = new AtomicBoolean(); } public AsyncCallback getCallback() { return callback; } + public boolean isDone() { + return callbackCalled.get(); + } + + public void callbackDoneOnce(boolean doneSync) { + if (!callbackCalled.getAndSet(true)) { + // this is the first time we call the callback + callback.done(doneSync); + } + } + public Exchange getExchange() { return exchange; } diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java index ee7a43c..0000818 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java @@ -19,7 +19,6 @@ package org.apache.camel.component.netty.handlers; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import org.apache.camel.AsyncCallback; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -83,10 +82,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { NettyCamelState state = getState(ctx, cause); Exchange exchange = state != null ? state.getExchange() : null; - AsyncCallback callback = state != null ? state.getCallback() : null; // the state may not be set - if (exchange != null && callback != null) { + if (exchange != null) { Throwable initialCause = exchange.getException(); if (initialCause != null && initialCause.getCause() == null) { initialCause.initCause(cause); @@ -99,7 +97,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { NettyHelper.close(ctx.channel()); // signal callback - callback.done(false); + state.callbackDoneOnce(false); } } @@ -111,7 +109,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { NettyCamelState state = getState(ctx, null); Exchange exchange = state != null ? state.getExchange() : null; - AsyncCallback callback = state != null ? state.getCallback() : null; + // this channel is maybe closing graceful and the callback could already have been called + // and if so we should not trigger an exception nor invoke callback second time + boolean doneUoW = state != null ? state.isDone() : false; // remove state producer.getCorrelationManager().removeState(ctx, ctx.channel()); @@ -120,10 +120,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { producer.getAllChannels().remove(ctx.channel()); if (exchange != null && !disconnecting) { - // this channel is maybe closing graceful and the exchange is already done - // and if so we should not trigger an exception - boolean doneUoW = exchange.getUnitOfWork() == null; - NettyConfiguration configuration = producer.getConfiguration(); if (configuration.isSync() && !doneUoW && !messageReceived && !exceptionHandled) { // To avoid call the callback.done twice @@ -140,7 +136,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { new CamelExchangeException("No response received from remote server: " + address, exchange)); } // signal callback - callback.done(false); + state.callbackDoneOnce(false); } } @@ -171,14 +167,13 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { // we just ignore the received message as the channel is closed return; } - AsyncCallback callback = state.getCallback(); Message message; try { message = getResponseMessage(exchange, ctx, msg); } catch (Exception e) { exchange.setException(e); - callback.done(false); + state.callbackDoneOnce(false); return; } @@ -225,8 +220,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { NettyHelper.close(ctx.channel()); } } finally { - // signal callback - callback.done(false); + state.callbackDoneOnce(false); } } diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java index 283f9318..b98e501 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java @@ -58,7 +58,7 @@ public class BaseNettyTest extends CamelTestSupport { System.gc(); // Kick leak detection logging ByteBufAllocator.DEFAULT.buffer(1).release(); - Collection<LogEvent> events = LogCaptureAppender.getEvents(); + Collection<LogEvent> events = LogCaptureAppender.getEvents(ResourceLeakDetector.class); if (!events.isEmpty()) { String message = "Leaks detected while running tests: " + events; // Just write the message into log to help debug diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java new file mode 100644 index 0000000..5789360 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +/** + * Regression test for CAMEL-16178 + */ +class EnrichWithoutRestResponseTest extends BaseNettyTest { + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + // mock server - accepts connection and immediately disconnects without any response + from("netty:tcp://0.0.0.0:{{port}}?disconnect=true") + .log("Got request ${body}") + .setBody(constant(null)); + + // test routes + final String nettyClientUri + = "netty:tcp://127.0.0.1:{{port}}?textline=true&connectTimeout=1000&requestTimeout=1000"; + from("direct:reqTo") + .to(nettyClientUri); + from("direct:reqEnrich") + .enrich(nettyClientUri); + from("direct:reqEnrichShareUoW") + .enrich(nettyClientUri, new UseLatestAggregationStrategy(), true, true); + } + }; + } + + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + void toTest() { + assertThatExceptionOfType(CamelExecutionException.class) + .isThrownBy(() -> template.requestBody("direct:reqTo", "")) + .havingCause().withMessageContaining("No response received from remote server"); + } + + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + void enrichTest() { + assertThatExceptionOfType(CamelExecutionException.class) + .isThrownBy(() -> template.requestBody("direct:reqEnrich", "")) + .havingCause().withMessageContaining("No response received from remote server"); + } + + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + void enrichShareUoWTest() { + assertThatExceptionOfType(CamelExecutionException.class) + .isThrownBy(() -> template.requestBody("direct:reqEnrichShareUoW", "")) + .havingCause().withMessageContaining("No response received from remote server"); + } +} diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java new file mode 100644 index 0000000..6771cc5 --- /dev/null +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.errorhandler.DefaultErrorHandler; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Regression test for CAMEL-9527 + */ +class ErrorDuringGracefullShutdownTest extends BaseNettyTest { + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + // mock server + from("netty:tcp://0.0.0.0:{{port}}?textline=true&disconnect=false") + .log("Got request ${body}") + .setBody(constant("response")); + + from("direct:req") + .to("netty:tcp://127.0.0.1:{{port}}?textline=true"); + } + }; + } + + @Test + void shouldNotTriggerErrorDuringGracefullShutdown() throws Exception { + // given: successful request + assertThat(template.requestBody("direct:req", "test", String.class)).isEqualTo("response"); + + // when: context is closed + context().close(); + while (context.getStatus() != ServiceStatus.Stopped) { + Thread.sleep(1); + } + + // then: there should be no entries in log indicating that the callback was called twice + assertThat(LogCaptureAppender.hasEventsFor(DefaultErrorHandler.class)).isFalse(); + } +} diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java index 89dce73..a5d8bfa 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.ArrayDeque; import java.util.Collection; import java.util.Deque; +import java.util.stream.Collectors; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; @@ -29,6 +30,7 @@ import org.apache.logging.log4j.core.config.plugins.Plugin; import org.apache.logging.log4j.core.config.plugins.PluginAttribute; import org.apache.logging.log4j.core.config.plugins.PluginElement; import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.impl.MutableLogEvent; @Plugin(name = "LogCaptureAppender", category = "Core", elementType = "appender", printObject = true) public class LogCaptureAppender extends AbstractAppender { @@ -52,7 +54,11 @@ public class LogCaptureAppender extends AbstractAppender { @Override public void append(LogEvent logEvent) { - LOG_EVENTS.add(logEvent); + if (logEvent instanceof MutableLogEvent) { + LOG_EVENTS.add(((MutableLogEvent) logEvent).createMemento()); + } else { + LOG_EVENTS.add(logEvent); + } } public static void reset() { @@ -62,4 +68,12 @@ public class LogCaptureAppender extends AbstractAppender { public static Collection<LogEvent> getEvents() { return LOG_EVENTS; } + + public static Collection<LogEvent> getEvents(Class<?> cls) { + return LOG_EVENTS.stream().filter(e -> e.getLoggerName().equals(cls.getName())).collect(Collectors.toList()); + } + + public static boolean hasEventsFor(Class<?> cls) { + return LOG_EVENTS.stream().anyMatch(e -> e.getLoggerName().equals(cls.getName())); + } } diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java index 8b76fa1..e9656ff 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java @@ -18,9 +18,11 @@ package org.apache.camel.component.netty; import io.netty.util.ResourceLeakDetector; import io.netty.util.internal.logging.InternalLoggerFactory; +import org.apache.camel.processor.errorhandler.DefaultErrorHandler; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * This test ensures LogCaptureAppender is configured properly @@ -29,7 +31,9 @@ public class LogCaptureTest { @Test public void testCapture() { InternalLoggerFactory.getInstance(ResourceLeakDetector.class).error("testError"); - assertFalse(LogCaptureAppender.getEvents().isEmpty()); + assertFalse(LogCaptureAppender.getEvents(ResourceLeakDetector.class).isEmpty()); + assertTrue(LogCaptureAppender.hasEventsFor(ResourceLeakDetector.class)); + assertTrue(LogCaptureAppender.getEvents(DefaultErrorHandler.class).isEmpty()); LogCaptureAppender.reset(); } } diff --git a/components/camel-netty/src/test/resources/log4j2.properties b/components/camel-netty/src/test/resources/log4j2.properties index 5005aef..1646981 100644 --- a/components/camel-netty/src/test/resources/log4j2.properties +++ b/components/camel-netty/src/test/resources/log4j2.properties @@ -30,5 +30,7 @@ appender.capture.name=capture logger.leak.name = io.netty.util.ResourceLeakDetector logger.leak.appenderRef.capture.ref = capture +logger.errorHandler.name = org.apache.camel.processor.errorhandler.DefaultErrorHandler +logger.errorHandler.appenderRef.capture.ref = capture rootLogger.level = INFO rootLogger.appenderRef.file.ref = file
