Jason created CAMEL-20834:
-----------------------------

             Summary: A NullPointException in SubscriptionHelper.subscribe() 
results in the interruption of the platform-event channel subscription and 
events are not received
                 Key: CAMEL-20834
                 URL: https://issues.apache.org/jira/browse/CAMEL-20834
             Project: Camel
          Issue Type: Bug
          Components: camel-salesforce
    Affects Versions: 4.6.0, 4.5.0
         Environment: Spring-Boot: 3.2.6
Camel: 4.5.0 or 4.6.0
Java: 17
            Reporter: Jason


I appears that when the `SubscriptionHelper` attempts to reconnect and 
resubscribe to a channel in Salesforce, that occasionally and at an 
unpredictable rate, the `subscriptionListener` in 
`SubscriptionHelper.subscribe` throws an NPE when attempting to get the 
`SUBSCRIPTION_FIELD` from the response message from Salesforce because in some 
cases that message does not contain a `subscription` property in the JSON 
response.

This results in the component seeming to 

This seems to be the offending line:
https://github.com/apache/camel/blob/main/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java#L475

The reconnection seem to happen, but never actually re-subscribes to the 
channel and therefore the flow of platform-events stops until the spring-boot 
application is restarted.

Here is some of the associated logging:


{noformat}
[DEBUG] 2024-06-04 00:17:20,212 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #319 - SalesforceHttpClient ] - 
[CHANNEL:META_HANDSHAKE]: {ext={replay=true, payload.format=true}, 
minimumVersion=1.0, clientId=REDACTED, supportedConnectionTypes=[long-polling], 
channel=/meta/handshake, id=328, version=1.0, successful=true}
[DEBUG] 2024-06-04 00:17:20,227 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #320 - SalesforceHttpClient ] - [CHANNEL:META_CONNECT]: 
{clientId=REDACTED, advice={reconnect=retry, interval=0, timeout=110000}, 
channel=/meta/connect, id=329, successful=true}
[DEBUG] 2024-06-04 00:17:20,227 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #320 - SalesforceHttpClient ] - Refreshing 
subscriptions to 1 channels on reconnect
[INFO ] 2024-06-04 00:17:20,227 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #320 - SalesforceHttpClient ] - Subscribing to channel 
/event/M1SF_SObject_Capture__e...
[DEBUG] 2024-06-04 00:17:20,243 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #321 - SalesforceHttpClient ] - [CHANNEL:META_CONNECT]: 
{advice={reconnect=handshake, interval=0}, channel=/meta/connect, id=330, 
error=403::Unknown client, successful=false}
[WARN ] 2024-06-04 00:17:20,243 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #321 - SalesforceHttpClient ] - Connect failure: 
{advice={reconnect=handshake, interval=0}, channel=/meta/connect, id=330, 
error=403::Unknown client, successful=false}
[DEBUG] 2024-06-04 00:17:20,243 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #321 - SalesforceHttpClient ] - Advice != retry, so 
handshaking
[DEBUG] 2024-06-04 00:17:20,243 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #321 - SalesforceHttpClient ] - Begin handshake if not 
already in progress.
[DEBUG] 2024-06-04 00:17:20,269 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[SalesforceHttpClient@f13e0a2-34 ] - [CHANNEL:META_SUBSCRIBE]: 
{advice={reconnect=handshake, interval=0}, channel=/meta/subscribe, id=331, 
error=403::Unknown client, successful=false}
[INFO ] 2024-06-04 00:17:20,270 org.cometd.bayeux.client.ClientSession 
[SalesforceHttpClient@f13e0a2-34 ] - Exception while invoking listener 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$5@704dd2c3
java.lang.NullPointerException: Cannot invoke "Object.toString()" because the 
return value of "org.cometd.bayeux.Message.get(Object)" is null
        at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$5.onMessage(SubscriptionHelper.java:452)
        at 
org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyOnMessage(AbstractClientSession.java:599)
        at 
org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyMessageListeners(AbstractClientSession.java:586)
        at 
org.cometd.common.AbstractClientSession.notifyListeners(AbstractClientSession.java:311)
        at 
org.cometd.common.AbstractClientSession.lambda$receive$4(AbstractClientSession.java:268)
        at org.cometd.bayeux.Promise$2.succeed(Promise.java:96)
        at 
org.cometd.common.AsyncFoldLeft$AbstractLoop.run(AsyncFoldLeft.java:236)
        at org.cometd.common.AsyncFoldLeft.run(AsyncFoldLeft.java:107)
        at 
org.cometd.common.AbstractClientSession.extendIncoming(AbstractClientSession.java:99)
        at 
org.cometd.common.AbstractClientSession.receive(AbstractClientSession.java:262)
        at org.cometd.client.BayeuxClient.processMessage(BayeuxClient.java:812)
        at org.cometd.client.BayeuxClient.processMessages(BayeuxClient.java:634)
        at 
