This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 2778b1ebaede94733982184a590e42c2ce832498 Author: Jason Huynh <[email protected]> AuthorDate: Thu Jan 30 16:10:13 2020 -0800 Added info to readme --- README.md | 60 +++++++++++++++++++++- .../geode/kafka/sink/GeodeSinkConnectorConfig.java | 2 +- .../kafka/source/GeodeSourceConnectorConfig.java | 7 ++- 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 2917b17..2db1fd9 100644 --- a/README.md +++ b/README.md @@ -1 +1,59 @@ -# geode-kafka-connector \ No newline at end of file +## geode-kafka-connector + +## What is geode-kafka-connector + +Kafka provides an integration point through Source and Sink Connectors. The GeodeKafkaSource allows Geode to be a data source for Kafka. +The GeodeKafkaSink allows Geode to consume data off of topics and store data from Kafka. + +### How to install the geode-kafka-connector +#### Prerequisite +Kafka is installed and is up and running. See the Kafka quickstart for more info: [Kafka Quickstart](https://kafka.apache.org/quickstart) + +Installation of the connector is similar in process to other Kafka Connectors. For now, we will follow the guide for [Manual Installation](https://docs.confluent.io/current/connect/managing/install.html#install-connector-manually). +In summary, we will use the standalone worker for this example. 
+* Explode a zip file or build into a known and Kafka accessible location +* Modify the connect-standalone.properties, + +* Create and modify connect-geode-source.properties file +//TODO +* Create and modify connect-geode-sink.properties file +//TODO + +* Run +bin/connect-standalone.sh config/connect-standalone.properties config/connect-geode-source.properties config/connect-geode-sink.properties + + +--- +### Configuration Properties +| Property | Required | Description | Default | +|---|---|---|---| +| locators | yes| A comma separated string of locators that configure which locators to connect to | "localhost[10334]" | +#### GeodeKafkaSink Properties +| Property | Required | Description| Default | +|---|---|---|---| +|topicToRegions| yes| A comma separated list of "one topic to many regions" bindings. Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | None. This is required to be set in the sink connector properties +|nullValuesMeanRemove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true + +The topicToRegions property allows us to create mappings between topics and regions. A single one-to-one mapping would look similar to "[topic:region]". A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]". This is equivalent to both regions being consumers of the topic. + +#### GeodeKafkaSource Properties +| Property | Required| Description| Default | +|---|---|---|---| +|regionToTopics| yes | A comma separated list of "one region to many topics" mappings. Each mapping is surrounded by brackets. For example "[regionName:topicName], [anotherRegion: topicName, anotherTopic]" | None. 
This is required to be set in the source connector properties +|geodeConnectorBatchSize| no | Maximum number of records to return on each poll| 100 | +|geodeConnectorQueueSize| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 | +| loadEntireRegion| no| Determines if we should queue up all entries that currently exist in the region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq| false | +|durableClientIdPrefix| no | Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client | "" | +| durableClientTimeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 | +| cqPrefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka | + +The regionToTopics property allows us to create mappings between regions and topics. A single one-to-one mapping would look similar to "[region:topic]". A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]". This is equivalent to the region being a producer for both topics. + +--- + + +Possible Upcoming Features: +Formatter - Possibly a JSON to and from PDX formatter +Security - security settings for Geode +Dynamic Region creation - Dynamically create regions when topics are created (filter what names to look for and what types of regions to create) +Allow a single worker to connect to multiple Geode Clusters? 
\ No newline at end of file diff --git a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java index 7c6aa3e..56c8f5d 100644 --- a/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java +++ b/src/main/java/geode/kafka/sink/GeodeSinkConnectorConfig.java @@ -5,7 +5,7 @@ import java.util.Map; public class GeodeSinkConnectorConfig extends GeodeConnectorConfig { //Used by sink - public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion"; + public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions"; public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove"; public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true"; diff --git a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java index c29048f..69cb890 100644 --- a/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java +++ b/src/main/java/geode/kafka/source/GeodeSourceConnectorConfig.java @@ -9,7 +9,7 @@ import java.util.Map; public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { //Geode Configuration - public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId"; + public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix"; public static final String DEFAULT_DURABLE_CLIENT_ID = ""; public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout"; public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000"; @@ -21,14 +21,13 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { * Used as a key for source partitions */ public static final String REGION = "region"; - - public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic"; + public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics"; public static final String BATCH_SIZE = "geodeConnectorBatchSize"; public static final String DEFAULT_BATCH_SIZE = 
"100"; public static final String QUEUE_SIZE = "geodeConnectorQueueSize"; - public static final String DEFAULT_QUEUE_SIZE = "100000"; + public static final String DEFAULT_QUEUE_SIZE = "10000"; public static final String LOAD_ENTIRE_REGION = "loadEntireRegion"; public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
