This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git

commit 6d2ec84ccc0ad22a1e46be54fb7a4952bcfaa924
Author: Jason Huynh <[email protected]>
AuthorDate: Wed Jan 22 21:35:00 2020 -0800

    Added GeodeContext to handle cache related operations
    Added GeodeConnectorConfig for shared configs
    Moved packages
---
 .../java/geode/kafka/GeodeConnectorConfig.java     | 132 ++++++++++++++
 .../java/{ => geode}/kafka/LocatorHostPort.java    |   2 +-
 .../kafka/sink}/GeodeKafkaSink.java                |   2 +-
 .../{kafka => geode/kafka/source}/GeodeEvent.java  |   2 +-
 .../kafka/source}/GeodeKafkaSource.java            |  36 ++--
 .../kafka/source}/GeodeKafkaSourceListener.java    |   3 +-
 .../geode/kafka/source/GeodeKafkaSourceTask.java   | 133 ++++++++++++++
 src/main/java/kafka/GeodeConnectorConfig.java      |  41 -----
 src/main/java/kafka/GeodeKafkaSourceTask.java      | 196 ---------------------
 .../{ => geode}/kafka/GeodeKafkaTestCluster.java   |   3 +-
 .../java/{ => geode}/kafka/GeodeLocalCluster.java  |   2 +-
 src/test/java/{ => geode}/kafka/JavaProcess.java   |   2 +-
 .../java/{ => geode}/kafka/KafkaLocalCluster.java  |   2 +-
 .../{ => geode}/kafka/LocatorLauncherWrapper.java  |   2 +-
 .../{ => geode}/kafka/ServerLauncherWrapper.java   |   2 +-
 .../{ => geode}/kafka/WorkerAndHerderCluster.java  |   2 +-
 .../{ => geode}/kafka/WorkerAndHerderWrapper.java  |  15 +-
 .../{ => geode}/kafka/ZooKeeperLocalCluster.java   |   2 +-
 .../kafka/source}/GeodeKafkaSourceTaskTest.java    |  37 +---
 .../kafka/source}/GeodeKafkaSourceTest.java        |   2 +-
 20 files changed, 306 insertions(+), 312 deletions(-)

diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java
new file mode 100644
index 0000000..9ac561f
--- /dev/null
+++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java
@@ -0,0 +1,132 @@
+package geode.kafka;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class GeodeConnectorConfig {
+
+    //Geode Configuration
+    public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
+    public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+    public static final String DURABLE_CLIENT_TIME_OUT = 
"durableClientTimeout";
+    public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+
+
+    //GeodeKafka Specific Configuration
+    public static final String TASK_ID = "GEODE_TASK_ID"; //One config per task
+
+    public static final String CQ_PREFIX = "cqPrefix";
+    public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
+    /**
+     * Specifies which Locators to connect to Apache Geode
+     */
+    public static final String LOCATORS = "locators";
+    public static final String DEFAULT_LOCATOR = "localhost[10334]";
+
+    /**
+     * Specifies which Regions to connect in Apache Geode
+     */
+    public static final String REGIONS = "regions";
+
+    /**
+     * Specifies which Topics to connect in Kafka
+     */
+    public static final String TOPICS = "topics";
+
+    /**
+     * Property to describe the Source Partition in a record
+     */
+    public static final String REGION_NAME = "regionName";  //used for Source 
Partition Events
+
+    public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+    public static final String DEFAULT_BATCH_SIZE = "100";
+
+    public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+    public static final String DEFAULT_QUEUE_SIZE = "100000";
+
+
+
+    private final int taskId;
+    private final String durableClientId;
+    private final String durableClientIdPrefix;
+    private final String durableClientTimeout;
+    private List<String> regionNames;
+    private List<String> topics;
+    private List<LocatorHostPort> locatorHostPorts;
+
+    //just for tests
+    GeodeConnectorConfig() {
+        taskId = 0;
+        durableClientId = "";
+        durableClientIdPrefix = "";
+        durableClientTimeout = "0";
+    }
+
+    public GeodeConnectorConfig(Map<String, String> connectorProperties) {
+        taskId = Integer.parseInt(connectorProperties.get(TASK_ID));
+        durableClientIdPrefix = 
connectorProperties.get(DURABLE_CLIENT_ID_PREFIX);
+        if (isDurable(durableClientIdPrefix)) {
+            durableClientId = durableClientIdPrefix + taskId;
+        }
+        else {
+            durableClientId = "";
+        }
+        durableClientTimeout = 
connectorProperties.get(DURABLE_CLIENT_TIME_OUT);
+        regionNames = 
parseNames(connectorProperties.get(GeodeConnectorConfig.REGIONS));
+        topics = 
parseNames(connectorProperties.get(GeodeConnectorConfig.TOPICS));
+        locatorHostPorts = 
parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS));
+    }
+
+    List<String> parseNames(String names) {
+        return Arrays.stream(names.split(",")).map((s) -> 
s.trim()).collect(Collectors.toList());
+    }
+
+    List<LocatorHostPort> parseLocators(String locators) {
+        return Arrays.stream(locators.split(",")).map((s) -> {
+            String locatorString = s.trim();
+            return parseLocator(locatorString);
+        }).collect(Collectors.toList());
+    }
+
+    private LocatorHostPort parseLocator(String locatorString) {
+        String[] splits = locatorString.split("\\[");
+        String locator = splits[0];
+        int port = Integer.parseInt(splits[1].replace("]", ""));
+        return new LocatorHostPort(locator, port);
+    }
+
+    public boolean isDurable() {
+        return isDurable(durableClientId);
+    }
+
+    /**
+     * @param durableClientId or prefix can be passed in.  Either both will be 
"" or both will have a value
+     * @return
+     */
+    boolean isDurable(String durableClientId) {
+        return !durableClientId.equals("");
+    }
+
+
+    public String getDurableClientId() {
+        return durableClientId;
+    }
+
+    public String getDurableClientTimeout() {
+        return durableClientTimeout;
+    }
+
+    public List<String> getRegionNames() {
+        return regionNames;
+    }
+
+    public List<String> getTopics() {
+        return topics;
+    }
+
+    public List<LocatorHostPort> getLocatorHostPorts() {
+        return locatorHostPorts;
+    }
+}
diff --git a/src/main/java/kafka/LocatorHostPort.java b/src/main/java/geode/kafka/LocatorHostPort.java
similarity index 95%
rename from src/main/java/kafka/LocatorHostPort.java
rename to src/main/java/geode/kafka/LocatorHostPort.java
index 517bad7..50d7440 100644
--- a/src/main/java/kafka/LocatorHostPort.java
+++ b/src/main/java/geode/kafka/LocatorHostPort.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 public class LocatorHostPort {
 
diff --git a/src/main/java/kafka/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
similarity index 98%
rename from src/main/java/kafka/GeodeKafkaSink.java
rename to src/main/java/geode/kafka/sink/GeodeKafkaSink.java
index 67a244e..68460e4 100644
--- a/src/main/java/kafka/GeodeKafkaSink.java
+++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka.sink;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
diff --git a/src/main/java/kafka/GeodeEvent.java b/src/main/java/geode/kafka/source/GeodeEvent.java
similarity index 94%
rename from src/main/java/kafka/GeodeEvent.java
rename to src/main/java/geode/kafka/source/GeodeEvent.java
index 805f4a0..41e37c6 100644
--- a/src/main/java/kafka/GeodeEvent.java
+++ b/src/main/java/geode/kafka/source/GeodeEvent.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka.source;
 
 import org.apache.geode.cache.query.CqEvent;
 
diff --git a/src/main/java/kafka/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
similarity index 67%
rename from src/main/java/kafka/GeodeKafkaSource.java
rename to src/main/java/geode/kafka/source/GeodeKafkaSource.java
index 95afc50..d5da62a 100644
--- a/src/main/java/kafka/GeodeKafkaSource.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java
@@ -1,5 +1,6 @@
-package kafka;
+package geode.kafka.source;
 
+import geode.kafka.GeodeConnectorConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
@@ -10,21 +11,19 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
-import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
-import static kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
-import static kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
-import static kafka.GeodeConnectorConfig.LOCATORS;
-import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static kafka.GeodeConnectorConfig.REGIONS;
-import static kafka.GeodeConnectorConfig.TOPICS;
-import static kafka.GeodeKafkaSourceTask.TASK_ID;
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR;
+import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
+import static geode.kafka.GeodeConnectorConfig.LOCATORS;
+import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+
 
 public class GeodeKafkaSource extends SourceConnector {
 
@@ -39,16 +38,14 @@ public class GeodeKafkaSource extends SourceConnector {
 
   @Override
   public List<Map<String, String>> taskConfigs(int maxTasks) {
-    System.out.println("GKSource: taskConfigs");
     List<Map<String, String>> taskConfigs = new ArrayList<>();
     Map<String, String> taskProps = new HashMap<>();
 
     taskProps.putAll(sharedProps);
 
-    // use the same props for all tasks at the moment
     for (int i = 0; i < maxTasks; i++) {
     //TODO partition regions and topics
-      taskProps.put(TASK_ID, "" + i);
+      taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i);
       taskConfigs.add(taskProps);
     }
     return taskConfigs;
@@ -82,6 +79,7 @@ public class GeodeKafkaSource extends SourceConnector {
 
   @Override
   public String version() {
+    //TODO
     return AppInfoParser.getVersion();
   }
 
diff --git a/src/main/java/kafka/GeodeKafkaSourceListener.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
similarity index 95%
rename from src/main/java/kafka/GeodeKafkaSourceListener.java
rename to src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
index 4c0e729..c4d6b22 100644
--- a/src/main/java/kafka/GeodeKafkaSourceListener.java
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -1,7 +1,6 @@
-package kafka;
+package geode.kafka.source;
 
 import org.apache.geode.cache.query.CqEvent;
-import org.apache.geode.cache.query.CqListener;
 import org.apache.geode.cache.query.CqStatusListener;
 
 import java.util.concurrent.BlockingQueue;
diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
new file mode 100644
index 0000000..23fa141
--- /dev/null
+++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -0,0 +1,133 @@
+package geode.kafka.source;
+
+import geode.kafka.GeodeContext;
+import geode.kafka.GeodeConnectorConfig;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE;
+import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX;
+import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE;
+import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
+
+public class GeodeKafkaSourceTask extends SourceTask {
+
+    private static final String TASK_PREFIX = "TASK";
+    private static final String DOT = ".";
+
+    //property string to pass in to identify task
+    private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
+
+    private int taskId;
+    private GeodeContext geodeContext;
+    private List<String> topics;
+
+    private Map<String, Map<String, String>> sourcePartitions;
+    private static BlockingQueue<GeodeEvent> eventBuffer;
+    private int batchSize;
+
+
+    private static Map<String, Long> createOffset() {
+        Map<String, Long> offset = new HashMap<>();
+        offset.put("OFFSET", 0L);
+        return offset;
+    }
+
+    @Override
+    public String version() {
+        return null;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        try {
+            GeodeConnectorConfig geodeConnectorConfig = new 
GeodeConnectorConfig(props);
+            geodeContext = new GeodeContext(geodeConnectorConfig);
+
+            batchSize = Integer.parseInt(props.get(BATCH_SIZE));
+            int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
+            eventBuffer = new LinkedBlockingQueue<>(queueSize);
+
+            sourcePartitions = 
createSourcePartitionsMap(geodeConnectorConfig.getRegionNames());
+            topics = geodeConnectorConfig.getTopics();
+
+            String cqPrefix = props.get(CQ_PREFIX);
+
+            installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, 
cqPrefix);
+        }
+        catch (Exception e) {
+            System.out.println("Exception:" + e);
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
+        ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
+        if (eventBuffer.drainTo(events, batchSize) > 0) {
+            for (GeodeEvent event : events) {
+                for (String topic : topics) {
+                    records.add(new 
SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, 
topic, null, event.getEvent()));
+                }
+            }
+
+            return records;
+        }
+
+        return null;
+    }
+
+    @Override
+    public void stop() {
+        geodeContext.getClientCache().close(true);
+    }
+
+    void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, 
GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix) {
+      boolean isDurable = geodeConnectorConfig.isDurable();
+        for (String region : geodeConnectorConfig.getRegionNames()) {
+            installListenersToRegion(geodeContext, taskId, eventBuffer, 
region, cqPrefix, isDurable);
+        }
+        if (isDurable) {
+            geodeContext.getClientCache().readyForEvents();
+        }
+    }
+
+    void installListenersToRegion(GeodeContext geodeContext, int taskId, 
BlockingQueue<GeodeEvent> eventBuffer, String regionName, String cqPrefix, 
boolean isDurable) {
+        CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
+        cqAttributesFactory.addCqListener(new 
GeodeKafkaSourceListener(eventBuffer, regionName));
+        CqAttributes cqAttributes = cqAttributesFactory.create();
+        geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), 
"select * from /" + regionName, cqAttributes,
+                isDurable);
+    }
+
+    /**
+     * converts a list of regions names into a map of source partitions
+     *
+     * @param regionNames list of regionNames
+     * @return Map<String, Map < String, String>> a map of source partitions, 
keyed by region name
+     */
+    Map<String, Map<String, String>> createSourcePartitionsMap(List<String> 
regionNames) {
+        return regionNames.stream().map(regionName -> {
+            Map<String, String> sourcePartition = new HashMap<>();
+            sourcePartition.put(REGION_NAME, regionName);
+            return sourcePartition;
+        }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
+    }
+
+    String generateCqName(int taskId, String cqPrefix, String regionName) {
+        return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
+    }
+
+}
diff --git a/src/main/java/kafka/GeodeConnectorConfig.java b/src/main/java/kafka/GeodeConnectorConfig.java
deleted file mode 100644
index 003367d..0000000
--- a/src/main/java/kafka/GeodeConnectorConfig.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package kafka;
-
-public class GeodeConnectorConfig {
-
-    //Geode Configuration
-    public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientId";
-    public static final String DEFAULT_DURABLE_CLIENT_ID = "";
-    public static final String DURABLE_CLIENT_TIME_OUT = 
"durableClientTimeout";
-    public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
-
-
-    //GeodeKafka Specific Configuration
-    public static final String CQ_PREFIX = "cqPrefix";
-    public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
-    /**
-     * Specifies which Locators to connect to Apache Geode
-     */
-    public static final String LOCATORS = "locators";
-    public static final String DEFAULT_LOCATOR = "localhost[10334]";
-
-    /**
-     * Specifies which Regions to connect in Apache Geode
-     */
-    public static final String REGIONS = "regions";
-
-    /**
-     * Specifies which Topics to connect in Kafka
-     */
-    public static final String TOPICS = "topics";
-
-    /**
-     * Property to describe the Source Partition in a record
-     */
-    public static final String REGION_NAME = "regionName";  //used for Source 
Partition Events
-
-    public static final String BATCH_SIZE = "geodeConnectorBatchSize";
-    public static final String DEFAULT_BATCH_SIZE = "100";
-
-    public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
-    public static final String DEFAULT_QUEUE_SIZE = "100000";
-}
diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java
deleted file mode 100644
index 896b602..0000000
--- a/src/main/java/kafka/GeodeKafkaSourceTask.java
+++ /dev/null
@@ -1,196 +0,0 @@
-package kafka;
-
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.query.CqAttributes;
-import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqExistsException;
-import org.apache.geode.cache.query.RegionNotFoundException;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.source.SourceTask;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-
-import static kafka.GeodeConnectorConfig.BATCH_SIZE;
-import static kafka.GeodeConnectorConfig.CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID;
-import static kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX;
-import static kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT;
-import static kafka.GeodeConnectorConfig.QUEUE_SIZE;
-import static kafka.GeodeConnectorConfig.REGION_NAME;
-
-public class GeodeKafkaSourceTask extends SourceTask {
-
-    //property string to pass in to identify task
-    public static final String TASK_ID = "GEODE_TASK_ID";
-    private static final String TASK_PREFIX = "TASK";
-    private static final String DOT = ".";
-    private static final Map<String, Long> OFFSET_DEFAULT = createOffset();
-
-    private int taskId;
-    private ClientCache clientCache;
-    private List<String> regionNames;
-    private List<String> topics;
-    private Map<String, Map<String, String>> sourcePartitions;
-    private static BlockingQueue<GeodeEvent> eventBuffer;
-    private int batchSize;
-
-
-    private static Map<String, Long> createOffset() {
-        Map<String, Long> offset = new HashMap<>();
-        offset.put("OFFSET", 0L);
-        return offset;
-    }
-
-    @Override
-    public String version() {
-        return null;
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        try {
-            System.out.println("JASON task start");
-            taskId = Integer.parseInt(props.get(TASK_ID));
-            batchSize = Integer.parseInt(props.get(BATCH_SIZE));
-            int queueSize = Integer.parseInt(props.get(QUEUE_SIZE));
-            eventBuffer = new LinkedBlockingQueue<>(queueSize);
-
-            //grouping will be done in the source and not the task
-            regionNames = parseNames(props.get(GeodeConnectorConfig.REGIONS));
-            topics = parseNames(props.get(GeodeConnectorConfig.TOPICS));
-            sourcePartitions = createSourcePartitionsMap(regionNames);
-
-            String durableClientId = props.get(DURABLE_CLIENT_ID_PREFIX);
-            if (!durableClientId.equals("")) {
-                durableClientId += taskId;
-            }
-            System.out.println("JASON durable client id is:" + 
durableClientId);
-            String durableClientTimeout = props.get(DURABLE_CLIENT_TIME_OUT);
-            String cqPrefix = props.get(CQ_PREFIX);
-
-            List<LocatorHostPort> locators = 
parseLocators(props.get(GeodeConnectorConfig.LOCATORS));
-            installOnGeode(taskId, eventBuffer, locators, regionNames, 
durableClientId, durableClientTimeout, cqPrefix);
-            System.out.println("JASON task start finished");
-        }
-        catch (Exception e) {
-            System.out.println("Exception:" + e);
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
-        ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
-        if (eventBuffer.drainTo(events, batchSize) > 0) {
-            for (GeodeEvent event : events) {
-                for (String topic : topics) {
-                    records.add(new 
SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, 
topic, null, event.getEvent()));
-//                    records.add(new 
SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, 
topic, null, "STRING"));
-                }
-            }
-
-            return records;
-        }
-
-        return null;
-    }
-
-    @Override
-    public void stop() {
-        clientCache.close(true);
-    }
-
-    ClientCache createClientCache(List<LocatorHostPort> locators, String 
durableClientName, String durableClientTimeOut) {
-        ClientCacheFactory ccf = new 
ClientCacheFactory().set("durable-client-id", durableClientName)
-                .set("durable-client-timeout", durableClientTimeOut)
-                .setPoolSubscriptionEnabled(true);
-        for (LocatorHostPort locator: locators) {
-          ccf.addPoolLocator(locator.getHostName(), 
locator.getPort()).create();
-        }
-        return ccf.create();
-    }
-
-    void installOnGeode(int taskId, BlockingQueue<GeodeEvent> eventBuffer, 
List<LocatorHostPort> locators, List<String> regionNames, String 
durableClientId, String durableClientTimeout, String cqPrefix) {
-      boolean isDurable = isDurable(durableClientId);
-
-      clientCache = createClientCache(locators, durableClientId, 
durableClientTimeout);
-        for (String region : regionNames) {
-            installListenersToRegion(taskId, eventBuffer, region, cqPrefix, 
isDurable);
-        }
-        if (isDurable) {
-          clientCache.readyForEvents();
-        }
-    }
-
-    void installListenersToRegion(int taskId, BlockingQueue<GeodeEvent> 
eventBuffer, String regionName, String cqPrefix, boolean isDurable) {
-        CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
-        cqAttributesFactory.addCqListener(new 
GeodeKafkaSourceListener(eventBuffer, regionName));
-        CqAttributes cqAttributes = cqAttributesFactory.create();
-        try {
-            clientCache.getQueryService().newCq(generateCqName(taskId, 
cqPrefix, regionName), "select * from /" + regionName, cqAttributes,
-                    isDurable).execute();
-        } catch (CqExistsException e) {
-            System.out.println("UHH");
-            e.printStackTrace();
-        } catch (CqException | RegionNotFoundException e) {
-            System.out.println("UHH e");
-            e.printStackTrace();
-        } catch (Exception e) {
-            System.out.println("UHHHHHH " + e);
-        }
-    }
-
-
-  List<String> parseNames(String names) {
-        return Arrays.stream(names.split(",")).map((s) -> 
s.trim()).collect(Collectors.toList());
-    }
-
-    List<LocatorHostPort> parseLocators(String locators) {
-        return Arrays.stream(locators.split(",")).map((s) -> {
-            String locatorString = s.trim();
-            return parseLocator(locatorString);
-        }).collect(Collectors.toList());
-    }
-
-    private LocatorHostPort parseLocator(String locatorString) {
-        String[] splits = locatorString.split("\\[");
-        String locator = splits[0];
-        int port = Integer.parseInt(splits[1].replace("]", ""));
-        return new LocatorHostPort(locator, port);
-    }
-
-    boolean isDurable(String durableClientId) {
-      return !durableClientId.equals("");
-    }
-
-    String generateCqName(int taskId, String cqPrefix, String regionName) {
-      return cqPrefix + DOT + TASK_PREFIX + taskId + DOT + regionName;
-    }
-
-    /**
-     * converts a list of regions names into a map of source partitions
-     *
-     * @param regionNames list of regionNames
-     * @return Map<String, Map < String, String>> a map of source partitions, 
keyed by region name
-     */
-    Map<String, Map<String, String>> createSourcePartitionsMap(List<String> 
regionNames) {
-        return regionNames.stream().map(regionName -> {
-            Map<String, String> sourcePartition = new HashMap<>();
-            sourcePartition.put(REGION_NAME, regionName);
-            return sourcePartition;
-        }).collect(Collectors.toMap(s -> s.get(REGION_NAME), s -> s));
-    }
-}
diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
similarity index 99%
rename from src/test/java/kafka/GeodeKafkaTestCluster.java
rename to src/test/java/geode/kafka/GeodeKafkaTestCluster.java
index 85c0fc4..f1a6dff 100644
--- a/src/test/java/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java
@@ -1,6 +1,7 @@
-package kafka;
+package geode.kafka;
 
 import kafka.admin.RackAwareMode;