org.cometd.client.BayeuxClient$MessageTransportListener.onMessages(BayeuxClient.java:1227)
        at 
org.cometd.client.http.common.AbstractHttpClientTransport.processResponseMessages(AbstractHttpClientTransport.java:178)
        at 
org.cometd.client.http.jetty.JettyHttpClientTransport.access$200(JettyHttpClientTransport.java:51)
        at 
org.cometd.client.http.jetty.JettyHttpClientTransport$ResponseListener.onComplete(JettyHttpClientTransport.java:285)
        at 
org.eclipse.jetty.client.transport.ResponseListeners.notifyComplete(ResponseListeners.java:350)
        at 
org.eclipse.jetty.client.transport.ResponseListeners.lambda$addCompleteListener$7(ResponseListeners.java:335)
        at 
org.eclipse.jetty.client.transport.ResponseListeners.notifyComplete(ResponseListeners.java:350)
        at 
org.eclipse.jetty.client.transport.ResponseListeners.notifyComplete(ResponseListeners.java:342)
        at 
org.eclipse.jetty.client.transport.HttpReceiver.terminateResponse(HttpReceiver.java:420)
        at 
org.eclipse.jetty.client.transport.HttpReceiver.terminateResponse(HttpReceiver.java:402)
        at 
org.eclipse.jetty.client.transport.HttpReceiver.lambda$responseSuccess$3(HttpReceiver.java:367)
        at 
org.eclipse.jetty.util.thread.SerializedInvoker$Link.run(SerializedInvoker.java:191)
        at 
org.eclipse.jetty.util.thread.SerializedInvoker.run(SerializedInvoker.java:117)
        at 
org.eclipse.jetty.client.transport.HttpReceiver.responseHeaders(HttpReceiver.java:243)
        at 
org.eclipse.jetty.client.transport.internal.HttpReceiverOverHTTP.lambda$headerComplete$2(HttpReceiverOverHTTP.java:435)
        at 
org.eclipse.jetty.client.transport.internal.HttpReceiverOverHTTP.parse(HttpReceiverOverHTTP.java:320)
        at 
org.eclipse.jetty.client.transport.internal.HttpReceiverOverHTTP.parseAndFill(HttpReceiverOverHTTP.java:250)
        at 
org.eclipse.jetty.client.transport.internal.HttpReceiverOverHTTP.receive(HttpReceiverOverHTTP.java:76)
        at 
org.eclipse.jetty.client.transport.internal.HttpChannelOverHTTP.receive(HttpChannelOverHTTP.java:97)
        at 
org.eclipse.jetty.client.transport.internal.HttpConnectionOverHTTP.onFillable(HttpConnectionOverHTTP.java:207)
        at 
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:322)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:99)
        at 
org.eclipse.jetty.io.ssl.SslConnection$SslEndPoint.onFillable(SslConnection.java:574)
        at 
org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:390)
        at 
org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:150)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:99)
        at 
org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53)
        at 
org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:478)
        at 
org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:441)
        at 
org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:293)
        at 
org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:201)
        at 
org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:311)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:979)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1209)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1164)
        at java.base/java.lang.Thread.run(Thread.java:833)
[DEBUG] 2024-06-04 00:17:20,326 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #322 - SalesforceHttpClient ] - 
[CHANNEL:META_HANDSHAKE]: {ext={replay=true, payload.format=true}, 
minimumVersion=1.0, clientId=REDACTED, supportedConnectionTypes=[long-polling], 
channel=/meta/handshake, id=332, version=1.0, successful=true}
[DEBUG] 2024-06-04 00:17:20,345 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #323 - SalesforceHttpClient ] - [CHANNEL:META_CONNECT]: 
{clientId=REDACTED, advice={reconnect=retry, interval=0, timeout=110000}, 
channel=/meta/connect, id=333, successful=true}
[DEBUG] 2024-06-04 00:19:10,361 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper 
[Camel (camel-1) thread #324 - SalesforceHttpClient ] - [CHANNEL:META_CONNECT]: 
{clientId=REDACTED, channel=/meta/connect, id=334, successful=true}
{noformat}

This bug is causing the flow of play for events to stop unpredictably.  We have 
some alerting around this logging event, but it seems like this should be 
handled in code. I have attempted to try to catch this exception in the Camel 
route, but have not been successful.

The message the SubscriptionHelper is try to parse is:


{noformat}
{advice={reconnect=handshake, interval=0}, channel=/meta/subscribe, id=331, 
error=403::Unknown client, successful=false}
{noformat}

Based on the exception:

{noformat}
Exception while invoking listener 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$5@704dd2c3
java.lang.NullPointerException: Cannot invoke "Object.toString()" because the 
return value of "org.cometd.bayeux.Message.get(Object)" is null
{noformat}

This code:

{code:java}
final String subscribedChannelName = message.get(SUBSCRIPTION_FIELD).toString();
{code}

Is producing the NPE and interrupting the subscription process.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to