gohanbg opened a new issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300


   Hello,
   
   Apologies if my question is not for here. I have a kafka topic, that I want 
to export to S3, therefore using the **CamelAws2s3SinkConnector**. It has a 
confluent schema registry. All works well, but when I except when encountering 
a deleted kafka message. Then I get the following exception:
   ```
   2021-12-19 13:56:58,542 ERROR WorkerSinkTask{id=s3-sink-connector-0} Task 
threw an uncaught and unrecoverable exception. Task is being killed and will 
not recover until manually restarted. Error: Exchange delivery has failed! 
(org.apache.kafka.connect.runtime.WorkerSinkTask) 
[task-thread-s3-sink-connector-0]
   org.apache.kafka.connect.errors.ConnectException: Exchange delivery has 
failed!
        at 
org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:199)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.camel.InvalidPayloadException: No body available of 
type: java.lang.Object on: Message. Exchange[2719B1AB7DB587D-0000000000000003]
        at 
org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:79)
        at 
org.apache.camel.component.aws2.s3.AWS2S3Producer.processSingleOp(AWS2S3Producer.java:258)
        at 
org.apache.camel.component.aws2.s3.AWS2S3Producer.process(AWS2S3Producer.java:100)
        at 
org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
        at 
org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
        at 
org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
        at 
org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
        at 
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
        at 
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
        at 
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
        at 
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
        at 
org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
        at 
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:217)
        at 
org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
        at 
org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
        at 
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
        at 
org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:190)
        at 
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
        at 
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
        at 
org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:194)
        ... 11 more
   ```
   
   This is my configuration:
   ```
   apiVersion: kafka.strimzi.io/v1beta2
   kind: KafkaConnector
   metadata:
     name: s3-sink-connector
     labels:
       strimzi.io/cluster: aws-connect
   spec:
     class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
     tasksMax: 1
     config:
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: io.confluent.connect.avro.AvroConverter
       value.converter.schema.registry.url: 
http://confluent-cp-schema-registry.messaging.svc:8081
       topics: my-topic
       camel.sink.path.bucketNameOrArn: my-bucket-name
       camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
       camel.sink.maxPollDuration: 10000
       transforms: tojson
       transforms.tojson.type: 
org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform
       transforms.tojson.converter.type: value
       camel.component.aws2-s3.accessKey: aws-key
       camel.component.aws2-s3.secretKey: aws-secret
       camel.component.aws2-s3.region: region
   
   ```
   
   What happens is the following:
   1. The connector starts reading the topic from beginning
   2. The first few messages are correctly transformed and uploaded to S3 (i 
can see them all appearing correct, with actual JSON and not an avro message)
   3. We reach a kafka null record (i.e. deleted kafka record)
   4. The above exception is thrown and I can't continue
   
   I'm using **strimzi v0.26.1**, with **kafka v3.0.0**, confluent **schema 
registry v7.0.0** and **camel-aws2-s3-kafka-connector v0.11.0**
   
   Regards
   Mihail


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to