This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch cql-sink
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
commit 4875732934d0b8dd0830f280cb60f32c75fbe4af
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri May 15 14:29:27 2020 +0200

    Added CQL Sink example
---
 README.adoc                                       |   1 +
 cql-sink/CamelCassandraQLSinkConnector.properties |  29 +++++
 cql-sink/README.md                                | 141 ++++++++++++++++++++++
 3 files changed, 171 insertions(+)

diff --git a/README.adoc b/README.adoc
index e0b0be4..e7da7d9 100644
--- a/README.adoc
+++ b/README.adoc
@@ -4,3 +4,4 @@ List of existing examples:
 - aws-s3 to JMS example
 - aws2-sqs source example
+- cql sink example

diff --git a/cql-sink/CamelCassandraQLSinkConnector.properties b/cql-sink/CamelCassandraQLSinkConnector.properties
new file mode 100644
index 0000000..aaa63ce
--- /dev/null
+++ b/cql-sink/CamelCassandraQLSinkConnector.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelCassandraQLSinkConnector
+topics=mytopic
+tasks.max=1
+connector.class=org.apache.camel.kafkaconnector.cql.CamelCqlSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+camel.sink.path.hosts=172.17.0.2
+camel.sink.path.keyspace=9042/test
+camel.sink.endpoint.cql=insert into users(id, name) values (now(), ?)
+
+

diff --git a/cql-sink/README.md b/cql-sink/README.md
new file mode 100644
index 0000000..c3db678
--- /dev/null
+++ b/cql-sink/README.md
@@ -0,0 +1,141 @@
+# Camel-Kafka-connector CQL Sink
+
+## Introduction
+
+This is an example of the Camel-Kafka-connector CQL sink.
+
+## What is needed
+
+- A Cassandra instance
+
+## Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
+```
+
+## Setting up the needed bits and running the example
+
+You'll need to set up the `plugin.path` property in your Kafka installation.
+
+Open `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your chosen location.
+
+In this example we'll use `/home/oscerd/connectors/`:
+
+```
+> cd /home/oscerd/connectors/
+> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-cql-kafka-connector/0.1.0/camel-cql-kafka-connector-0.1.0-package.zip
+> unzip camel-cql-kafka-connector-0.1.0-package.zip
+```
+
+## Setting up Apache Cassandra
+
+This example requires a running Cassandra instance; for simplicity, the steps below show how to start Cassandra using Docker. First you'll need to run a Cassandra instance:
+
+[source,bash]
+----
+docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra
+----
+
+Next, check and make sure Cassandra is running:
+
+[source,bash]
+----
+docker exec -ti master_node /opt/cassandra/bin/nodetool status
+Datacenter: datacenter1
+=======================
+Status=Up/Down
+|/ State=Normal/Leaving/Joining/Moving
+--  Address     Load        Tokens  Owns (effective)  Host ID                               Rack
+UN  172.17.0.2  251.32 KiB  256     100.0%            5126aaad-f143-43e9-920a-0f9540a93967  rack1
+----
+
+To populate the database using the `cqlsh` tool, you'll need a local installation of Cassandra.
+Download and extract the Apache Cassandra distribution to a directory. We reference the Cassandra installation directory as `LOCAL_CASSANDRA_HOME`. Here we use version 3.11.4 to connect to the Cassandra instance we started using Docker.
+
+[source,bash]
+----
+<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
+----
+
+Next, execute the following script to create the keyspace `test` and the table `users`, and insert one row into it.
+
+[source,bash]
+----
+create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
+use test;
+create table users ( id timeuuid primary key, name text );
+insert into users (id,name) values (now(), 'oscerd');
+quit;
+----
+
+In the configuration `.properties` file used below, the IP address of the Cassandra master node needs to be configured: replace the value `172.17.0.2` in the `camel.sink.path.hosts` configuration property with the IP of the master node obtained from Docker. The `.properties` file to use is shown in the command line that runs the example.
+
+[source,bash]
+----
+docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node
+----
+
+Now it's time to set up the connector.
+
+Open the CQL Sink configuration file:
+
+```
+name=CamelCassandraQLSinkConnector
+topics=mytopic
+tasks.max=1
+connector.class=org.apache.camel.kafkaconnector.cql.CamelCqlSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+camel.sink.path.hosts=172.17.0.2
+camel.sink.path.keyspace=9042/test
+camel.sink.endpoint.cql=insert into users(id, name) values (now(), ?)
+```
+
+and adjust the host, keyspace and CQL query to match your Cassandra instance.
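The host substitution can also be scripted. This is a minimal sketch, assuming GNU `sed`, a running container named `master_node` as above, and the properties file in the current directory (it falls back to the documented default IP when Docker is unavailable):

```shell
# Sketch: patch camel.sink.path.hosts with the container IP.
# Assumptions: GNU sed, container named master_node, properties file in $PWD.
PROPS=CamelCassandraQLSinkConnector.properties
IP=$(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node 2>/dev/null || echo 172.17.0.2)
if [ -f "$PROPS" ]; then
  sed -i "s|^camel.sink.path.hosts=.*|camel.sink.path.hosts=${IP}|" "$PROPS"
fi
echo "camel.sink.path.hosts=${IP}"
```

This avoids copying the IP by hand each time the container is recreated.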
+
+Now you can run the example:
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelCassandraQLSinkConnector.properties
+```
+
+In a different terminal, run the Kafka console producer and send a message; it will be written into the Cassandra `test` keyspace:
+
+```
+kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
+>message
+```
+
+You can verify the behavior through the following command:
+
+[source,bash]
+----
+<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)
+----
+
+Next, execute the following query against the `users` table:
+
+[source,bash]
+----
+use test;
+select * from users;
+----
+
+and you should see
+
+[source,bash]
+----
+cqlsh:test> select * from users;
+
+ id                                   | name
+--------------------------------------+---------
+ 6cbe74a0-96a6-11ea-a8ff-09d03512038e |  message
+ fc2c66c0-96a5-11ea-a8ff-09d03512038e |   oscerd
+
+(2 rows)
+----
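For context on the `camel.sink.endpoint.cql` query used in this example: the Camel CQL component binds the Kafka record value to the `?` placeholder of the configured statement, so producing the message `message` effectively runs something like the following (a sketch; the exact binding behavior depends on the component version):

```sql
insert into users(id, name) values (now(), 'message');
```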
