This is an automated email from the ASF dual-hosted git repository.
Croway pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.18.x by this push:
new d3b0096082cd CAMEL-23646: Fix inverted BackgroundTask return values in
sjms, pgevent and master
d3b0096082cd is described below
commit d3b0096082cd877242a7f5c2fd70f87797055406
Author: Croway <[email protected]>
AuthorDate: Fri May 29 14:27:33 2026 +0200
CAMEL-23646: Fix inverted BackgroundTask return values in sjms, pgevent and
master
The BackOff to BackgroundTask migration (CAMEL-22206) preserved the callback
return values verbatim, but the two APIs have opposite boolean semantics:
BackOff returns false to stop, BackgroundTask returns true to stop.
This caused infinite retry loops after successful recovery in all three
components.
---
.../camel/component/master/MasterConsumer.java | 2 +-
.../camel/component/pgevent/PgEventConsumer.java | 2 +-
.../consumer/SimpleMessageListenerContainer.java | 4 +-
.../component/sjms/SjmsConnectionRecoveryTest.java | 232 +++++++++++++++++++++
4 files changed, 236 insertions(+), 4 deletions(-)
diff --git
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index 06b94fdce31b..5dfebcb3e6de 100644
---
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -203,7 +203,7 @@ public class MasterConsumer extends DefaultConsumer
implements ResumeAware<Resum
LOG.info("Leadership taken. Attempt #{} success. Consumer
started: {}", leaderTask.iteration(),
delegatedEndpoint);
- return false; // no more attempts
+ return true; // no more attempts
});
} finally {
lock.unlock();
diff --git
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index 960bed80f699..421a47171d8e 100644
---
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -117,7 +117,7 @@ public class PgEventConsumer extends DefaultConsumer {
}
LOG.debug("Connecting successful");
}
- return false;
+ return true;
});
}
}
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
index e2271079920e..3752fe636f43 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
@@ -205,7 +205,7 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
initConsumers();
LOG.debug("Successfully recovered JMS Connection (attempt: {})",
task.iteration());
// success so do not try again
- return false;
+ return true;
} catch (Exception e) {
String message = "Failed to recover JMS Connection (attempt: " +
task.iteration() + "). Will try again in "
+ endpoint.getRecoveryInterval() + " millis";
@@ -222,7 +222,7 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
recoverPool =
endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
"SjmsConnectionRecovery");
}
- if (recoverTask == null) {
+ if (recoverTask == null || !recoverTask.isRunning()) {
recoverTask = createTask();
recoverFuture =
recoverTask.schedule(endpoint.getCamelContext(), () ->
recoverConnection(recoverTask));
}
diff --git
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsConnectionRecoveryTest.java
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsConnectionRecoveryTest.java
new file mode 100644
index 000000000000..140735bd954f
--- /dev/null
+++
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsConnectionRecoveryTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.JMSContext;
+import jakarta.jms.JMSException;
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.infra.artemis.services.ArtemisContainer;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Reproduces a scenario observed with Oracle AQ where, after a brief
connectivity interruption, the SJMS consumer
+ * enters an infinite recovery loop: it successfully recreates the JMS
connection on each attempt, but immediately
+ * re-enters recovery ~recoveryInterval later, blocking normal message
consumption indefinitely.
+ * <p>
+ * Root cause: {@code SimpleMessageListenerContainer.recoverConnection()}
returns {@code false} on success, but the
+ * {@code BackgroundTask} framework interprets {@code false} as "not done,
keep scheduling." This causes the task to
+ * keep running — each iteration destroys the working connection via {@code
refreshConnection()} while
+ * {@code initConsumers()} skips re-creation (because {@code consumers !=
null}), leaving consumers attached to closed
+ * sessions.
+ * <p>
+ * Uses a real Artemis broker over TCP (via Testcontainers) so that connection
close truly kills sessions and consumers,
+ * matching the behavior of Oracle AQ and other remote JMS providers.
+ */
+public class SjmsConnectionRecoveryTest extends CamelTestSupport {
+
+ private static final String SJMS_QUEUE_NAME =
"sjms:queue:SjmsConnectionRecoveryTest";
+ private static final String MOCK_RESULT = "mock:result";
+ private static final int RECOVERY_INTERVAL_MS = 1000;
+
+ private static ArtemisContainer broker;
+ private CountingConnectionFactory countingFactory;
+
+ @BeforeAll
+ static void startBroker() {
+ broker = new ArtemisContainer();
+ broker.start();
+ }
+
+ @AfterAll
+ static void stopBroker() {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(
+ "tcp://" + broker.getHost() + ":" +
broker.defaultAcceptorPort(),
+ broker.username(), broker.password());
+ connectionFactory.setReconnectAttempts(0);
+
+ countingFactory = new CountingConnectionFactory(connectionFactory);
+
+ SjmsComponent sjms = new SjmsComponent();
+ sjms.setConnectionFactory(countingFactory);
+ camelContext.addComponent("sjms", sjms);
+
+ return camelContext;
+ }
+
+ /**
+ * Verifies that after a transient JMS connection interruption, recovery
succeeds once and the recovery loop stops —
+ * the consumer does not re-enter recovery again.
+ * <p>
+ * This directly reproduces the customer scenario where logs show:
+ *
+ * <pre>
+ * Recovering from JMS Connection exception (attempt: 125)
+ * Created JMS Connection
+ * Successfully recovered JMS Connection (attempt: 125)
+ *
+ * Recovering from JMS Connection exception (attempt: 126)
+ * Created JMS Connection
+ * Successfully recovered JMS Connection (attempt: 126)
+ * ... repeats indefinitely ...
+ * </pre>
+ *
+ * The test uses a {@link CountingConnectionFactory} to track how many JMS
connections are created during recovery.
+ * After triggering the connection exception, it waits for multiple
recovery intervals and asserts:
+ * <ul>
+ * <li>Exactly one new connection was created (recovery happened once and
stopped)</li>
+ * <li>Messages are consumed normally after recovery</li>
+ * </ul>
+ * <p>
+ * With the bug: multiple connections are created (recovery loops
indefinitely), and messages sent after the
+ * recovery window are never consumed because consumers are attached to
closed sessions.
+ * <p>
+ * With the fix: exactly one connection is created, the recovery task
stops, consumers remain alive, and messages
+ * are consumed normally.
+ */
+ @Test
+ public void testRecoveryStopsAfterSuccessfulReconnection() throws
Exception {
+ MockEndpoint mock = getMockEndpoint(MOCK_RESULT);
+
+ // Phase 1: verify normal consumption (also confirms consumer is fully
started)
+ mock.expectedMessageCount(1);
+ template.sendBody(SJMS_QUEUE_NAME, "before-failure");
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> mock.assertIsSatisfied());
+ mock.reset();
+
+ // Phase 2: simulate a transient JMS connection exception.
+ // Record connection count before triggering the exception.
+ int connectionsBefore = countingFactory.createCount.get();
+ SjmsConsumer sjmsConsumer = (SjmsConsumer)
context.getRoute("recovery-route").getConsumer();
+ Field containerField =
SjmsConsumer.class.getDeclaredField("listenerContainer");
+ containerField.setAccessible(true);
+ ExceptionListener container = (ExceptionListener)
containerField.get(sjmsConsumer);
+ container.onException(new JMSException("Simulated transient connection
failure"));
+
+ // Phase 3: wait for recovery to complete and for multiple recovery
intervals to pass.
+ // With recoveryInterval=1000ms and BackgroundTask initial delay=1s:
+ // t=0s: onException fires, consumers/sessions nullified, recovery
scheduled
+ // t=1s: recovery iteration 1 — creates new connection + consumers
+ // t=2s: BUG: iteration 2 — creates another connection, skips
consumer re-creation
+ // t=3s: BUG: iteration 3 — creates yet another connection
+ // By t=4s, if the bug is present, 3+ connections were created.
+ await().pollDelay(4 * RECOVERY_INTERVAL_MS, TimeUnit.MILLISECONDS)
+ .atMost(5 * RECOVERY_INTERVAL_MS, TimeUnit.MILLISECONDS)
+ .until(() -> true);
+
+ // Phase 4: assert recovery happened exactly once and stopped.
+ int connectionsAfter = countingFactory.createCount.get();
+ int recoveryConnections = connectionsAfter - connectionsBefore;
+ assertEquals(1, recoveryConnections,
+ "Recovery should create exactly one new connection and stop, "
+ + "but created " +
recoveryConnections
+ + " — the recovery loop did not
stop after successful reconnection");
+
+ // Phase 5: verify messages are consumed after recovery.
+ mock.expectedMessageCount(1);
+ template.sendBody(SJMS_QUEUE_NAME, "after-failure");
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> mock.assertIsSatisfied());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(SJMS_QUEUE_NAME
+ + "?acknowledgementMode=CLIENT_ACKNOWLEDGE"
+ + "&asyncStartListener=true"
+ + "&concurrentConsumers=5"
+ + "&recoveryInterval=" + RECOVERY_INTERVAL_MS)
+ .routeId("recovery-route")
+ .to(MOCK_RESULT);
+ }
+ };
+ }
+
+ /**
+ * A ConnectionFactory wrapper that counts how many JMS connections are
created. Used to verify that the recovery
+ * mechanism creates exactly one new connection and stops, rather than
looping indefinitely.
+ */
+ static class CountingConnectionFactory implements ConnectionFactory {
+ private final ConnectionFactory delegate;
+ final AtomicInteger createCount = new AtomicInteger();
+
+ CountingConnectionFactory(ConnectionFactory delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Connection createConnection() throws JMSException {
+ createCount.incrementAndGet();
+ return delegate.createConnection();
+ }
+
+ @Override
+ public Connection createConnection(String userName, String password)
throws JMSException {
+ createCount.incrementAndGet();
+ return delegate.createConnection(userName, password);
+ }
+
+ @Override
+ public JMSContext createContext() {
+ return delegate.createContext();
+ }
+
+ @Override
+ public JMSContext createContext(String userName, String password) {
+ return delegate.createContext(userName, password);
+ }
+
+ @Override
+ public JMSContext createContext(String userName, String password, int
sessionMode) {
+ return delegate.createContext(userName, password, sessionMode);
+ }
+
+ @Override
+ public JMSContext createContext(int sessionMode) {
+ return delegate.createContext(sessionMode);
+ }
+ }
+}