This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 65c83c8e75a86e97caf6dc127ad510dff9f8b8ee Author: Jason Huynh <[email protected]> AuthorDate: Wed Jan 22 18:57:35 2020 -0800 Changed all config to lower case Changed to CqStatusListener instead of plain CqListener --- src/main/java/kafka/GeodeConnectorConfig.java | 20 ++++++++++---------- src/main/java/kafka/GeodeKafkaSourceListener.java | 13 ++++++++++++- src/main/java/kafka/GeodeKafkaSourceTask.java | 3 --- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/main/java/kafka/GeodeConnectorConfig.java b/src/main/java/kafka/GeodeConnectorConfig.java index b7140aa..003367d 100644 --- a/src/main/java/kafka/GeodeConnectorConfig.java +++ b/src/main/java/kafka/GeodeConnectorConfig.java @@ -3,39 +3,39 @@ package kafka; public class GeodeConnectorConfig { //Geode Configuration - public static final String DURABLE_CLIENT_ID_PREFIX = "DurableClientId"; + public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId"; public static final String DEFAULT_DURABLE_CLIENT_ID = ""; - public static final String DURABLE_CLIENT_TIME_OUT = "DurableClientTimeout"; + public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout"; public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000"; //GeodeKafka Specific Configuration - public static final String CQ_PREFIX = "CqPrefix"; - public static final String DEFAULT_CQ_PREFIX = "CqForGeodeKafka"; + public static final String CQ_PREFIX = "cqPrefix"; + public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka"; /** * Specifies which Locators to connect to Apache Geode */ - public static final String LOCATORS = "Locators"; + public static final String LOCATORS = "locators"; public static final String DEFAULT_LOCATOR = "localhost[10334]"; /** * Specifies which Regions to connect in Apache Geode */ - public static final String REGIONS = "Regions"; + public static final String REGIONS = "regions"; /** * Specifies which Topics to connect in Kafka */ - public static final String TOPICS = "Topics"; + public static final String TOPICS = "topics"; /** * Property to describe the Source Partition in a record */ - public static final String REGION_NAME = "RegionName"; //used for Source Partition Events + public static final String REGION_NAME = "regionName"; //used for Source Partition Events - public static final String BATCH_SIZE = "GeodeConnectorBatchSize"; + public static final String BATCH_SIZE = "geodeConnectorBatchSize"; public static final String DEFAULT_BATCH_SIZE = "100"; - public static final String QUEUE_SIZE = "GeodeConnectorQueueSize"; + public static final String QUEUE_SIZE = "geodeConnectorQueueSize"; public static final String DEFAULT_QUEUE_SIZE = "100000"; } diff --git a/src/main/java/kafka/GeodeKafkaSourceListener.java b/src/main/java/kafka/GeodeKafkaSourceListener.java index ec94ee3..4c0e729 100644 --- a/src/main/java/kafka/GeodeKafkaSourceListener.java +++ b/src/main/java/kafka/GeodeKafkaSourceListener.java @@ -2,11 +2,12 @@ package kafka; import org.apache.geode.cache.query.CqEvent; import org.apache.geode.cache.query.CqListener; +import org.apache.geode.cache.query.CqStatusListener; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -class GeodeKafkaSourceListener implements CqListener { +class GeodeKafkaSourceListener implements CqStatusListener { public String regionName; private BlockingQueue<GeodeEvent> eventBuffer; @@ -39,4 +40,14 @@ class GeodeKafkaSourceListener implements CqListener { public void onError(CqEvent aCqEvent) { } + + @Override + public void onCqDisconnected() { + //we should probably redistribute or reconnect + } + + @Override + public void onCqConnected() { + + } } diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java index c463e6f..896b602 100644 --- a/src/main/java/kafka/GeodeKafkaSourceTask.java +++ b/src/main/java/kafka/GeodeKafkaSourceTask.java @@ -138,13 +138,10 @@ public class GeodeKafkaSourceTask extends SourceTask { void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, boolean isDurable) { CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener(eventBuffer, regionName)); - System.out.println("JASON installing on Geode"); CqAttributes cqAttributes = cqAttributesFactory.create(); try { - System.out.println("JASON installing new cq"); clientCache.getQueryService().newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, isDurable).execute(); - System.out.println("JASON finished installing cq"); } catch (CqExistsException e) { System.out.println("UHH"); e.printStackTrace();
