This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch transform in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit eaf5968cb23c2b3490e3932cb1282547a5a122e1 Author: Andrea Cosentino <[email protected]> AuthorDate: Mon Dec 9 15:45:40 2019 +0100 Camel-AWS S3 example: Use a Transformation instead of a converter --- .../converters/S3ObjectConverter.java | 43 ----------------- .../converters/S3ObjectTransformer.java | 56 ++++++++++++++++++++++ examples/CamelAWSS3SourceConnector.properties | 12 +++-- .../source/aws/s3/CamelAWSS3PropertyFactory.java | 4 +- 4 files changed, 65 insertions(+), 50 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java deleted file mode 100644 index 59678d6..0000000 --- a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectConverter.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.kafkaconnector.converters; - -import java.util.Map; -import com.amazonaws.services.s3.model.S3ObjectInputStream; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.storage.Converter; - -public class S3ObjectConverter implements Converter { - - private final S3ObjectSerializer serializer = new S3ObjectSerializer(); - - @Override - public void configure(Map<String, ?> arg0, boolean arg1) { - } - - @Override - public byte[] fromConnectData(String topic, Schema schema, Object value) { - return serializer.serialize(topic, (S3ObjectInputStream) value); - } - - @Override - public SchemaAndValue toConnectData(String arg0, byte[] arg1) { - return null; - } - -} diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java new file mode 100644 index 0000000..1692e51 --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/converters/S3ObjectTransformer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.kafkaconnector.converters; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.transforms.Transformation; + +import com.amazonaws.services.s3.model.S3ObjectInputStream; + +public class S3ObjectTransformer<R extends ConnectRecord<R>> implements Transformation<R> { + + private final S3ObjectSerializer serializer = new S3ObjectSerializer(); + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM, + "Transform the content of a bucket into a string "); + + @Override + public void configure(Map<String, ?> configs) { + } + + @Override + public R apply(R record) { + byte[] v = serializer.serialize(record.topic(), (S3ObjectInputStream) record.value()); + String finalValue = new String(v); + return record.newRecord(record.topic(), record.kafkaPartition(), null, record.key(), Schema.STRING_SCHEMA, finalValue, record.timestamp()); + } + + @Override + public void close() { + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + +} diff --git a/examples/CamelAWSS3SourceConnector.properties b/examples/CamelAWSS3SourceConnector.properties index bfee4e3..12f9efc 100644 --- a/examples/CamelAWSS3SourceConnector.properties +++ b/examples/CamelAWSS3SourceConnector.properties @@ -18,14 +18,16 @@ name=CamelAWSS3SourceConnector connector.class=org.apache.camel.kafkaconnector.CamelSourceConnector key.converter=org.apache.kafka.connect.storage.StringConverter -value.converter=org.apache.camel.kafkaconnector.converters.S3ObjectConverter +transforms=S3ObjectTransformer +transforms.S3ObjectTransformer.type=org.apache.camel.kafkaconnector.converters.S3ObjectTransformer + camel.source.maxPollDuration=10000 camel.source.kafka.topic=mytopic -camel.source.url=aws-s3://bucket?autocloseBody=false 
+camel.source.url=aws-s3://camel-kafka-connector?autocloseBody=false -camel.component.aws-s3.configuration.access-key=<youraccesskey> -camel.component.aws-s3.configuration.secret-key=<yoursecretkey> -camel.component.aws-s3.configuration.region=<yourregion> +camel.component.aws-s3.configuration.access-key=[REDACTED-AWS-ACCESS-KEY-ID] +camel.component.aws-s3.configuration.secret-key=[REDACTED-AWS-SECRET-ACCESS-KEY] +camel.component.aws-s3.configuration.region=EU_WEST_1 diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java index 28fda93..d7c6456 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java @@ -48,8 +48,8 @@ class CamelAWSS3PropertyFactory implements ConnectorPropertyFactory { connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSourceConnector"); connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); - connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.camel.kafkaconnector.converters.S3ObjectConverter"); - + connectorProps.put(ConnectorConfig.TRANSFORMS_CONFIG, "S3ObjectTransformer"); + connectorProps.put("transforms.S3ObjectTransformer.type", "org.apache.camel.kafkaconnector.converters.S3ObjectTransformer"); connectorProps.put("camel.source.kafka.topic", topic); String queueUrl = "aws-s3://" + bucket + "?maxMessagesPerPoll=10";
