This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch zip-example in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
commit 06246bfa3921bcf2d96c68cb0753c143345b218a Author: Andrea Cosentino <[email protected]> AuthorDate: Wed Aug 26 12:03:32 2020 +0200 Added an AWS2-S3 zip aggregation example --- .../aws2-s3-sink-with-zip-aggregation/README.adoc | 185 +++++++++++++++++++++ .../config/CamelAWS2S3SinkConnector.properties | 35 ++++ 2 files changed, 220 insertions(+) diff --git a/aws2-s3/aws2-s3-sink-with-zip-aggregation/README.adoc b/aws2-s3/aws2-s3-sink-with-zip-aggregation/README.adoc new file mode 100644 index 0000000..a631ad5 --- /dev/null +++ b/aws2-s3/aws2-s3-sink-with-zip-aggregation/README.adoc @@ -0,0 +1,185 @@ +# Camel-Kafka-connector AWS2 S3 Sink + +## Introduction + +This is an example for Camel-Kafka-connector AWS2-S3 Sink + +## What is needed + +- An AWS S3 bucket + +## Running Kafka + +``` +$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties +$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties +$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic +``` + +## Setting up the needed bits and running the example + +You'll need to setup the plugin.path property in your kafka + +Open the `$KAFKA_HOME/config/connect-standalone.properties` + +and set the `plugin.path` property to your choosen location + +You'll need to build your connector starting from an archetype: + +``` +> mvn archetype:generate -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype -DarchetypeVersion=0.4.0 +[INFO] Using property: camel-kafka-connector-version = 0.4.0 +Confirm properties configuration: +groupId: org.apache.camel.ckc +artifactId: aws2s3 +version: 1.0-SNAPSHOT +package: org.apache.camel.ckc +camel-kafka-connector-version: 0.4.0 + Y: : Y +[INFO] ---------------------------------------------------------------------------- +[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.4.0 +[INFO] ---------------------------------------------------------------------------- +[INFO] Parameter: groupId, Value: org.apache.camel.ckc +[INFO] Parameter: artifactId, Value: aws2s3 +[INFO] Parameter: version, Value: 1.0-SNAPSHOT +[INFO] Parameter: package, Value: org.apache.camel.ckc +[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/ckc +[INFO] Parameter: package, Value: com.github.oscerd +[INFO] Parameter: version, Value: 1.0-SNAPSHOT +[INFO] Parameter: groupId, Value: org.apache.camel.ckc +[INFO] Parameter: camel-kafka-connector-version, Value: 0.4.0 +[INFO] Parameter: artifactId, Value: aws2s3 +[INFO] Project created from Archetype in dir: /home/workspace/miscellanea/aws2s3 +[INFO] ------------------------------------------------------------------------ +[INFO] BUILD SUCCESS +[INFO] ------------------------------------------------------------------------ +[INFO] Total time: 30.084 s +[INFO] Finished at: 2020-08-26T11:08:21+02:00 +[INFO] ------------------------------------------------------------------------ +> cd /home/workspace/miscellanea/aws2s3 +``` + +Now we need to edit the POM + + +``` + . + . + . + <version>1.0-SNAPSHOT</version> + + <name>A Camel Kafka Connector extended</name> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <kafka-version>2.5.0</kafka-version> + <camel-kafka-connector-version>${project.version}</camel-kafka-connector-version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-api</artifactId> + <scope>provided</scope> + <version>${kafka-version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-transforms</artifactId> + <scope>provided</scope> + <version>${kafka-version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>camel-kafka-connector</artifactId> + <version>0.4.0</version> + </dependency> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>camel-aws2-s3-kafka-connector</artifactId> + <version>0.4.0</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-zipfile</artifactId> + <version>3.4.2</version> + </dependency> + </dependencies> + . + . + . +``` + +In the dependencies section you'll need to uncomment the aws2-s3 connector dependency and adding the camel-zipfile component + +Now we need to build the connector: + +``` +> mvn clean package +``` + +In this example we'll use `/home/oscerd/connectors/` as plugin.path, but we'll need the generated zip from the previois build + +``` +> cd /home/oscerd/connectors/ +> cp /home/workspace/miscellanea/aws2s3/target/aws2s3-1.0-SNAPSHOT-package.zip . +> unzip aws2s3-1.0-SNAPSHOT-package.zip +``` + +Now it's time to setup the connectors + +Open the AWS2 S3 configuration file + +``` +name=CamelAWS2S3SourceConnector +connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter + +topics=mytopic + +camel.sink.path.bucketNameOrArn=camel-kafka-connector + +camel.component.aws2-s3.access-key=xxxx +camel.component.aws2-s3.secret-key=yyyy +camel.component.aws2-s3.region=eu-west-1 + +camel.sink.endpoint.keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.zip + +camel.beans.aggregate=#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy +camel.beans.aggregation.size=10 +camel.beans.aggregation.timeout=5000 +``` + +and add the correct credentials for AWS. + +Now you can run the example + +``` +$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelAWS2S3SinkConnector.properties +``` + +Just connect to your AWS Console and check the content of camel-kafka-connector bucket. + +On a different terminal run the kafka-producer and send messages to your Kafka Broker. + +``` +bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopic +Kafka to S3 message 1 +Kafka to S3 message 2 +Kafka to S3 message 3 +Kafka to S3 message 4 +Kafka to S3 message 5 +``` + +You should see (after the timeout has been reached) a file with date-exchangeId.zip name containing the following multiple files. Those files will contain the messages. + +``` +Kafka to S3 message 1 +Kafka to S3 message 2 +Kafka to S3 message 3 +Kafka to S3 message 4 +Kafka to S3 message 5 +``` + diff --git a/aws2-s3/aws2-s3-sink-with-zip-aggregation/config/CamelAWS2S3SinkConnector.properties b/aws2-s3/aws2-s3-sink-with-zip-aggregation/config/CamelAWS2S3SinkConnector.properties new file mode 100644 index 0000000..794b955 --- /dev/null +++ b/aws2-s3/aws2-s3-sink-with-zip-aggregation/config/CamelAWS2S3SinkConnector.properties @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name=CamelAWS2S3SourceConnector +connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter + +topics=mytopic + +camel.sink.path.bucketNameOrArn=camel-kafka-connector + +camel.component.aws2-s3.access-key=xxxx +camel.component.aws2-s3.secret-key=yyyy +camel.component.aws2-s3.region=eu-west-1 + +camel.sink.endpoint.keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.zip + +camel.beans.aggregate=#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy +camel.beans.aggregation.size=10 +camel.beans.aggregation.timeout=5000
