gohanbg opened a new issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843
Hello,
I have been playing around with Strimzi and Camel's Kafka connectors, in
particular the S3 sink connector.
My setup consists of Strimzi Kafka and the Confluent Schema Registry, all
running in k8s.
The **KafkaConnect** resource looks like this:
```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  namespace: messaging
  name: aws-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 2.5.0
  image: my-image
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  replicas: 1
  bootstrapServers: my-cluster-kafka-brokers.messaging.svc:9092
  externalConfiguration:
    volumes:
      - name: kafka-aws-credentials
        secret:
          secretName: kafka-aws-credentials
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
    key.converter.schemas.enable: false
    value.converter.schemas.enable: true
  template:
    pod:
      imagePullSecrets:
        - name: docker-registry-secret
```
The **KafkaConnector** looks like this:
```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  labels:
    strimzi.io/cluster: aws-connect
spec:
  class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
    topics: topic-to-export
    camel.sink.path.bucketNameOrArn: s3-bucket-name
    camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws-s3.configuration.autocloseBody: false
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: S3_REGION
```
When I run the connector, it starts reading the topic correctly, but just
before sending a record to S3 it fails with the following error:
```error
org.apache.camel.NoTypeConversionAvailableException: No type converter
available to convert from type: org.apache.kafka.connect.data.Struct to the
required type: java.io.InputStream
```
From the logs I can see that the **Struct** message contains the correct
data, but it cannot be converted to an InputStream.
I just want the message to end up in S3 as JSON, without any special
transformations.
Here is what my **KafkaConnect** image looks like:
```dockerfile
FROM strimzi/kafka:0.17.0-kafka-2.4.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel-aws-s3
COPY ./camel-aws-s3/* /opt/kafka/plugins/camel-aws-s3/
USER 1001
```
The camel-aws-s3 directory that I'm copying from looks like this:
```
annotations-13.0.jar
apicurio-registry-common-1.3.1.Final.jar
apicurio-registry-distro-connect-converter-1.3.0.Final.jar
apicurio-registry-rest-client-1.3.1.Final.jar
apicurio-registry-utils-converter-1.3.1.Final.jar
apicurio-registry-utils-serde-1.3.1.Final.jar
avro-1.10.0.jar
aws-java-sdk-core-1.11.714.jar
aws-java-sdk-kms-1.11.714.jar
aws-java-sdk-s3-1.11.714.jar
camel-api-3.7.0.jar
camel-aws-s3-3.7.0.jar
camel-aws-s3-kafka-connector-0.7.0.jar
camel-base-3.7.0.jar
camel-base-engine-3.7.0.jar
camel-core-engine-3.7.0.jar
camel-core-languages-3.7.0.jar
camel-core-model-3.7.0.jar
camel-core-processor-3.7.0.jar
camel-core-reifier-3.7.0.jar
camel-direct-3.7.0.jar
camel-jackson-3.7.0.jar
camel-kafka-3.7.0.jar
camel-kafka-connector-0.7.0.jar
camel-main-3.7.0.jar
camel-management-api-3.7.0.jar
camel-support-3.7.0.jar
camel-util-3.7.0.jar
common-config-5.5.0.jar
commons-codec-1.15.jar
commons-compress-1.20.jar
commons-logging-1.2.jar
common-utils-5.5.0.jar
connect-json-2.6.0.jar
converter-jackson-2.9.0.jar
httpclient-4.5.13.jar
httpcore-4.4.14.jar
ion-java-1.0.2.jar
jackson-annotations-2.11.3.jar
jackson-core-2.11.3.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.11.3.jar
jackson-dataformat-avro-2.11.3.jar
jackson-dataformat-cbor-2.11.3.jar
jackson-datatype-jdk8-2.10.2.jar
jackson-mapper-asl-1.9.13.jar
jboss-jaxrs-api_2.1_spec-2.0.1.Final.jar
jmespath-java-1.11.714.jar
joda-time-2.8.1.jar
kafka-avro-serializer-5.5.0.jar
kafka-clients-2.6.0.jar
kafka-connect-avro-converter-5.5.0.jar
kafka-connect-avro-data-5.5.0.jar
kafka-schema-registry-client-5.5.0.jar
kafka-schema-serializer-5.5.0.jar
kotlin-reflect-1.3.20.jar
kotlin-stdlib-1.3.20.jar
kotlin-stdlib-common-1.3.20.jar
LICENSE.txt
lz4-java-1.7.1.jar
medeia-validator-core-1.1.1.jar
medeia-validator-jackson-1.1.1.jar
NOTICE.txt
okhttp-3.14.9.jar
okio-1.17.2.jar
protobuf-java-3.13.0.jar
README.adoc
retrofit-2.9.0.jar
slf4j-api-1.7.30.jar
snappy-java-1.1.7.3.jar
zstd-jni-1.4.4-7.jar
```
Basically, it contains the unzipped
[camel-aws-s3-kafka-connector](https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.7.0/camel-aws-s3-kafka-connector-0.7.0-package.zip)
package plus Confluent's libraries for reading from the Avro schema registry.
I was following [this
example](https://github.com/debezium/docker-images/blob/master/connect-base/1.2/Dockerfile#L39)
to get Confluent's libraries.
Do you have any suggestions on how I can proceed?
PS: If I use **org.apache.kafka.connect.storage.StringConverter** for the
values instead of **io.confluent.connect.avro.AvroConverter**, everything
works and the messages reach S3 with no issues. The problem is that the
message does not look great.
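
For reference, the working variant mentioned in the PS would look roughly like this in the **KafkaConnector** spec. This is only a sketch of the changed lines: it assumes every other setting stays exactly as in the connector shown earlier, and it leaves out `value.converter.schema.registry.url` on the assumption that it is not needed once the Avro converter is gone.

```yaml
spec:
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    # Assumption: this is the only line that differs from the failing setup.
    # With the StringConverter the value is not turned into a Struct,
    # and the upload to S3 succeeds.
    value.converter: org.apache.kafka.connect.storage.StringConverter
```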