hakidehari opened a new issue #1271:
URL: https://github.com/apache/camel-kafka-connector/issues/1271


   We are producing events from Salesforce to Kafka via something in SF called 
platform events. To get these events into Kafka, we are using the Salesforce 
Kafka Source Connector from Camel. Documentation for this Kafka connector can 
be found here:
   
   
https://camel.apache.org/camel-kafka-connector/latest/reference/connectors/camel-salesforce-kafka-source-connector.html
   
   The events generated from Salesforce are in JSON format. The format looks 
something like this:
   
   `{
    "data": {
       "schema": "NhgeDyLTvEyVPQ9uOzDqeQ",
       "payload": {
           "AccountId__c": "00119000013q2g3AAA",
           "AccountUUID__c": "4654fefb-e3d1-4b08-a4e2-5dabaa504abb",
           "GUID__c": null,
           "CreatedById": "0056g000005YeBAAA0",
           "CreatedDate": "2021-10-10T15:24:43.819Z"
       },
       "event": {
           "replayId": "1256093"
       }
   },
    "channel": "/event/Order_Completed__e"
   }`
   
   This leads us to our issue. We are using the following configuration for our 
source connector:
   
   `{
   "name": "sf_order_p_event_connector",
   "config": {
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       "key.converter.schemas.enable": "true",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "value.converter.schemas.enable": "true",
       "connector.class": 
"org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector",
       "camel.component.salesforce.loginUrl": "<redacted>",
       "camel.component.salesforce.instanceUrl": "<redacted>",
       "topics": "<redacted>",
       "camel.source.endpoint.rawPayload": "true",
       "camel.source.path.topicName": "/event/Order_Completed__e",
       "camel.source.endpoint.replayId": "-1",
       "camel.component.salesforce.authenticationType": "USERNAME_PASSWORD",
       "camel.component.salesforce.clientId": "<redacted",
       "camel.component.salesforce.clientSecret": "<redacted>",
       "camel.component.salesforce.password": "<redacted5",
       "camel.component.salesforce.userName": "<redacted>",
       "camel.source.endpoint.apiVersion": "52.0"
   }
   }`
   
   When using this value.converter from the configuration above, we get the 
following exception:
   
   `ERROR [sf_account_change_connector|task-0] 
WorkerSourceTask{id=sf_account_change_connector-0} Task threw an uncaught and 
unrecoverable exception. Task is being killed and will not recover until 
manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184) 
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
handler at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
 at 
org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298)
 at 
org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
 at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182) at 
org.apache.kafka.connect.runtime.WorkerTas
 k.run(WorkerTask.java:231) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.NullPointerException at 
org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:677)
 at 
org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:592)
 at 
org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:346)
 at 
org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63) 
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298)
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
 at
  
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
 ... 11 more`
   
   It seems it cannot convert whatever value is coming through to Kafka into 
JSON. I changed value.converter to 
"org.apache.kafka.connect.storage.StringConverter" to see what I am actually 
getting in the topic. Once I did that, this is the message I saw coming through 
to Kafka from Salesforce:
   
   `{data={schema=NhgeDyLTvEyVPQ9uOzDqeQ, 
payload={AccountId__c=00119000013q2g3AAA, 
   AccountUUID__c=4654fefb-e3d1-4b08-a4e2-5dabaa504abb, GUID__c=null, 
   CreatedById=0056g000005YeBAAA0, CreatedDate=2021-10-10T15:24:43.819Z}, 
event= 
   {replayId=1256093}}, channel=/event/Order_Completed__e}`
   
   It seems Kafka is not processing this as JSON but as a key=value (whatever 
this is) type of value. My question is, why am I seeing this type of payload 
and not JSON? Also, what configuration (if any) for the source connector can I 
use to get past this and perhaps convert the value into JSON? I need this value 
in JSON for my faust agent to properly process it. I have tried multiple 
different configurations for the source connector but nothing seems to be 
working.
   
   Any help would be appreciated. Thank you in advance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to