This is an automated email from the ASF dual-hosted git repository.

zregvart pushed a commit to branch issue/CAMEL-12871
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7808691aa251e7359abb97344df9c50b91d1245a
Author: Zoran Regvart <zregv...@apache.org>
AuthorDate: Wed Dec 16 10:15:31 2020 +0100

    CAMEL-12871: release resources on stop (WIP)
    
    When SubscriptionHelper is stopped we need to remove all listeners and
    close channels this helper is listening on.
---
 .../internal/streaming/SubscriptionHelper.java     | 161 +++++++++++++--------
 1 file changed, 103 insertions(+), 58 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 625b2f2..c97fb02 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
@@ -36,6 +37,7 @@ import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.service.ServiceSupport;
 import org.cometd.bayeux.Message;
 import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
 import org.cometd.client.BayeuxClient;
 import org.cometd.client.BayeuxClient.State;
 import org.cometd.client.transport.ClientTransport;
@@ -92,6 +94,7 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private volatile boolean reconnecting;
     private final AtomicLong restartBackoff;
+    private final AtomicBoolean restarting = new AtomicBoolean();
 
     public SubscriptionHelper(final SalesforceComponent component) throws SalesforceException {
         this.component = component;
@@ -172,6 +175,7 @@ public class SubscriptionHelper extends ServiceSupport {
                         connectError = (String) message.get(ERROR_FIELD);
                         connectException = getFailure(message);
 
+                        client.disconnect();
                     } else if (reconnecting) {
 
                         reconnecting = false;
@@ -205,6 +209,10 @@ public class SubscriptionHelper extends ServiceSupport {
         }
         client.getChannel(META_DISCONNECT).addListener(disconnectListener);
 
+        connect();
+    }
+
+    private void connect() throws CamelException {
         // connect to Salesforce cometd endpoint
         client.handshake();
 
@@ -228,80 +236,95 @@ public class SubscriptionHelper extends ServiceSupport {
 
     // launch an async task to restart
     private void restartClient() {
+        if (!restarting.compareAndSet(false, true)) { // another restart task is already queued or running
+            return;
+        }

         // launch a new restart command
         final SalesforceHttpClient httpClient = 
component.getConfig().getHttpClient();
         httpClient.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
+                try {
+                    performClientRestart();
+                } finally {
+                    restarting.set(false); // cleared even when the restart throws, so a later call can start a new one
+                }
+            }
+        }); // NOTE(review): if execute() itself throws, 'restarting' is never reset - confirm this is acceptable
+    }
 
