This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch aws2-s3-converters in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 3f69c9e8207c0273238a34625bf944fb21952a71 Author: Andrea Cosentino <[email protected]> AuthorDate: Wed Jun 10 15:20:20 2020 +0200 AWS2-S3 connector: Added the same set of converters/transformers we have in AWS-S3 --- .../aws2s3/converters/S3ObjectConverter.java | 46 +++++++++++++++++ .../aws2s3/serializers/S3ObjectSerializer.java | 60 ++++++++++++++++++++++ .../aws2s3/transforms/S3ObjectTransforms.java | 56 ++++++++++++++++++++ 3 files changed, 162 insertions(+) diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/converters/S3ObjectConverter.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/converters/S3ObjectConverter.java new file mode 100644 index 0000000..6195452 --- /dev/null +++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/converters/S3ObjectConverter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.aws2s3.converters; + +import java.util.Map; + +import org.apache.camel.kafkaconnector.aws2s3.serializers.S3ObjectSerializer; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; + +import software.amazon.awssdk.core.ResponseInputStream; + +public class S3ObjectConverter implements Converter { + + private final S3ObjectSerializer serializer = new S3ObjectSerializer(); + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + return serializer.serialize(topic, (ResponseInputStream) value); + } + + @Override + public SchemaAndValue toConnectData(String arg0, byte[] arg1) { + return null; + } + +} diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/serializers/S3ObjectSerializer.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/serializers/S3ObjectSerializer.java new file mode 100644 index 0000000..99d4e91 --- /dev/null +++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/serializers/S3ObjectSerializer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.aws2s3.serializers; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + + +import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.core.ResponseInputStream; + +public class S3ObjectSerializer implements Serializer<ResponseInputStream> { + private static final Logger LOG = LoggerFactory.getLogger(S3ObjectSerializer.class); + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + } + + @Override + public byte[] serialize(String topic, ResponseInputStream data) { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + + int nRead; + byte[] byteArray = new byte[16384]; + + try { + while ((nRead = data.read(byteArray, 0, byteArray.length)) != -1) { + buffer.write(byteArray, 0, nRead); + } + } catch (IOException e) { + LOG.warn("I/O error while serializing data from or to topic {}: {} | {}", topic, e.getMessage(), e); + } + + return buffer.toByteArray(); + } + + @Override + public void close() { + } + +} diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transforms/S3ObjectTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transforms/S3ObjectTransforms.java new file mode 100644 index 0000000..8ecd74f --- /dev/null +++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transforms/S3ObjectTransforms.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.aws2s3.transforms; + +import java.util.Map; + +import org.apache.camel.kafkaconnector.aws2s3.serializers.S3ObjectSerializer; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.transforms.Transformation; + +import software.amazon.awssdk.core.ResponseInputStream; + +public class S3ObjectTransforms<R extends ConnectRecord<R>> implements Transformation<R> { + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define("test", ConfigDef.Type.STRING, "test", ConfigDef.Importance.MEDIUM, "Transform the content of a bucket into a string "); + + private final S3ObjectSerializer serializer = new S3ObjectSerializer(); + + @Override + public void configure(Map<String, ?> configs) { + } + + @Override + public R apply(R record) { + byte[] v = serializer.serialize(record.topic(), (ResponseInputStream) record.value()); + String finalValue = new String(v); + return record.newRecord(record.topic(), record.kafkaPartition(), null, record.key(), Schema.STRING_SCHEMA, finalValue, record.timestamp()); + } + + @Override + public void close() { + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + +}