+import geode.kafka.source.GeodeKafkaSource;
 import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
 import org.apache.geode.cache.Region;
diff --git a/src/test/java/kafka/GeodeLocalCluster.java b/src/test/java/geode/kafka/GeodeLocalCluster.java
similarity index 97%
rename from src/test/java/kafka/GeodeLocalCluster.java
rename to src/test/java/geode/kafka/GeodeLocalCluster.java
index 43ac8f5..afd3b9d 100644
--- a/src/test/java/kafka/GeodeLocalCluster.java
+++ b/src/test/java/geode/kafka/GeodeLocalCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import java.io.IOException;
 
diff --git a/src/test/java/kafka/JavaProcess.java b/src/test/java/geode/kafka/JavaProcess.java
similarity index 98%
rename from src/test/java/kafka/JavaProcess.java
rename to src/test/java/geode/kafka/JavaProcess.java
index 30edfef..fe00094 100644
--- a/src/test/java/kafka/JavaProcess.java
+++ b/src/test/java/geode/kafka/JavaProcess.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import java.io.File;
 import java.io.IOException;
diff --git a/src/test/java/kafka/KafkaLocalCluster.java b/src/test/java/geode/kafka/KafkaLocalCluster.java
similarity index 96%
rename from src/test/java/kafka/KafkaLocalCluster.java
rename to src/test/java/geode/kafka/KafkaLocalCluster.java
index cd2a3df..57f16f4 100644
--- a/src/test/java/kafka/KafkaLocalCluster.java
+++ b/src/test/java/geode/kafka/KafkaLocalCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
diff --git a/src/test/java/kafka/LocatorLauncherWrapper.java b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
similarity index 98%
rename from src/test/java/kafka/LocatorLauncherWrapper.java
rename to src/test/java/geode/kafka/LocatorLauncherWrapper.java
index c1a7075..57ff405 100644
--- a/src/test/java/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/geode/kafka/LocatorLauncherWrapper.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.Locator;
diff --git a/src/test/java/kafka/ServerLauncherWrapper.java b/src/test/java/geode/kafka/ServerLauncherWrapper.java
similarity index 99%
rename from src/test/java/kafka/ServerLauncherWrapper.java
rename to src/test/java/geode/kafka/ServerLauncherWrapper.java
index 933824e..b36a3aa 100644
--- a/src/test/java/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/geode/kafka/ServerLauncherWrapper.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
diff --git a/src/test/java/kafka/WorkerAndHerderCluster.java b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
similarity index 95%
rename from src/test/java/kafka/WorkerAndHerderCluster.java
rename to src/test/java/geode/kafka/WorkerAndHerderCluster.java
index 7357232..c347946 100644
--- a/src/test/java/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import java.io.IOException;
 
