This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch exec-example in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
commit 8aeda83b34fdfcdf459722442e1e344f7df8c944 Author: Andrea Cosentino <[email protected]> AuthorDate: Wed Oct 28 08:48:36 2020 +0100 Added an Exec sink connector example --- exec/exec-sink/README.adoc | 81 ++++++++++++++++++++++ .../camel-kafka-exec-simple-producer/.gitignore | 4 ++ .../camel-kafka-exec-simple-producer/README.md | 7 ++ .../camel-kafka-exec-simple-producer/pom.xml | 77 ++++++++++++++++++++ .../camel/kafkaconnector/SimpleProducer.java | 29 ++++++++ .../src/main/resources/log4j.properties | 9 +++ .../config/CamelExecSinkConnector.properties | 27 ++++++++ 7 files changed, 234 insertions(+) diff --git a/exec/exec-sink/README.adoc b/exec/exec-sink/README.adoc new file mode 100644 index 0000000..f06ef7f --- /dev/null +++ b/exec/exec-sink/README.adoc @@ -0,0 +1,81 @@ +# Camel-Kafka-connector Exec Sink + +This is an example for Camel-Kafka-connector Exec Sink + +## Standalone + +### What is needed + +- The touch command + +### Running Kafka + +``` +$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties +$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties +$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic +``` + +### Setting up the needed bits and running the example + +You'll need to set up the plugin.path property in your Kafka installation + +Open the `$KAFKA_HOME/config/connect-standalone.properties` + +and set the `plugin.path` property to your chosen location + +In this example we'll use `/home/oscerd/connectors/` + +``` +> cd /home/oscerd/connectors/ +> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-exec-kafka-connector/0.6.0/camel-exec-kafka-connector-0.6.0-package.zip +> unzip camel-exec-kafka-connector-0.6.0-package.zip +``` + +Now it's time to set up the connectors + +Open the Exec sink configuration file + +``` +name=CamelExecSinkConnector 
+connector.class=org.apache.camel.kafkaconnector.exec.CamelExecSinkConnector +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter + +topics=mytopic + +camel.sink.path.executable=touch +camel.sink.endpoint.args=/tmp/${body}-${headers.detail}.txt +``` + +Set the correct options in the file. + +Now you can run the example + +``` +$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelExecSinkConnector.properties +``` + +You'll need to send messages to mytopic. You can run the simple producer included in this project. So cd into the folder: + +``` +> cd camel-kafka-exec-simple-producer +> mvn compile exec:exec -Dkafka.topic.name=mytopic -Dkafka.key=1 -Dcamel.body="FileName" -Dcamel.header.detail="detail1" +``` + +Under the /tmp folder you should see the created file + +``` +> cd /tmp/ +> ls | grep FileName +FileName-detail1.txt +``` + +In the mytopic topic you should see the message too + +``` +> ./kafkacat -b localhost:9092 -t mytopic -f 'Headers: %h: Message value: %s\n' +Headers: CamelHeader.detail=detail1: Message value: FileName +% Reached end of topic mytopic [0] at offset 1 +``` + diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/.gitignore b/exec/exec-sink/camel-kafka-exec-simple-producer/.gitignore new file mode 100644 index 0000000..72bba6e --- /dev/null +++ b/exec/exec-sink/camel-kafka-exec-simple-producer/.gitignore @@ -0,0 +1,4 @@ +.project +.classpath +.settings/ +/target diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/README.md b/exec/exec-sink/camel-kafka-exec-simple-producer/README.md new file mode 100644 index 0000000..4a4622d --- /dev/null +++ b/exec/exec-sink/camel-kafka-exec-simple-producer/README.md @@ -0,0 +1,7 @@ +## Camel Kafka Exec Simple Producer + +Related to https://github.com/oscerd/infinispan-kafka-demo + +To run the producer: + +mvn clean compile exec:exec -Dkafka.topic.name=mytopic -Dkafka.key=1 -Dcamel.body="FileName" -Dcamel.header.detail="detail1" diff --git 
a/exec/exec-sink/camel-kafka-exec-simple-producer/pom.xml b/exec/exec-sink/camel-kafka-exec-simple-producer/pom.xml
new file mode 100644
index 0000000..9f64f04
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.camel.kafkaconnector</groupId>
+  <artifactId>camel-kafka-exec-simple-producer</artifactId>
+  <name>Camel Kafka Exec Simple Producer</name>
+  <version>0.0.1-SNAPSHOT</version>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.12</artifactId>
+      <version>2.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>2.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.11.3</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+    </dependency>
+  </dependencies>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>3.0.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>java</executable>
+          <arguments>
+            <argument>-classpath</argument>
+            <classpath />
+            <argument>org.apache.camel.kafkaconnector.SimpleProducer</argument>
+            <argument>${kafka.topic.name}</argument>
+            <argument>${kafka.key}</argument>
+            <argument>${camel.body}</argument>
+            <argument>${camel.header.detail}</argument>
+          </arguments>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.2</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/java/org/apache/camel/kafkaconnector/SimpleProducer.java b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/java/org/apache/camel/kafkaconnector/SimpleProducer.java
new file mode 100644
index 0000000..dfc156e
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/java/org/apache/camel/kafkaconnector/SimpleProducer.java
@@ -0,0 +1,29 @@
+package org.apache.camel.kafkaconnector;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+/** Sends one record to Kafka on localhost:9092; args: topic, key, body, value of the CamelHeader.detail header. */
+public class SimpleProducer {
+
+    public static void main(String[] args) {
+        if (args.length < 4) {
+            throw new IllegalArgumentException("Usage: SimpleProducer <topic> <key> <body> <detail-header>");
+        }
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+        // try-with-resources closes the producer even when send() throws
+        try (KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props)) {
+            ProducerRecord<String, String> rec = new ProducerRecord<String, String>(args[0], args[1], args[2]);
+            // Encode the header explicitly as UTF-8 instead of relying on the platform default charset
+            rec.headers().add("CamelHeader.detail", args[3].getBytes(StandardCharsets.UTF_8));
+            prod.send(rec);
+        }
+    }
+}
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/resources/log4j.properties b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/resources/log4j.properties
new file mode 100644
index 0000000..36af21f
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages 
to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + diff --git a/exec/exec-sink/config/CamelExecSinkConnector.properties b/exec/exec-sink/config/CamelExecSinkConnector.properties new file mode 100644 index 0000000..8537766 --- /dev/null +++ b/exec/exec-sink/config/CamelExecSinkConnector.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name=CamelExecSinkConnector +connector.class=org.apache.camel.kafkaconnector.exec.CamelExecSinkConnector +tasks.max=1 +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter + +topics=mytopic + +camel.sink.path.executable=touch +camel.sink.endpoint.args=/tmp/${body}-${headers.detail}.txt
