This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.10.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push:
new e7164cb395a CAMEL-21901: camel-salesforce: Fix using fallbackReplyId
when salesfo… (#17696)
e7164cb395a is described below
commit e7164cb395acc7b3d86ece56e40872ccce63d590
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Apr 10 13:45:19 2025 +0200
CAMEL-21901: camel-salesforce: Fix using fallbackReplyId when salesfo…
(#17696)
* CAMEL-21901: camel-salesforce: Fix using fallbackReplyId when salesforce
cannot subscribe due to invalid initial replyId from a preconfigured
initialReplyIdMap.
---
.../internal/streaming/ReplayExtension.java | 8 +++--
.../internal/streaming/SubscriptionHelper.java | 4 +--
.../internal/streaming/SubscriptionHelperTest.java | 37 ++++++++++++++++++++++
3 files changed, 44 insertions(+), 5 deletions(-)
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
index 0d2d32f012e..a43d8a8e38f 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
@@ -36,9 +36,6 @@ import org.cometd.bayeux.client.ClientSession.Extension;
/**
* The Bayeux extension for replay
- *
- * @author hal.hildebrand
- * @since API v37.0
*/
public class ReplayExtension implements Extension {
private static final String EXTENSION_NAME = "replay";
@@ -52,6 +49,11 @@ public class ReplayExtension implements Extension {
dataMap.putIfAbsent(channelName, replayId);
}
+ public void setReplayId(final String channelName, final long replayId) {
+ // force setting with a specific value
+ dataMap.put(channelName, replayId);
+ }
+
@Override
public boolean rcv(ClientSession session, Message.Mutable message) {
Long replayId = getReplayId(message);
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 894fcb16820..737d85327ad 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -228,10 +228,10 @@ public class SubscriptionHelper extends ServiceSupport {
} else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
abort = false;
long fallBackReplayId
- = ((SalesforceEndpoint)
firstConsumer.getEndpoint()).getConfiguration().getFallBackReplayId();
+ =
firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId();
LOG.warn(error);
LOG.warn("Falling back to replayId {} for channel {}",
fallBackReplayId, channelName);
- REPLAY_EXTENSION.setReplayIdIfAbsent(channelName,
fallBackReplayId);
+ REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId);
for (var consumer : consumers) {
subscribe(consumer);
}
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
index 42d68baeee0..fd72d7345c8 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.salesforce.internal.streaming;
+import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -28,10 +29,12 @@ import
org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.SalesforceLoginConfig;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.apache.camel.util.ReflectionHelper;
import org.cometd.client.BayeuxClient;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;
+import static
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.REPLAY_EXTENSION;
import static
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
import static org.assertj.core.api.Assertions.assertThat;
import static
org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION;
@@ -187,4 +190,38 @@ public class SubscriptionHelperTest {
MatcherAssert.assertThat(longPollingTimeout,
instanceOf(Integer.class));
MatcherAssert.assertThat((Integer) longPollingTimeout,
greaterThan(110000));
}
+
+ @Test
+ public void fallbackReplyId() throws Exception {
+ final SalesforceEndpointConfig componentConfig = new
SalesforceEndpointConfig();
+ componentConfig.setFallBackReplayId(-2L);
+
+ final SalesforceEndpointConfig endpointConfig = new
SalesforceEndpointConfig();
+ endpointConfig.setDefaultReplayId(-1L);
+
endpointConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1",
2L));
+
+ final SalesforceComponent component = mock(SalesforceComponent.class);
+ when(component.getConfig()).thenReturn(componentConfig);
+
+ final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
+ when(endpoint.getReplayId()).thenReturn(null);
+ when(endpoint.getComponent()).thenReturn(component);
+ when(endpoint.getConfiguration()).thenReturn(endpointConfig);
+
+ assertEquals(Optional.of(2L), determineReplayIdFor(endpoint,
"my-topic-1"),
+ "Expecting replayId for `my-topic-1` to be 2, from initial
reply id map");
+
+ REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 3L);
+ REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 4L);
+
+ // should still be 3L
+ Field f = REPLAY_EXTENSION.getClass().getDeclaredField("dataMap");
+ Map m = (Map) ReflectionHelper.getField(f, REPLAY_EXTENSION);
+ assertEquals(3L, m.get("my-topic-1"));
+
+ // there is some subscription error due to INVALID_REPLAY_ID_PATTERN
so we force setting another reply id
+ REPLAY_EXTENSION.setReplayId("my-topic-1", -2L);
+ assertEquals(-2L, m.get("my-topic-1"));
+ }
+
}