[ 
https://issues.apache.org/jira/browse/CAMEL-10238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15436518#comment-15436518
 ] 

Dhiraj Bokde edited comment on CAMEL-10238 at 8/25/16 8:51 AM:
---------------------------------------------------------------

Rajesh, I have pushed the fixes to master branch. Please test in your 
environment and let me know whether it fixes your issue. 

Also, note the following changes in behavior:
* Since subscriptions are created and handled asynchronously, the consumer 
throws {{SalesforceException}} when it is unable to subscribe for any reason. 
The error message will be {{Error subscribing to <topicName>: <reason>}}. After 
this error, the consumer won't attempt to re-subscribe and will not receive any 
messages from Salesforce. The underlying cause has to be fixed and the 
consumer/route restarted before it attempts to subscribe to the channel again. 
* For hard disconnects, it will try to reconnect, pausing between attempts 
according to the backoff options in CometD: a 1 second increment and a 30 
second maximum. If needed, the CometD options could be made configurable in 
the future to make the backoff customizable. 
* If it still can't reconnect once the pause between attempts exceeds the max 
backoff, all consumers will throw a {{SalesforceException}} with the message 
{{Aborting Salesforce reconnect due to: <reason>}}. At that point, all 
consumers are disconnected again and the component will have to be restarted 
to try to reconnect to Salesforce. 
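
As a rough illustration of the backoff behavior described above, here is a 
minimal sketch in plain Java. It is not the component's code: the class and 
method names are hypothetical, and it assumes the pause grows by the increment 
after every failed attempt and that the reconnect is aborted once the next 
pause would exceed the maximum. The actual logic in CometD and the component 
may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not taken from camel-salesforce or CometD.
class ReconnectBackoffSketch {

    // Values quoted in the comment above: 1 second increment, 30 second maximum.
    static final long BACKOFF_INCREMENT_MS = 1_000L;
    static final long MAX_BACKOFF_MS = 30_000L;

    /**
     * Returns the pause (in milliseconds) before each reconnect attempt.
     * Assumption: the pause starts at incrementMs, grows by incrementMs after
     * every failed attempt, and the reconnect is abandoned as soon as the
     * next pause would exceed maxMs.
     */
    static List<Long> pausesBeforeAbort(long incrementMs, long maxMs) {
        if (incrementMs <= 0) {
            throw new IllegalArgumentException("incrementMs must be positive");
        }
        List<Long> pauses = new ArrayList<>();
        for (long pause = incrementMs; pause <= maxMs; pause += incrementMs) {
            pauses.add(pause);
        }
        return pauses;
    }

    public static void main(String[] args) {
        List<Long> pauses = pausesBeforeAbort(BACKOFF_INCREMENT_MS, MAX_BACKOFF_MS);
        long totalMs = 0;
        for (long pause : pauses) {
            totalMs += pause;
        }
        System.out.println("attempts before abort: " + pauses.size());
        System.out.println("total time spent waiting: " + (totalMs / 1000) + " s");
    }
}
```

Under these assumptions the defaults give 30 attempts and 465 seconds of 
waiting in total before the abort. Treat that as an estimate of how long an 
outage can last before a restart is needed, not as a guarantee.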

Hopefully this makes the consumer more robust and the error handling intuitive. 
Let me know if you have any feedback or other ideas. 


was (Author: dhirajsb):
Rajesh, I have pushed the fixes to master branch. Please test in your 
environment and let me know whether it fixes your issue. 

Also, note the following changes in behavior:
* Since subscriptions are created and handled asynchronously, the consumer 
throws {{SalesforceException}} when it is unable to subscribe for any reason. 
The error message will be {{Error subscribing to <topicName>: <reason>}}. The 
consumer won't attempt to re-subscribe and will not receive any messages from 
Salesforce after this error. The underlying cause has to be fixed and the 
consumer/route restarted to attempt to subscribe to the channel again. 
* For hard disconnects, it will try to reconnect with a pause between attempts 
based on backoff options in CometD, which are 1 second and 30 seconds for 
increment and max respectively. If needed CometD options could be made 
configurable in the future to make the backoff customizable. 
* If it can't reconnect after the pause between attempts exceeds max backoff, 
all consumers will throw a SalesforceException with a message {{Aborting 
Salesforce reconnect due to: <reason>}}. At which point, all consumers are 
again disconnected and consumers/routes will have to be restarted to try to 
reconnect to Salesforce. 

Hopefully this makes the consumer more robust and the error handling intuitive. 
Let me know if you have any feedback or other ideas. 

> Camel-salesforce component never tries to reconnect after a disconnect
> ----------------------------------------------------------------------
>
>                 Key: CAMEL-10238
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10238
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-salesforce
>    Affects Versions: 2.17.2, 2.17.3, 2.18.0
>            Reporter: Rajesh A
>            Assignee: Dhiraj Bokde
>
> My connection to the Salesforce Streaming API gets disconnected automatically 
> after 2 hours. This is because Salesforce closes the connection from the 
> server side. However, I was expecting the camel-salesforce component to 
> reconnect automatically after a disconnect. Instead, it does not reconnect, 
> and I have no way to trigger a reconnect myself. This seems to be a defect 
> and a blocker to me.
> Here is the trace log:
> {code}
> o.a.c.c.s.i.s.SubscriptionHelper         :
> [CHANNEL:META_CONNECT]: {clientId=3u9riwg6ag3r5dd3ay86i444f, 
> channel=/meta/connect, id=84, successful=true}
> org.cometd.client.BayeuxClient           : Connecting,
> transport 
> org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$3@4e0cc334
> org.cometd.client.BayeuxClient           : Sending
> messages [{clientId=3u9riwg6ag3r5dd3ay86i444f, channel=/meta/connect, id=85, 
> connectionType=long-polling}]
> o.a.c.c.s.i.s.SubscriptionHelper$3       : Received
> messages [{clientId=3u9riwg6ag3r5dd3ay86i444f, channel=/meta/connect, id=85, 
> successful=true}]
> org.cometd.client.BayeuxClient           : Processing
> /meta/connect {clientId=3u9riwg6ag3r5dd3ay86i444f, channel=/meta/connect, 
> id=85, successful=true}
> org.cometd.client.BayeuxClient           : State update:
> CONNECTED -> CONNECTED
> o.a.c.c.s.i.s.SubscriptionHelper         :
> [CHANNEL:META_CONNECT]: {clientId=3u9riwg6ag3r5dd3ay86i444f, 
> channel=/meta/connect, id=85, successful=true}
> org.cometd.client.BayeuxClient           : Connecting,
> transport 
> org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$3@4e0cc334
> org.cometd.client.BayeuxClient           : Sending
> messages [{clientId=3u9riwg6ag3r5dd3ay86i444f, channel=/meta/connect, id=86, 
> connectionType=long-polling}]
> org.cometd.client.BayeuxClient           : State update:
> CONNECTED -> UNCONNECTED
> org.cometd.client.BayeuxClient           : Messages failed
> [{clientId=3u9riwg6ag3r5dd3ay86i444f, channel=/meta/connect, id=86, 
> connectionType=long-polling}]
> java.io.EOFException: HttpConnectionOverHTTP@12f0e719(l:/10.172.131.200:50574 
> <-> 
> r:my-proxy.com/x.x.x.x:xx,closed=false)[HttpChannelOverHTTP@2a4927(exchange=HttpExchange@6ae1ae35
>  req=TERMINATED/null@null 
> res=PENDING/null@null)[send=HttpSenderOverHTTP@51fea1e0(req=QUEUED,snd=COMPLETED,failure=null)[HttpGenerator{s=START}],recv=HttpReceiverOverHTTP@44234ce9(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0
>  of -1}]]]
>       at 
> org.eclipse.jetty.client.http.HttpReceiverOverHTTP.earlyEOF(HttpReceiverOverHTTP.java:277)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:1309) 
> [jetty-http-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpReceiverOverHTTP.shutdown(HttpReceiverOverHTTP.java:182)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpReceiverOverHTTP.process(HttpReceiverOverHTTP.java:129)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpReceiverOverHTTP.receive(HttpReceiverOverHTTP.java:69)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpChannelOverHTTP.receive(HttpChannelOverHTTP.java:89)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpConnectionOverHTTP.onFillable(HttpConnectionOverHTTP.java:122)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544) 
> [jetty-io-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
>  [jetty-util-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
>  [jetty-util-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
> o.a.c.c.s.i.s.SubscriptionHelper         :
> [CHANNEL:META_CONNECT]: {failure={exception=java.io.EOFException: 
> HttpConnectionOverHTTP@12f0e719(l:/10.172.131.200:50574 <-> 
> r:my-proxy.com/x.x.x.x:xx,closed=false)[HttpChannelOverHTTP@2a4927(exchange=HttpExchange@6ae1ae35
>  req=TERMINATED/null@null 
> res=PENDING/null@null)[send=HttpSenderOverHTTP@51fea1e0(req=QUEUED,snd=COMPLETED,failure=null)[HttpGenerator{s=START}],recv=HttpReceiverOverHTTP@44234ce9(rsp=IDLE,failure=null)[HttpParser{s=CLOSED,0
>  of -1}]]], message={clientId=3u9riwg6ag3r5dd3ay86i444f, 
> channel=/meta/connect, id=86, connectionType=long-polling}, 
> connectionType=long-polling}, channel=/meta/connect, id=86, 
> subscription=null, successful=false}
> org.cometd.client.BayeuxClient           : Connecting,
> transport 
> org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$3@4e0cc334
> org.cometd.client.BayeuxClient           : Sending
> messages [{clientId=3u9riwg6ag3r5dd3ay86i444f, advice={timeout=0}, 
> channel=/meta/connect, id=87, connectionType=long-polling}]
> org.cometd.client.BayeuxClient           : State update:
> UNCONNECTED -> UNCONNECTED
> org.cometd.client.BayeuxClient           : Messages failed
> [{clientId=3u9riwg6ag3r5dd3ay86i444f, advice={timeout=0}, 
> channel=/meta/connect, id=87, connectionType=long-polling}]
> org.eclipse.jetty.io.EofException: null
>       at 
> org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.flush(SslConnection.java:723)
>  ~[jetty-io-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at org.eclipse.jetty.io.WriteFlusher.flush(WriteFlusher.java:408) 
> ~[jetty-io-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at org.eclipse.jetty.io.WriteFlusher.write(WriteFlusher.java:302) 
> ~[jetty-io-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.io.AbstractEndPoint.write(AbstractEndPoint.java:129) 
> [jetty-io-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpSenderOverHTTP.sendHeaders(HttpSenderOverHTTP.java:108)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at org.eclipse.jetty.client.HttpSender.send(HttpSender.java:204) 
> [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpChannelOverHTTP.send(HttpChannelOverHTTP.java:78)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpConnectionOverHTTP$Delegate.send(HttpConnectionOverHTTP.java:218)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpConnectionOverHTTP.send(HttpConnectionOverHTTP.java:91)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpDestinationOverHTTP.send(HttpDestinationOverHTTP.java:36)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.http.HttpDestinationOverHTTP.send(HttpDestinationOverHTTP.java:26)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.PoolingHttpDestination.process(PoolingHttpDestination.java:150)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.PoolingHttpDestination.send(PoolingHttpDestination.java:85)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.PoolingHttpDestination.send(PoolingHttpDestination.java:76)
>  [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.eclipse.jetty.client.HttpDestination.send(HttpDestination.java:187) 
> [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at org.eclipse.jetty.client.HttpClient.send(HttpClient.java:527) 
> [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at org.eclipse.jetty.client.HttpRequest.send(HttpRequest.java:694) 
> [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at org.eclipse.jetty.client.HttpRequest.send(HttpRequest.java:678) 
> [jetty-client-9.2.14.v20151106.jar:9.2.14.v20151106]
>       at 
> org.cometd.client.transport.LongPollingTransport.send(LongPollingTransport.java:191)
>  [cometd-java-client-3.0.9.jar:na]
>       at 
> org.cometd.client.BayeuxClient$BayeuxClientState.transportSend(BayeuxClient.java:1430)
>  [cometd-java-client-3.0.9.jar:na]
>       at 
> org.cometd.client.BayeuxClient$BayeuxClientState.send(BayeuxClient.java:1425) 
> [cometd-java-client-3.0.9.jar:na]
>       at org.cometd.client.BayeuxClient.sendConnect(BayeuxClient.java:482) 
> [cometd-java-client-3.0.9.jar:na]
>       at org.cometd.client.BayeuxClient$12.run(BayeuxClient.java:815) 
> [cometd-java-client-3.0.9.jar:na]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_66]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_66]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [na:1.8.0_66]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  [na:1.8.0_66]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_66]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_66]
>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
> Caused by: java.nio.channels.ClosedChannelException: null
>       ... 30 common frames omitted
> o.a.c.c.s.i.s.SubscriptionHelper         :
> [CHANNEL:META_CONNECT]: 
> {failure={exception=org.eclipse.jetty.io.EofException, 
> message={clientId=3u9riwg6ag3r5dd3ay86i444f, advice={timeout=0}, 
> channel=/meta/connect, id=87, connectionType=long-polling}, 
> connectionType=long-polling}, channel=/meta/connect, id=87, 
> subscription=null, successful=false}
> org.cometd.client.BayeuxClient           : Connecting,
> transport 
> org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$3@4e0cc334
> org.cometd.client.BayeuxClient           : Sending
> messages [{clientId=3u9riwg6ag3r5dd3ay86i444f, advice={timeout=0}, 
> channel=/meta/connect, id=88, connectionType=long-polling}]
> o.a.c.c.s.i.s.SubscriptionHelper$3       : Received
> messages [{advice={reconnect=handshake, interval=0}, channel=/meta/connect, 
> id=88, error=403::Unknown client, successful=false}]
> org.cometd.client.BayeuxClient           : Processing
> /meta/connect {advice={reconnect=handshake, interval=0}, 
> channel=/meta/connect, id=88, error=403::Unknown client, successful=false}
> org.cometd.client.BayeuxClient           : State update:
> UNCONNECTED -> REHANDSHAKING
> o.a.c.c.s.i.s.SubscriptionHelper         :
> [CHANNEL:META_CONNECT]: {advice={reconnect=handshake, interval=0}, 
> channel=/meta/connect, id=88, error=403::Unknown client, successful=false}
> org.cometd.client.BayeuxClient           : Handshaking on
> transport 
> org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$3@4e0cc334:
>  {supportedConnectionTypes=[long-polling], channel=/meta/handshake, 
> version=1.0}
> org.cometd.client.BayeuxClient           : Sending
> messages [{ext={replay=true}, supportedConnectionTypes=[long-polling], 
> channel=/meta/handshake, id=89, version=1.0}]
> o.a.c.c.s.i.s.SubscriptionHelper$3       : Received
> messages [{ext={replay=true}, advice={reconnect=none}, 
> channel=/meta/handshake, id=89, error=401::Authentication invalid, 
> successful=false}]
> org.cometd.client.BayeuxClient           : Processing
> /meta/handshake {ext={replay=true}, advice={reconnect=none}, 
> channel=/meta/handshake, id=89, error=401::Authentication invalid, 
> successful=false}
> org.cometd.client.BayeuxClient           : State update:
> REHANDSHAKING -> TERMINATING
> o.a.c.c.s.i.s.SubscriptionHelper         :
> [CHANNEL:META_HANDSHAKE]: {ext={replay=true}, advice={reconnect=none}, 
> channel=/meta/handshake, id=89, error=401::Authentication invalid, 
> successful=false}
> org.cometd.client.BayeuxClient           : State update:
> TERMINATING -> DISCONNECTED
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
