CAMEL-10238: Updated subscription helper to listen for hard disconnects, and 
reconnect from scratch, also updated error handling throwing 
SalesforceException from consumer endpoints

Conflicts:
        
components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36f85788
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36f85788
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36f85788

Branch: refs/heads/camel-2.17.x
Commit: 36f857888bbfc07bdcc2908d3bcbd5df01aaa03f
Parents: 59b67d2
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Thu Aug 25 01:20:07 2016 -0700
Committer: Dhiraj Bokde <dhira...@yahoo.com>
Committed: Thu Aug 25 08:56:42 2016 -0700

----------------------------------------------------------------------
 .../salesforce/SalesforceConsumer.java          |   9 +-
 .../internal/streaming/SubscriptionHelper.java  | 237 +++++++++++++------
 2 files changed, 172 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/36f85788/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
index e6434a5..e5c3075 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.salesforce.internal.PayloadFormat;
+import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.client.DefaultRestClient;
 import org.apache.camel.component.salesforce.internal.client.RestClient;
 import 
org.apache.camel.component.salesforce.internal.streaming.PushTopicHelper;
@@ -172,7 +173,7 @@ public class SalesforceConsumer extends DefaultConsumer {
         } catch (IOException e) {
             final String msg = String.format("Error parsing message [%s] from 
Topic %s: %s",
                     message, topicName, e.getMessage());
-            handleException(msg, new RuntimeCamelException(msg, e));
+            handleException(msg, new SalesforceException(msg, e));
         }
 
         try {
@@ -186,11 +187,13 @@ public class SalesforceConsumer extends DefaultConsumer {
                 }
             });
         } catch (Exception e) {
-            handleException(String.format("Error processing %s: %s", exchange, 
e.getMessage()), e);
+            String msg = String.format("Error processing %s: %s", exchange, e);
+            handleException(msg, new SalesforceException(msg, e));
         } finally {
             Exception ex = exchange.getException();
             if (ex != null) {
-                handleException(String.format("Unhandled exception: %s", 
ex.getMessage()), ex);
+                String msg = String.format("Unhandled exception: %s", 
ex.getMessage());
+                handleException(msg, new SalesforceException(msg, ex));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/36f85788/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 1cc4a21..1e3e8a4 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -28,6 +28,7 @@ import org.apache.camel.CamelException;
 import org.apache.camel.component.salesforce.SalesforceComponent;
 import org.apache.camel.component.salesforce.SalesforceConsumer;
 import org.apache.camel.component.salesforce.SalesforceHttpClient;
+import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.ServiceSupport;
 import org.cometd.bayeux.Message;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.cometd.bayeux.Channel.META_CONNECT;
+import static org.cometd.bayeux.Channel.META_DISCONNECT;
 import static org.cometd.bayeux.Channel.META_HANDSHAKE;
 import static org.cometd.bayeux.Channel.META_SUBSCRIBE;
 import static org.cometd.bayeux.Channel.META_UNSUBSCRIBE;
@@ -54,7 +56,9 @@ public class SubscriptionHelper extends ServiceSupport {
     private static final int CONNECT_TIMEOUT = 110;
     private static final int CHANNEL_TIMEOUT = 40;
 
+    private static final String FAILURE_FIELD = "failure";
     private static final String EXCEPTION_FIELD = "exception";
+    private static final int DISCONNECT_INTERVAL = 5000;
 
     private final SalesforceComponent component;
     private final SalesforceSession session;
@@ -65,11 +69,14 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private ClientSessionChannel.MessageListener handshakeListener;
     private ClientSessionChannel.MessageListener connectListener;
+    private ClientSessionChannel.MessageListener disconnectListener;
 
-    private String handshakeError;
-    private Exception handshakeException;
-    private String connectError;
-    private boolean reconnecting;
+    private volatile String handshakeError;
+    private volatile Exception handshakeException;
+    private volatile String connectError;
+    private volatile Exception connectException;
+
+    private volatile boolean reconnecting;
 
     public SubscriptionHelper(SalesforceComponent component) throws Exception {
         this.component = component;
@@ -83,6 +90,13 @@ public class SubscriptionHelper extends ServiceSupport {
 
     @Override
     protected void doStart() throws Exception {
+
+        // reset all error conditions
+        handshakeError = null;
+        handshakeException = null;
+        connectError = null;
+        connectException = null;
+
         // listener for handshake error or exception
         if (handshakeListener == null) {
             // first start
@@ -91,14 +105,9 @@ public class SubscriptionHelper extends ServiceSupport {
                     LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
 
                     if (!message.isSuccessful()) {
-                        String error = (String) message.get(ERROR_FIELD);
-                        if (error != null) {
-                            handshakeError = error;
-                        }
-                        Exception exception = (Exception) 
message.get(EXCEPTION_FIELD);
-                        if (exception != null) {
-                            handshakeException = exception;
-                        }
+                        LOG.warn("Handshake failure: {}", message);
+                        handshakeError = (String) message.get(ERROR_FIELD);
+                        handshakeException = getFailure(message);
                     } else if (!listenerMap.isEmpty()) {
                         reconnecting = true;
                     }
@@ -114,10 +123,22 @@ public class SubscriptionHelper extends ServiceSupport {
                     LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
 
                     if (!message.isSuccessful()) {
-                        String error = (String) message.get(ERROR_FIELD);
-                        if (error != null) {
-                            connectError = error;
+
+                        LOG.warn("Connect failure: {}", message);
+                        connectError = (String) message.get(ERROR_FIELD);
+                        connectException = getFailure(message);
+
+                        if (connectError != null) {
+                            // refresh oauth token, if it's a 401 error
+                            if (connectError.startsWith("401::")) {
+                                try {
+                                    session.login(null);
+                                } catch (SalesforceException e) {
+                                    LOG.error("Error renewing OAuth token on 
Connect 401: {} ", e.getMessage(), e);
+                                }
+                            }
                         }
+
                     } else if (reconnecting) {
 
                         reconnecting = false;
@@ -131,15 +152,7 @@ public class SubscriptionHelper extends ServiceSupport {
                         for (Map.Entry<SalesforceConsumer, 
ClientSessionChannel.MessageListener> entry : map.entrySet()) {
                             final SalesforceConsumer consumer = entry.getKey();
                             final String topicName = consumer.getTopicName();
-                            try {
-                                subscribe(topicName, consumer);
-                            } catch (CamelException e) {
-                                // let the consumer handle the exception
-                                consumer.handleException(
-                                        String.format("Error refreshing 
subscription to topic [%s]: %s",
-                                                topicName, e.getMessage()),
-                                        e);
-                            }
+                            subscribe(topicName, consumer);
                         }
 
                     }
@@ -148,6 +161,84 @@ public class SubscriptionHelper extends ServiceSupport {
         }
         client.getChannel(META_CONNECT).addListener(connectListener);
 
+        // handle fatal disconnects by reconnecting asynchronously
+        if (disconnectListener == null) {
+            disconnectListener = new ClientSessionChannel.MessageListener() {
+                @Override
+                public void onMessage(ClientSessionChannel 
clientSessionChannel, Message message) {
+
+                    // launch an async task to reconnect
+                    final SalesforceHttpClient httpClient = 
component.getConfig().getHttpClient();
+
+                    httpClient.getExecutor().execute(new Runnable() {
+                        @Override
+                        public void run() {
+
+                            boolean abort = false;
+                            // wait for disconnect
+                            while (!client.isDisconnected()) {
+                                try {
+                                    Thread.sleep(DISCONNECT_INTERVAL);
+                                } catch (InterruptedException e) {
+                                    LOG.error("Aborting reconnect on 
interrupt!");
+                                    abort = true;
+                                }
+                            }
+
+                            if (!abort) {
+
+                                LOG.info("Reconnecting on unexpected 
disconnect from Salesforce...");
+                                final long backoffIncrement = 
client.getBackoffIncrement();
+                                final long maxBackoff = client.getMaxBackoff();
+
+                                long backoff = backoffIncrement;
+                                String msg = String.format("Failed to 
reconnect, exceeded maximum backoff %s msecs", maxBackoff);
+                                Exception lastError = new 
SalesforceException(msg, null);
+
+                                // retry until interrupted, or handshook or 
connect backoff exceeded
+                                while (!abort && !client.isHandshook() && 
backoff < maxBackoff) {
+
+                                    try {
+                                        // reset client
+                                        doStop();
+
+                                        // register listeners and restart
+                                        doStart();
+
+                                    } catch (Exception e) {
+                                        LOG.error("Error reconnecting to 
Salesforce: {}", e.getMessage(), e);
+                                        lastError = e;
+                                    }
+
+                                    if (!client.isHandshook()) {
+                                        LOG.debug("Pausing for {} msecs after 
reconnect failure", backoff);
+                                        try {
+                                            Thread.sleep(backoff);
+                                        } catch (InterruptedException e) {
+                                            LOG.error("Aborting reconnect on 
interrupt!");
+                                            abort = true;
+                                        }
+                                        backoff += backoffIncrement;
+                                    }
+                                }
+
+                                if (client.isHandshook()) {
+                                    LOG.info("Successfully reconnected to 
Salesforce!");
+                                } else if (!abort) {
+                                    // notify all consumers
+                                    String abortMsg = "Aborting Salesforce 
reconnect due to: " + lastError.getMessage();
+                                    for (SalesforceConsumer consumer : 
listenerMap.keySet()) {
+                                        consumer.handleException(abortMsg, new 
SalesforceException(abortMsg, lastError));
+                                    }
+                                }
+                            }
+                        }
+                    });
+                }
+            };
+        }
+        client.getChannel(META_DISCONNECT).addListener(disconnectListener);
+
         // connect to Salesforce cometd endpoint
         client.handshake();
 
@@ -159,6 +250,10 @@ public class SubscriptionHelper extends ServiceSupport {
                         handshakeException);
             } else if (handshakeError != null) {
                 throw new CamelException(String.format("Error during 
HANDSHAKE: %s", handshakeError));
+            } else if (connectException != null) {
+                throw new CamelException(
+                        String.format("Exception during CONNECT: %s", 
connectException.getMessage()),
+                        connectException);
             } else if (connectError != null) {
                 throw new CamelException(String.format("Error during CONNECT: 
%s", connectError));
             } else {
@@ -168,8 +263,20 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private Exception getFailure(Message message) {
+        Exception exception = null;
+        if (message.get(EXCEPTION_FIELD) != null) {
+            exception = (Exception) message.get(EXCEPTION_FIELD);
+        } else if (message.get(FAILURE_FIELD) != null) {
+            exception = (Exception) ((Map<String, 
Object>)message.get("failure")).get("exception");
+        }
+        return exception;
+    }
+
     @Override
     protected void doStop() throws Exception {
+        client.getChannel(META_DISCONNECT).removeListener(disconnectListener);
         client.getChannel(META_CONNECT).removeListener(connectListener);
         client.getChannel(META_HANDSHAKE).removeListener(handshakeListener);
 
@@ -198,7 +305,8 @@ public class SubscriptionHelper extends ServiceSupport {
                 super.customize(request);
 
                 // add current security token obtained from session
-                request.header(HttpHeader.AUTHORIZATION, "OAuth " + 
session.getAccessToken());
+                // replace old token
+                request.getHeaders().put(HttpHeader.AUTHORIZATION, "OAuth " + 
session.getAccessToken());
             }
         };
 
@@ -206,7 +314,7 @@ public class SubscriptionHelper extends ServiceSupport {
         return client;
     }
 
-    public void subscribe(final String topicName, final SalesforceConsumer 
consumer) throws CamelException {
+    public void subscribe(final String topicName, final SalesforceConsumer 
consumer) {
         // create subscription for consumer
         final String channelName = getChannelName(topicName);
 
@@ -225,9 +333,7 @@ public class SubscriptionHelper extends ServiceSupport {
 
         final ClientSessionChannel clientChannel = 
client.getChannel(channelName);
 
-        // listener for subscribe error
-        final CountDownLatch latch = new CountDownLatch(1);
-        final String[] subscribeError = {null};
+        // listener for subscription
         final ClientSessionChannel.MessageListener subscriptionListener = new 
ClientSessionChannel.MessageListener() {
             public void onMessage(ClientSessionChannel channel, Message 
message) {
                 LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);
@@ -236,45 +342,28 @@ public class SubscriptionHelper extends ServiceSupport {
 
                     if (!message.isSuccessful()) {
                         String error = (String) message.get(ERROR_FIELD);
-                        if (error != null) {
-                            subscribeError[0] = error;
+                        if (error == null) {
+                            error = "Missing error message";
                         }
+                        Exception failure = getFailure(message);
+                        String msg = String.format("Error subscribing to %s: 
%s", topicName,
+                            failure != null ? failure.getMessage() : error);
+                        consumer.handleException(msg, new 
SalesforceException(msg, failure));
                     } else {
                         // remember subscription
                         LOG.info("Subscribed to channel {}", 
subscribedChannelName);
+                        listenerMap.put(consumer, listener);
                     }
-                    latch.countDown();
+
+                    // remove this subscription listener
+                    client.getChannel(META_SUBSCRIBE).removeListener(this);
                 }
             }
         };
         client.getChannel(META_SUBSCRIBE).addListener(subscriptionListener);
 
-        try {
-            clientChannel.subscribe(listener);
-
-            // confirm that a subscription was created
-            try {
-                if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) {
-                    String message;
-                    if (subscribeError[0] != null) {
-                        message = String.format("Error subscribing to topic 
%s: %s",
-                                topicName, subscribeError[0]);
-                    } else {
-                        message = String.format("Timeout error subscribing to 
topic %s after %s seconds",
-                                topicName, CHANNEL_TIMEOUT);
-                    }
-                    throw new CamelException(message);
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                // probably shutting down, so forget subscription
-            }
-
-            listenerMap.put(consumer, listener);
-
-        } finally {
-            
client.getChannel(META_SUBSCRIBE).removeListener(subscriptionListener);
-        }
+        // subscribe asynchronously
+        clientChannel.subscribe(listener);
     }
 
     private String getChannelName(String topicName) {
@@ -289,22 +378,25 @@ public class SubscriptionHelper extends ServiceSupport {
         // listen for unsubscribe error
         final CountDownLatch latch = new CountDownLatch(1);
         final String[] unsubscribeError = {null};
+        final Exception[] unsubscribeFailure = {null};
+
         final ClientSessionChannel.MessageListener unsubscribeListener = new 
ClientSessionChannel.MessageListener() {
             public void onMessage(ClientSessionChannel channel, Message 
message) {
                 LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", message);
-                String unsubscribedChannelName = 
message.get(SUBSCRIPTION_FIELD).toString();
-                if (channelName.equals(unsubscribedChannelName)) {
-
-                    if (!message.isSuccessful()) {
-                        String error = (String) message.get(ERROR_FIELD);
-                        if (error != null) {
-                            unsubscribeError[0] = error;
+                Object subscription = message.get(SUBSCRIPTION_FIELD);
+                if (subscription != null) {
+                    String unsubscribedChannelName = subscription.toString();
+                    if (channelName.equals(unsubscribedChannelName)) {
+
+                        if (!message.isSuccessful()) {
+                            unsubscribeError[0] = (String) 
message.get(ERROR_FIELD);
+                            unsubscribeFailure[0] = getFailure(message);
+                        } else {
+                            // forget subscription
+                            LOG.info("Unsubscribed from channel {}", 
unsubscribedChannelName);
                         }
-                    } else {
-                        // forget subscription
-                        LOG.info("Unsubscribed from channel {}", 
unsubscribedChannelName);
+                        latch.countDown();
                     }
-                    latch.countDown();
                 }
             }
         };
@@ -323,14 +415,17 @@ public class SubscriptionHelper extends ServiceSupport {
                 try {
                     if (!latch.await(CHANNEL_TIMEOUT, SECONDS)) {
                         String message;
-                        if (unsubscribeError[0] != null) {
+                        if (unsubscribeFailure[0] != null) {
+                            message = String.format("Error unsubscribing from 
topic %s: %s",
+                                topicName, unsubscribeFailure[0].getMessage());
+                        } else if (unsubscribeError[0] != null) {
                             message = String.format("Error unsubscribing from 
topic %s: %s",
                                     topicName, unsubscribeError[0]);
                         } else {
                             message = String.format("Timeout error 
unsubscribing from topic %s after %s seconds",
                                     topicName, CHANNEL_TIMEOUT);
                         }
-                        throw new CamelException(message);
+                        throw new CamelException(message, 
unsubscribeFailure[0]);
                     }
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();

Reply via email to