-                LOG.info("Restarting on unexpected disconnect from 
Salesforce...");
-                boolean abort = false;
+    private void performClientRestart() { // one restart attempt; skipped when this helper is stopping or stopped
+        if (isStoppingOrStopped()) {
+            return;
+        }

-                // wait for disconnect
-                LOG.debug("Waiting to disconnect...");
-                while (!client.isDisconnected()) {
-                    try {
-                        Thread.sleep(DISCONNECT_INTERVAL);
-                    } catch (InterruptedException e) {
-                        LOG.error("Aborting restart on interrupt!");
-                        abort = true;
-                    }
-                }
+        LOG.info("Restarting on unexpected disconnect from Salesforce...");
+        boolean abort = false; // set on interrupt, when the helper is stopping, or when backoff exceeds maxBackoff
+
+        // wait for disconnect
+        LOG.debug("Waiting to disconnect...");
+        while (!abort && !client.isDisconnected()) {
+            try {
+                Thread.sleep(DISCONNECT_INTERVAL);
+            } catch (InterruptedException e) {
+                LOG.error("Aborting restart on interrupt!");
+                abort = true;
+            }

-                if (!abort) {
+            abort = abort || isStoppingOrStopped(); // keep an abort requested by the interrupt handler above
+        }

-                    // update restart attempt backoff
-                    final long backoff = restartBackoff.getAndAdd(backoffIncrement);
-                    if (backoff > maxBackoff) {
-                        LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
-                        abort = true;
-                    } else {
+        if (!abort) {

-                        // pause before restart attempt
-                        LOG.debug("Pausing for {} msecs before restart attempt", backoff);
-                        try {
-                            Thread.sleep(backoff);
-                        } catch (InterruptedException e) {
-                            LOG.error("Aborting restart on interrupt!");
-                            abort = true;
-                        }
-                    }
+            // update restart attempt backoff
+            final long backoff = restartBackoff.getAndAdd(backoffIncrement);
+            if (backoff > maxBackoff) {
+                LOG.error("Restart aborted after exceeding {} msecs backoff", maxBackoff);
+                abort = true;
+            } else {

-                    if (!abort) {
-                        Exception lastError = new SalesforceException("Unknown error", null);
-                        try {
-                            // reset client
-                            doStop();
+                // pause before restart attempt
+                LOG.debug("Pausing for {} msecs before restart attempt", backoff);
+                try {
+                    Thread.sleep(backoff);
+                } catch (InterruptedException e) {
+                    LOG.error("Aborting restart on interrupt!");
+                    abort = true;
+                }
+            }

-                            // register listeners and restart
-                            doStart();
+            if (!abort) {
+                Exception lastError = new SalesforceException("Unknown error", null);
+                try {
+                    // reset client
+                    doStop();

-                        } catch (Exception e) {
-                            LOG.error("Error restarting: " + e.getMessage(), e);
-                            lastError = e;
-                        }
+                    // register listeners and restart
+                    doStart();

-                        if (client != null && client.isHandshook()) {
-                            LOG.info("Successfully restarted!");
-                            // reset backoff interval
-                            restartBackoff.set(client.getBackoffIncrement());
-                        } else {
-                            LOG.error("Failed to restart after pausing for {} msecs", backoff);
-                            if ((backoff + backoffIncrement) > maxBackoff) {
-                                // notify all consumers
-                                String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
-                                SalesforceException ex = new SalesforceException(abortMsg, lastError);
-                                for (SalesforceConsumer consumer : listenerMap.keySet()) {
-                                    consumer.handleException(abortMsg, ex);
-                                }
-                            }
+                } catch (Exception e) {
+                    LOG.error("Error restarting: " + e.getMessage(), e);
+                    lastError = e;
+                }
+                // the outcome is judged by the client's handshake state, whether or not an exception was caught
+                if (client != null && client.isHandshook()) {
+                    LOG.info("Successfully restarted!");
+                    // reset backoff interval
+                    restartBackoff.set(client.getBackoffIncrement());
+                } else {
+                    LOG.error("Failed to restart after pausing for {} msecs", backoff);
+                    if ((backoff + backoffIncrement) > maxBackoff) {
+                        // the next attempt would exceed maxBackoff, so notify all consumers now
+                        String abortMsg = "Aborting restart attempt due to: " + lastError.getMessage();
+                        SalesforceException ex = new SalesforceException(abortMsg, lastError);
+                        for (SalesforceConsumer consumer : listenerMap.keySet()) {
+                            consumer.handleException(abortMsg, ex);
                         }
                     }
                 }
-
             }
-        });
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -320,11 +343,33 @@ public class SubscriptionHelper extends ServiceSupport {
         return exception;
     }
 
+    /** Unhooks {@code listener} from channel {@code name} and releases the channel; no-op without a client. */
+    private void closeChannel(final String name, MessageListener listener) {
+        if (client != null) {
+            // look the channel up once, then detach the listener before releasing it
+            final ClientSessionChannel target = client.getChannel(name);
+            target.removeListener(listener);
+            target.release();
+        }
+    }
+
     @Override
     protected void doStop() throws Exception {
-        client.getChannel(META_DISCONNECT).removeListener(disconnectListener);
-        client.getChannel(META_CONNECT).removeListener(connectListener);
-        client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
+        closeChannel(META_DISCONNECT, disconnectListener);
+        closeChannel(META_CONNECT, connectListener);
+        closeChannel(META_HANDSHAKE, handshakeListener);
+
+        for (Map.Entry<SalesforceConsumer, MessageListener> entry : listenerMap.entrySet()) {
+            final SalesforceConsumer consumer = entry.getKey();
+            final String topic = consumer.getTopicName();
+
+            final MessageListener listener = entry.getValue();
+            closeChannel(getChannelName(topic), listener);
+        }
+
+        if (client == null) {
+            return;
+        }
 
         client.disconnect();
         boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);

Reply via email to