This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
new 2c5e5d46b14 CAMEL-21379: camel-salesforce: introduce delay in pub/sub reconnect attempts
2c5e5d46b14 is described below
commit 2c5e5d46b1479958c352f095bdf7dd86998cbd7e
Author: Jeremy Ross <[email protected]>
AuthorDate: Wed Oct 23 13:07:52 2024 -0500
CAMEL-21379: camel-salesforce: introduce delay in pub/sub reconnect attempts
---
.../salesforce/internal/client/PubSubApiClient.java | 13 ++++++++++++-
.../apache/camel/component/salesforce/PubSubApiTest.java | 6 ++++--
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
index d6caba9b317..8ba356d4814 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -80,6 +80,7 @@ public class PubSubApiClient extends ServiceSupport {
private final long backoffIncrement;
private final long maxBackoff;
+ private long reconnectDelay;
private final String pubSubHost;
private final int pubSubPort;
@@ -106,6 +107,7 @@ public class PubSubApiClient extends ServiceSupport {
this.pubSubPort = pubSubPort;
this.maxBackoff = maxBackoff;
this.backoffIncrement = backoffIncrement;
+ this.reconnectDelay = backoffIncrement;
}
public
List<org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult>
publishMessage(
@@ -289,6 +291,9 @@ public class PubSubApiClient extends ServiceSupport {
@Override
public void onNext(FetchResponse fetchResponse) {
+ // reset reconnect delay in case we previously had errors
+ reconnectDelay = backoffIncrement;
+
String topic = consumer.getTopic();
LOG.debug("Received {} events on topic: {}",
fetchResponse.getEventsList().size(), topic);
@@ -339,11 +344,17 @@ public class PubSubApiClient extends ServiceSupport {
} else {
LOG.error("An unexpected error occurred.", throwable);
}
- LOG.debug("Attempting subscribe after error");
resubscribeOnError();
}
private void resubscribeOnError() {
+ try {
+ LOG.debug("Will attempt resubscribe in {} ms", reconnectDelay);
+ Thread.sleep(reconnectDelay);
+ reconnectDelay = reconnectDelay + backoffIncrement;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
if (replayId != null) {
subscribe(consumer, ReplayPreset.CUSTOM, replayId);
} else {
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
index 38b9533411f..04cb67b3cd5 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
@@ -98,7 +98,7 @@ public class PubSubApiTest {
client.subscribe(consumer, ReplayPreset.CUSTOM, "initial");
verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(),
anyLong());
- verify(client, times(2)).subscribe(consumer, ReplayPreset.CUSTOM,
"initial");
+ verify(client, timeout(5000).times(2)).subscribe(consumer,
ReplayPreset.CUSTOM, "initial");
}
@Test
@@ -126,8 +126,10 @@ public class PubSubApiTest {
client.start();
client.subscribe(consumer, ReplayPreset.LATEST, null);
+ Thread.sleep(1000);
+
verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(),
anyLong());
- verify(client, times(2)).subscribe(consumer, ReplayPreset.LATEST,
null);
+ verify(client, timeout(5000).times(2)).subscribe(consumer,
ReplayPreset.LATEST, null);
}
@Test