This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 68fe1d415c08 CAMEL-22830: avoid storing replayId in static map (#20777)
68fe1d415c08 is described below

commit 68fe1d415c087304c732206c9bc3b3264ff98b14
Author: François de Parscau <[email protected]>
AuthorDate: Mon Jan 12 16:11:15 2026 +0100

    CAMEL-22830: avoid storing replayId in static map (#20777)
---
 .../component/salesforce/SalesforceComponent.java    |  2 +-
 .../internal/streaming/SubscriptionHelper.java       | 12 ++++++++----
 .../internal/streaming/SubscriptionHelperTest.java   | 20 ++++++++++++++------
 3 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index 1fdd3a0f2b81..8224c736f693 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -521,7 +521,7 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
         }
     }
 
-    public SubscriptionHelper getSubscriptionHelper() throws Exception {
+    public SubscriptionHelper getSubscriptionHelper() {
         if (subscriptionHelper == null) {
             // lazily create subscription helper
             subscriptionHelper = new SubscriptionHelper(this);
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index f0c22e6454f8..26de3104ce4e 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -65,7 +65,7 @@ import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
 
 public class SubscriptionHelper extends ServiceSupport {
 
-    static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();
+    private final ReplayExtension replayExtension = new ReplayExtension();
 
     private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
 
@@ -231,7 +231,7 @@ public class SubscriptionHelper extends ServiceSupport {
                     = firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId();
             LOG.warn(error);
             LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName);
-            REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId);
+            replayExtension.setReplayId(channelName, fallBackReplayId);
             for (var consumer : consumers) {
                 subscribe(consumer);
             }
@@ -408,7 +408,7 @@ public class SubscriptionHelper extends ServiceSupport {
         BayeuxClient client = new BayeuxClient(getEndpointUrl(component), transport);
 
         // added eagerly to check for support during handshake
-        client.addExtension(REPLAY_EXTENSION);
+        client.addExtension(component.getSubscriptionHelper().getReplayExtension());
 
         return client;
     }
@@ -439,6 +439,10 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
+    ReplayExtension getReplayExtension() {
+        return replayExtension;
+    }
+
     private static boolean isTemporaryError(Message message) {
         String failureReason = getFailureReason(message);
         return failureReason != null && failureReason.startsWith(SERVER_TOO_BUSY_ERROR);
@@ -465,7 +469,7 @@ public class SubscriptionHelper extends ServiceSupport {
 
             final Long replayIdValue = replayId.get();
 
-            REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, replayIdValue);
+            replayExtension.setReplayIdIfAbsent(channelName, replayIdValue);
         }
     }
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
index fd72d7345c86..963e9d8fbe6b 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -34,7 +34,6 @@ import org.cometd.client.BayeuxClient;
 import org.hamcrest.MatcherAssert;
 import org.junit.jupiter.api.Test;
 
-import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.REPLAY_EXTENSION;
 import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION;
@@ -144,6 +143,8 @@ public class SubscriptionHelperTest {
         when(component.getLoginConfig()).thenReturn(loginConfig);
         when(component.getConfig()).thenReturn(endpointConfig);
         when(component.getSession()).thenReturn(session);
+        final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component);
+        when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);
 
         BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component, session);
 
@@ -167,6 +168,8 @@ public class SubscriptionHelperTest {
         when(component.getLoginConfig()).thenReturn(loginConfig);
         when(component.getConfig()).thenReturn(endpointConfig);
         when(component.getSession()).thenReturn(session);
+        final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component);
+        when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);
 
         BayeuxClient bayeuxClient = SubscriptionHelper.createClient(component, session);
 
@@ -183,6 +186,8 @@ public class SubscriptionHelperTest {
         when(component.getLoginConfig()).thenReturn(new SalesforceLoginConfig());
         when(component.getConfig()).thenReturn(endpointConfig);
         when(component.getSession()).thenReturn(session);
+        final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component);
+        when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);
         var bayeuxClient = SubscriptionHelper.createClient(component, session);
 
         var longPollingTimeout = bayeuxClient.getTransport("long-polling").getOption(MAX_NETWORK_DELAY_OPTION);
@@ -207,20 +212,23 @@ public class SubscriptionHelperTest {
         when(endpoint.getReplayId()).thenReturn(null);
         when(endpoint.getComponent()).thenReturn(component);
         when(endpoint.getConfiguration()).thenReturn(endpointConfig);
+        final SubscriptionHelper subscriptionHelper = new SubscriptionHelper(component);
+        when(component.getSubscriptionHelper()).thenReturn(subscriptionHelper);
 
         assertEquals(Optional.of(2L), determineReplayIdFor(endpoint, "my-topic-1"),
                 "Expecting replayId for `my-topic-1` to be 2, from initial reply id map");
 
-        REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 3L);
-        REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 4L);
+        ReplayExtension replayExtension = component.getSubscriptionHelper().getReplayExtension();
+        replayExtension.setReplayIdIfAbsent("my-topic-1", 3L);
+        replayExtension.setReplayIdIfAbsent("my-topic-1", 4L);
 
         // should still be 3L
-        Field f = REPLAY_EXTENSION.getClass().getDeclaredField("dataMap");
-        Map m = (Map) ReflectionHelper.getField(f, REPLAY_EXTENSION);
+        Field f = replayExtension.getClass().getDeclaredField("dataMap");
+        Map m = (Map) ReflectionHelper.getField(f, replayExtension);
         assertEquals(3L, m.get("my-topic-1"));
 
         // there is some subscription error due to INVALID_REPLAY_ID_PATTERN so we force setting another reply id
-        REPLAY_EXTENSION.setReplayId("my-topic-1", -2L);
+        replayExtension.setReplayId("my-topic-1", -2L);
         assertEquals(-2L, m.get("my-topic-1"));
     }
 

Reply via email to