diff --git a/src/test/java/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
similarity index 89%
rename from src/test/java/kafka/WorkerAndHerderWrapper.java
rename to src/test/java/geode/kafka/WorkerAndHerderWrapper.java
index 5f8ccd2..cc8e27b 100644
--- a/src/test/java/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java
@@ -1,7 +1,6 @@
-package kafka;
+package geode.kafka;
 
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
+import geode.kafka.source.GeodeKafkaSource;
 import org.apache.kafka.common.utils.SystemTime;
 import 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -14,16 +13,14 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
-import static kafka.GeodeConnectorConfig.REGIONS;
-import static kafka.GeodeConnectorConfig.TOPICS;
-import static kafka.GeodeKafkaTestCluster.TEST_REGIONS;
-import static kafka.GeodeKafkaTestCluster.TEST_TOPICS;
+import static geode.kafka.GeodeConnectorConfig.REGIONS;
+import static geode.kafka.GeodeConnectorConfig.TOPICS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_REGIONS;
+import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPICS;
 
 public class WorkerAndHerderWrapper {
 
diff --git a/src/test/java/kafka/ZooKeeperLocalCluster.java b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
similarity index 98%
rename from src/test/java/kafka/ZooKeeperLocalCluster.java
rename to src/test/java/geode/kafka/ZooKeeperLocalCluster.java
index 8b23f53..a3d3433 100644
--- a/src/test/java/kafka/ZooKeeperLocalCluster.java
+++ b/src/test/java/geode/kafka/ZooKeeperLocalCluster.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka;
 
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
diff --git a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
similarity index 57%
rename from src/test/java/kafka/GeodeKafkaSourceTaskTest.java
rename to src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 2c15664..33c260d 100644
--- a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -1,5 +1,6 @@
-package kafka;
+package geode.kafka.source;
 
+import geode.kafka.GeodeConnectorConfig;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -7,10 +8,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static kafka.GeodeConnectorConfig.REGION_NAME;
-import static org.hamcrest.CoreMatchers.allOf;
+import static geode.kafka.GeodeConnectorConfig.REGION_NAME;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 public class GeodeKafkaSourceTaskTest {
@@ -37,28 +36,7 @@ public class GeodeKafkaSourceTaskTest {
 
 
 
-    @Test
-    public void parseRegionNamesShouldSplitOnComma() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        List<String> regionNames = task.parseNames("region1,region2,region3,region4");
-        assertEquals(4, regionNames.size());
-        assertThat(true, allOf(is(regionNames.contains("region1"))
-                , is(regionNames.contains("region2"))
-                , is(regionNames.contains("region3"))
-                , is(regionNames.contains("region4"))));
-    }
 
-    @Test
-    public void parseRegionNamesShouldChomp() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        List<String> regionNames = task.parseNames("region1, region2, region3,region4");
-        assertEquals(4, regionNames.size());
-        assertThat(true, allOf(is(regionNames instanceof List)
-                , is(regionNames.contains("region1"))
-                , is(regionNames.contains("region2"))
-                , is(regionNames.contains("region3"))
-                , is(regionNames.contains("region4"))));
-    }
 
     @Test
     public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
@@ -71,13 +49,7 @@ public class GeodeKafkaSourceTaskTest {
         assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3")));
     }
 
-    @Test
-    public void shouldBeAbleToParseGeodeLocatorStrings() {
-        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-        String locatorString="localhost[8888], localhost[8881]";
-        List<LocatorHostPort> locators = task.parseLocators(locatorString);
-        assertThat(2, is(locators.size()));
-    }
+
 
     @Test
     public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
@@ -101,7 +73,6 @@ public class GeodeKafkaSourceTaskTest {
         props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
         props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
 
-
         GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
         task.start(props);
 
diff --git a/src/test/java/kafka/GeodeKafkaSourceTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
similarity index 93%
rename from src/test/java/kafka/GeodeKafkaSourceTest.java
rename to src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
index ec6dff8..717d495 100644
--- a/src/test/java/kafka/GeodeKafkaSourceTest.java
+++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -1,4 +1,4 @@
-package kafka;
+package geode.kafka.source;
 
 import org.junit.Test;
 

Reply via email to