gohanbg opened a new issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843


   Hello,
   
   I have been experimenting with Strimzi and the Camel Kafka connectors, in particular the S3 sink connector.
   
   My setup consists of Strimzi Kafka and the Confluent Schema Registry, all running in k8s.
   
   The **KafkaConnect** looks like this:
   ```yaml
   apiVersion: kafka.strimzi.io/v1beta1
   kind: KafkaConnect
   metadata:
     namespace: messaging
     name: aws-connect
     annotations:
       strimzi.io/use-connector-resources: "true"
   spec:
     version: 2.5.0
     image: my-image
     logging:
       type: inline
       loggers:
         connect.root.logger.level: "INFO"
     replicas: 1
     bootstrapServers: my-cluster-kafka-brokers.messaging.svc:9092
     externalConfiguration:
       volumes:
         - name: kafka-aws-credentials
           secret:
             secretName: kafka-aws-credentials
     config:
       config.providers: file
       config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: io.confluent.connect.avro.AvroConverter
       value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
       key.converter.schemas.enable: false
       value.converter.schemas.enable: true
     template:
       pod:
         imagePullSecrets:
           - name: docker-registry-secret
   ```
   
   The **KafkaConnector** looks like this:
   ```yaml
   apiVersion: kafka.strimzi.io/v1alpha1
   kind: KafkaConnector
   metadata:
     name: s3-sink-connector
     labels:
       strimzi.io/cluster: aws-connect
   spec:
     class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
     tasksMax: 1
     config:
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: io.confluent.connect.avro.AvroConverter
       value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
       topics: topic-to-export
       camel.sink.path.bucketNameOrArn: s3-bucket-name
       camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
       camel.sink.maxPollDuration: 10000
       camel.component.aws-s3.configuration.autocloseBody: false
       camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_access_key_id}
       camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_secret_access_key}
       camel.component.aws-s3.region: S3_REGION
   ```
   
   When I run the connector, it starts reading the topic correctly, but just before sending the data to S3 it fails with the following error:
   ```error
   org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: org.apache.kafka.connect.data.Struct to the required type: java.io.InputStream
   ```
   
   
   I can see from the logs that the **Struct** message contains the correct data, but it cannot be converted to an InputStream.
   I just want the message to end up in S3 as JSON, without any special transformations.
   
   Here is what my **KafkaConnect** image looks like:
   ```dockerfile
   FROM strimzi/kafka:0.17.0-kafka-2.4.0
   USER root:root
   RUN mkdir -p /opt/kafka/plugins/camel-aws-s3
   COPY ./camel-aws-s3/* /opt/kafka/plugins/camel-aws-s3/
   USER 1001
   ```
   The camel-aws-s3 directory that I'm copying from looks like this:
   ```
   annotations-13.0.jar
   apicurio-registry-common-1.3.1.Final.jar
   apicurio-registry-distro-connect-converter-1.3.0.Final.jar
   apicurio-registry-rest-client-1.3.1.Final.jar
   apicurio-registry-utils-converter-1.3.1.Final.jar
   apicurio-registry-utils-serde-1.3.1.Final.jar
   avro-1.10.0.jar
   aws-java-sdk-core-1.11.714.jar
   aws-java-sdk-kms-1.11.714.jar
   aws-java-sdk-s3-1.11.714.jar
   camel-api-3.7.0.jar
   camel-aws-s3-3.7.0.jar
   camel-aws-s3-kafka-connector-0.7.0.jar
   camel-base-3.7.0.jar
   camel-base-engine-3.7.0.jar
   camel-core-engine-3.7.0.jar
   camel-core-languages-3.7.0.jar
   camel-core-model-3.7.0.jar
   camel-core-processor-3.7.0.jar
   camel-core-reifier-3.7.0.jar
   camel-direct-3.7.0.jar
   camel-jackson-3.7.0.jar
   camel-kafka-3.7.0.jar
   camel-kafka-connector-0.7.0.jar
   camel-main-3.7.0.jar
   camel-management-api-3.7.0.jar
   camel-support-3.7.0.jar
   camel-util-3.7.0.jar
   common-config-5.5.0.jar
   commons-codec-1.15.jar
   commons-compress-1.20.jar
   commons-logging-1.2.jar
   common-utils-5.5.0.jar
   connect-json-2.6.0.jar
   converter-jackson-2.9.0.jar
   httpclient-4.5.13.jar
   httpcore-4.4.14.jar
   ion-java-1.0.2.jar
   jackson-annotations-2.11.3.jar
   jackson-core-2.11.3.jar
   jackson-core-asl-1.9.13.jar
   jackson-databind-2.11.3.jar
   jackson-dataformat-avro-2.11.3.jar
   jackson-dataformat-cbor-2.11.3.jar
   jackson-datatype-jdk8-2.10.2.jar
   jackson-mapper-asl-1.9.13.jar
   jboss-jaxrs-api_2.1_spec-2.0.1.Final.jar
   jmespath-java-1.11.714.jar
   joda-time-2.8.1.jar
   kafka-avro-serializer-5.5.0.jar
   kafka-clients-2.6.0.jar
   kafka-connect-avro-converter-5.5.0.jar
   kafka-connect-avro-data-5.5.0.jar
   kafka-schema-registry-client-5.5.0.jar
   kafka-schema-serializer-5.5.0.jar
   kotlin-reflect-1.3.20.jar
   kotlin-stdlib-1.3.20.jar
   kotlin-stdlib-common-1.3.20.jar
   LICENSE.txt
   lz4-java-1.7.1.jar
   medeia-validator-core-1.1.1.jar
   medeia-validator-jackson-1.1.1.jar
   NOTICE.txt
   okhttp-3.14.9.jar
   okio-1.17.2.jar
   protobuf-java-3.13.0.jar
   README.adoc
   retrofit-2.9.0.jar
   slf4j-api-1.7.30.jar
   snappy-java-1.1.7.3.jar
   zstd-jni-1.4.4-7.jar
   ```
   Basically, it contains the unzipped [camel-aws-s3-kafka-connector](https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.7.0/camel-aws-s3-kafka-connector-0.7.0-package.zip) package plus the Confluent libraries for reading Avro via the Schema Registry. I followed [this example](https://github.com/debezium/docker-images/blob/master/connect-base/1.2/Dockerfile#L39) to get the Confluent libraries.
   
   Do you have any suggestions on how I can proceed?
   
   PS: If I use **org.apache.kafka.connect.storage.StringConverter** for the values instead of **io.confluent.connect.avro.AvroConverter**, everything works and the messages reach S3 with no issues. The problem is that the stored message does not look great.
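   
   For reference, a minimal sketch of that working variant. It assumes the only change is to `value.converter` in the **KafkaConnector** config; the schema registry URL is left out because StringConverter does not use it, and everything else stays as in the resource shown above:
   
   ```yaml
   spec:
     config:
       key.converter: org.apache.kafka.connect.storage.StringConverter
       # Assumed change: plain string values instead of Avro,
       # so value.converter.schema.registry.url is dropped.
       value.converter: org.apache.kafka.connect.storage.StringConverter
   ```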



