gohanbg opened a new issue #1300:
URL: https://github.com/apache/camel-kafka-connector/issues/1300
Hello,
Apologies if my question is not for here. I have a kafka topic, that I want
to export to S3, therefore using the **CamelAws2s3SinkConnector**. It has a
confluent schema registry. All works well, but when I except when encountering
a deleted kafka message. Then I get the following exception:
```
2021-12-19 13:56:58,542 ERROR WorkerSinkTask{id=s3-sink-connector-0} Task
threw an uncaught and unrecoverable exception. Task is being killed and will
not recover until manually restarted. Error: Exchange delivery has failed!
(org.apache.kafka.connect.runtime.WorkerSinkTask)
[task-thread-s3-sink-connector-0]
org.apache.kafka.connect.errors.ConnectException: Exchange delivery has
failed!
at
org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:199)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.camel.InvalidPayloadException: No body available of
type: java.lang.Object on: Message. Exchange[2719B1AB7DB587D-0000000000000003]
at
org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:79)
at
org.apache.camel.component.aws2.s3.AWS2S3Producer.processSingleOp(AWS2S3Producer.java:258)
at
org.apache.camel.component.aws2.s3.AWS2S3Producer.process(AWS2S3Producer.java:100)
at
org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
at
org.apache.camel.processor.SendDynamicProcessor.lambda$process$0(SendDynamicProcessor.java:197)
at
org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
at
org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:182)
at
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
at
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
at
org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
at
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:217)
at
org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
at
org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
at
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
at
org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:190)
at
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
at
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
at
org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:194)
... 11 more
```
This is my configuration:
```
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: s3-sink-connector
labels:
strimzi.io/cluster: aws-connect
spec:
class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url:
http://confluent-cp-schema-registry.messaging.svc:8081
topics: my-topic
camel.sink.path.bucketNameOrArn: my-bucket-name
camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
camel.sink.maxPollDuration: 10000
transforms: tojson
transforms.tojson.type:
org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform
transforms.tojson.converter.type: value
camel.component.aws2-s3.accessKey: aws-key
camel.component.aws2-s3.secretKey: aws-secret
camel.component.aws2-s3.region: region
```
What happens is the following:
1. The connector starts reading the topic from beginning
2. The first few messages are correctly transformed and uploaded to S3 (i
can see them all appearing correct, with actual JSON and not an avro message)
3. We reach a kafka null record (i.e. deleted kafka record)
4. The above exception is thrown and I can't continue
I'm using **strimzi v0.26.1**, with **kafka v3.0.0**, confluent **schema
registry v7.0.0** and **camel-aws2-s3-kafka-connector v0.11.0**
Regards
Mihail
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]