hakidehari opened a new issue #1339:
URL: https://github.com/apache/camel-kafka-connector/issues/1339
Hi all,
I have been troubleshooting, without success, some issues with the Camel
Kafka Salesforce Source connector. I have two Kafka topics, each fed by two
connectors that stream data in from the Salesforce Streaming API using the
Salesforce source connector. I am seeing a few odd things. First, the
connectors seem to reconnect to the Salesforce streams every minute or so.
Here is an example of what I see in the logs:
`[2022-02-14 02:36:41,133] WARN [sf_account_change_connector|task-0] Connect
failure: {failure={exception=org.cometd.common.TransportException:
{httpCode=504}, message={clientId=y98udzrc3x45vv1jfrg9twg1f9e,
channel=/meta/connect, id=30557, connectionType=long-polling}, httpCode=504,
connectionType=long-polling}, channel=/meta/connect, id=30557,
successful=false}
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:174)
[2022-02-14 02:36:41,252] INFO [sf_account_change_connector|task-0]
Restarting on unexpected disconnect from Salesforce...
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262)
[2022-02-14 02:36:41,525] INFO [sf_account_change_connector|task-0] Set
Replay extension to replay from `-1` for channel `/data/AccountChangeEvent`
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:547)
[2022-02-14 02:36:41,525] INFO [sf_account_change_connector|task-0]
Subscribing to channel /data/AccountChangeEvent...
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:435)
[2022-02-14 02:36:41,526] INFO [sf_account_change_connector|task-0]
Successfully restarted!
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:312)
[2022-02-14 02:36:41,569] INFO [sf_account_change_connector|task-0]
Subscribed to channel /data/AccountChangeEvent
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:499)
[2022-02-14 02:36:46,193] WARN [sf_order_p_event_connector|task-0] Connect
failure: {failure={exception=org.cometd.common.TransportException:
{httpCode=504}, message={clientId=139646d4ddo6w8r1v1xmp4rj3bli,
channel=/meta/connect, id=30562, connectionType=long-polling}, httpCode=504,
connectionType=long-polling}, channel=/meta/connect, id=30562,
successful=false}
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:174)
[2022-02-14 02:36:46,210] INFO [sf_account_change_connector|task-0|offsets]
WorkerSourceTask{id=sf_account_change_connector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2022-02-14 02:36:46,336] INFO [sf_order_p_event_connector|task-0]
Restarting on unexpected disconnect from Salesforce...
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262)
[2022-02-14 02:36:46,392] INFO [sf_order_p_event_connector|task-0|offsets]
WorkerSourceTask{id=sf_order_p_event_connector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2022-02-14 02:36:46,596] INFO [sf_order_p_event_connector|task-0] Set
Replay extension to replay from `-1` for channel `/event/Order_Completed__e`
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:547)
[2022-02-14 02:36:46,596] INFO [sf_order_p_event_connector|task-0]
Subscribing to channel /event/Order_Completed__e...
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:435)
`
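To put a number on how often each connector restarts, the log can be tallied per connector. Below is a minimal Python sketch, not part of the connector; it assumes the log keeps the `[timestamp] LEVEL [connector|task-N] message` layout shown in the excerpt above, with messages possibly wrapped onto following lines. The names `RECORD` and `count_restarts` are made up for illustration.

```python
import re
from collections import Counter

# One log record: "[timestamp] LEVEL [connector|task-N...] message", where the
# message runs until the next timestamp (or the end of the text).
RECORD = re.compile(
    r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}\]\s+(\w+)\s+"
    r"\[([^|\]]+)\|[^\]]*\]\s+"
    r"(.*?)(?=\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]|\Z)"
)


def count_restarts(log_text):
    """Count 'Restarting on unexpected disconnect' records per connector."""
    # Collapse all whitespace so that wrapped records become a single line.
    flat = " ".join(log_text.split())
    counts = Counter()
    for _timestamp, _level, connector, message in RECORD.findall(flat):
        if message.startswith("Restarting on unexpected disconnect"):
            counts[connector] += 1
    return counts


if __name__ == "__main__":
    import sys

    # Usage (hypothetical file name): python count_restarts.py connect.log
    with open(sys.argv[1], encoding="utf-8") as handle:
        for name, total in count_restarts(handle.read()).most_common():
            print(name, total)
```

Dividing each count by the time span covered by the log gives a rough restart rate per connector.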
Eventually, after a few hours or even a couple of days, some of the
connectors fail to reconnect. When that happens, I see one of two errors:
either 401::Authentication invalid or 403::Unknown client. Examples:
`[2022-02-14 04:11:15,324] ERROR [sf_order_p_event_connector|task-0] Error
restarting: Error during CONNECT: 403::Unknown client
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:307)
org.apache.camel.CamelException: Error during CONNECT: 403::Unknown client
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.connect(SubscriptionHelper.java:230)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.doStart(SubscriptionHelper.java:212)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.performClientRestart(SubscriptionHelper.java:304)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$1000(SubscriptionHelper.java:60)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$4.run(SubscriptionHelper.java:249)
at
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882)
at
org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036)
at java.lang.Thread.run(Thread.java:748)
`
and
`[2022-02-09 19:58:39,911] ERROR [sf_contact_change_connector|task-0] Error
restarting: Exception during HANDSHAKE: 401::Authentication invalid
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:307)
org.apache.camel.CamelException: Exception during HANDSHAKE:
401::Authentication invalid
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.connect(SubscriptionHelper.java:223)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.doStart(SubscriptionHelper.java:212)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.performClientRestart(SubscriptionHelper.java:304)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$1000(SubscriptionHelper.java:60)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$4.run(SubscriptionHelper.java:249)
at
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882)
at
org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.component.salesforce.api.SalesforceException:
401::Authentication invalid
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.getFailure(SubscriptionHelper.java:340)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$300(SubscriptionHelper.java:60)
at
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$1.onMessage(SubscriptionHelper.java:132)
at
org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyOnMessage(AbstractClientSession.java:583)
at
org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyMessageListeners(AbstractClientSession.java:568)
at
org.cometd.common.AbstractClientSession.notifyListeners(AbstractClientSession.java:308)
at
org.cometd.common.AbstractClientSession.lambda$receive$4(AbstractClientSession.java:269)
at org.cometd.bayeux.Promise$2.succeed(Promise.java:103)
at
org.cometd.common.AsyncFoldLeft$AbstractLoop.run(AsyncFoldLeft.java:199)
at org.cometd.common.AsyncFoldLeft.run(AsyncFoldLeft.java:93)
at
org.cometd.common.AbstractClientSession.extendIncoming(AbstractClientSession.java:103)
at
org.cometd.common.AbstractClientSession.receive(AbstractClientSession.java:263)
at
org.cometd.client.BayeuxClient.failHandshake(BayeuxClient.java:721)
at
org.cometd.client.BayeuxClient.processHandshake(BayeuxClient.java:707)
`
This is not specific to one connector; it can happen on any of them. Here
is an example config of the Salesforce Source connector I am using. I am
using the REFRESH_TOKEN authentication method and have set the refresh token
in Salesforce to expire only if it is not used for 365 days. The
authentication itself is good, as shown by everything working fine when I
recreate the connectors.
`{
"name": "sf_account_change_connector",
"config": {
"camel.source.endpoint.rawPayload": true,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"camel.source.marshal":"json-jackson",
"connector.class":"org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector",
"camel.component.salesforce.loginUrl":"<redacted>",
"camel.component.salesforce.instanceUrl":"<redacted>",
"topics": "CISL_SF_ASSET_INFO_INPUT",
"errors.tolerance":"all",
"errors.retry.timeout":"-1",
"camel.source.path.topicName":"/data/AccountChangeEvent",
"camel.source.endpoint.replayId":"-1",
"camel.component.salesforce.authenticationType":"REFRESH_TOKEN",
"camel.component.salesforce.clientId":"<redacted>",
"camel.component.salesforce.clientSecret":"<redacted>",
"camel.component.salesforce.refreshToken":"redacted",
"camel.component.salesforce.userName":"<redacted>"
}
}
`
All the connectors have configs similar to this one; the only differences
are the streaming API path and the topic names. So my questions are:
Why does it reconnect to the streams so often? Is this a setting I can
change in the connector config itself, or is it something Salesforce-specific?
Also, why do 403 and 401 errors occasionally occur even though my
authentication is valid? The connectors seem to stop working when one of
these errors happens.
Thanks in advance!