This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git

commit 455192ad84229e8998475e28dc37bff70edb391e
Author: Jason Huynh <[email protected]>
AuthorDate: Wed Jan 22 17:33:38 2020 -0800

    added missing files
---
 src/main/java/kafka/GeodeConnectorConfig.java     |  41 ++++++++
 src/main/java/kafka/GeodeEvent.java               |  25 +++++
 src/main/java/kafka/GeodeKafkaSink.java           |  71 ++++++++++++++
 src/main/java/kafka/GeodeKafkaSourceListener.java |  42 ++++++++
 src/main/java/kafka/LocatorHostPort.java          |  23 +++++
 src/test/java/kafka/GeodeKafkaSourceTaskTest.java | 114 ++++++++++++++++++++++
 src/test/java/kafka/GeodeKafkaSourceTest.java     |  22 +++++
 src/test/java/kafka/WorkerAndHerderCluster.java   |  23 +++++
 src/test/java/kafka/WorkerAndHerderWrapper.java   |  72 ++++++++++++++
 9 files changed, 433 insertions(+)

diff --git a/src/main/java/kafka/GeodeConnectorConfig.java 
b/src/main/java/kafka/GeodeConnectorConfig.java
new file mode 100644
index 0000000..b7140aa
--- /dev/null
+++ b/src/main/java/kafka/GeodeConnectorConfig.java
@@ -0,0 +1,41 @@
+package kafka;
+
+/**
+ * Property names and default values used to configure the Geode Kafka connector.
+ *
+ * <p>Defaults are declared as strings, matching the string-valued property maps
+ * the connector is configured with.
+ */
+public class GeodeConnectorConfig {
+
+    //Geode Configuration
+    /** Property name for the durable client id prefix. */
+    public static final String DURABLE_CLIENT_ID_PREFIX = "DurableClientId";
+    /** Default durable client id: the empty string. */
+    public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+    /** Property name for the durable client timeout. */
+    public static final String DURABLE_CLIENT_TIME_OUT = "DurableClientTimeout";
+    /** Default durable client timeout (presumably milliseconds -- TODO confirm). */
+    public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+
+
+    //GeodeKafka Specific Configuration
+    /** Property name for the prefix given to continuous-query names. */
+    public static final String CQ_PREFIX = "CqPrefix";
+    /** Default continuous-query name prefix. */
+    public static final String DEFAULT_CQ_PREFIX = "CqForGeodeKafka";
+    /**
+     * Specifies which Locators to connect to Apache Geode
+     */
+    public static final String LOCATORS = "Locators";
+    /** Default locator, written in {@code host[port]} form. */
+    public static final String DEFAULT_LOCATOR = "localhost[10334]";
+
+    /**
+     * Specifies which Regions to connect in Apache Geode
+     */
+    public static final String REGIONS = "Regions";
+
+    /**
+     * Specifies which Topics to connect in Kafka
+     */
+    public static final String TOPICS = "Topics";
+
+    /**
+     * Property to describe the Source Partition in a record
+     * (used for Source Partition Events).
+     */
+    public static final String REGION_NAME = "RegionName";
+
+    /** Property name for the connector batch size. */
+    public static final String BATCH_SIZE = "GeodeConnectorBatchSize";
+    /** Default batch size, as a decimal string. */
+    public static final String DEFAULT_BATCH_SIZE = "100";
+
+    /** Property name for the connector queue size. */
+    public static final String QUEUE_SIZE = "GeodeConnectorQueueSize";
+    /** Default queue size, as a decimal string. */
+    public static final String DEFAULT_QUEUE_SIZE = "100000";
+}
diff --git a/src/main/java/kafka/GeodeEvent.java 
b/src/main/java/kafka/GeodeEvent.java
new file mode 100644
index 0000000..805f4a0
--- /dev/null
+++ b/src/main/java/kafka/GeodeEvent.java
@@ -0,0 +1,25 @@
+package kafka;
+
+import org.apache.geode.cache.query.CqEvent;
+
+/**
+ * Wrapper class that stores a region name together with a CQ event so the
+ * correct topics can be updated.
+ *
+ * <p>Both values are fixed at construction time.
+ */
+public class GeodeEvent {
+
+    /** Region name supplied when the wrapper was created. */
+    private final String regionName;
+    /** The wrapped continuous-query event. */
+    private final CqEvent event;
+
+    /**
+     * @param regionName region name to associate with {@code event}
+     * @param event the CQ event being wrapped
+     */
+    public GeodeEvent(String regionName, CqEvent event) {
+        this.regionName = regionName;
+        this.event = event;
+    }
+
+    /** @return the region name given to the constructor */
+    public String getRegionName() {
+        return regionName;
+    }
+
+    /** @return the CQ event given to the constructor */
+    public CqEvent getEvent() {
+        return event;
+    }
+}
diff --git a/src/main/java/kafka/GeodeKafkaSink.java 
b/src/main/java/kafka/GeodeKafkaSink.java
index af3a22a..67a244e 100644
--- a/src/main/java/kafka/GeodeKafkaSink.java
+++ b/src/main/java/kafka/GeodeKafkaSink.java
@@ -1,5 +1,76 @@
 package kafka;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+
+import java.util.Map;
+
+/**
+ * Placeholder for the sink side of the connector; the class currently declares no members.
+ *
+ * NOTE(review): the commented-out body below still refers to Ignite types
+ * (IgniteSinkConstants, IgniteSinkTask), so it looks like a template that has not
+ * been ported to Geode yet -- confirm before enabling any of it.
+ */
 public class GeodeKafkaSink {
 
+//
+//    /** Sink properties. */
+//    private Map<String, String> configProps;
+//
+//    /** Expected configurations. */
+//    private static final ConfigDef CONFIG_DEF = new ConfigDef();
+//
+//    /** {@inheritDoc} */
+//    @Override public String version() {
+//        return AppInfoParser.getVersion();
+//    }
+//
+//    /**
+//     * A sink lifecycle method. Validates grid-specific sink properties.
+//     *
+//     * @param props Sink properties.
+//     */
+//    @Override public void start(Map<String, String> props) {
+//        configProps = props;
+//
+//        try {
+//            A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), 
"topics");
+//            
A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
+//            
A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to 
cache config file");
+//        }
+//        catch (IllegalArgumentException e) {
+//            throw new ConnectException("Cannot start IgniteSinkConnector due 
to configuration error", e);
+//        }
+//    }
+//
+//    /**
+//     * Obtains a sink task class to be instantiated for feeding data into 
grid.
+//     *
+//     * @return IgniteSinkTask class.
+//     */
+//    @Override public Class<? extends Task> taskClass() {
+//        return IgniteSinkTask.class;
+//    }
+//
+//    /**
+//     * Builds each config for <tt>maxTasks</tt> tasks.
+//     *
+//     * @param maxTasks Max number of tasks.
+//     * @return Task configs.
+//     */
+//    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+//        List<Map<String, String>> taskConfigs = new ArrayList<>();
+//        Map<String, String> taskProps = new HashMap<>();
+//
+//        taskProps.putAll(configProps);
+//
+//        for (int i = 0; i < maxTasks; i++)
+//            taskConfigs.add(taskProps);
+//
+//        return taskConfigs;
+//    }
+//
+//    /** {@inheritDoc} */
+//    @Override public void stop() {
+//        // No-op.
+//    }
+//
+//    /** {@inheritDoc} */
+//    @Override public ConfigDef config() {
+//        return CONFIG_DEF;
+//    }
 }
diff --git a/src/main/java/kafka/GeodeKafkaSourceListener.java 
b/src/main/java/kafka/GeodeKafkaSourceListener.java
new file mode 100644
index 0000000..ec94ee3
--- /dev/null
+++ b/src/main/java/kafka/GeodeKafkaSourceListener.java
@@ -0,0 +1,42 @@
+package kafka;
+
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+class GeodeKafkaSourceListener implements CqListener {
+
+    public String regionName;
+    private BlockingQueue<GeodeEvent> eventBuffer;
+
+    public GeodeKafkaSourceListener(BlockingQueue<GeodeEvent> eventBuffer, 
String regionName) {
+        this.eventBuffer = eventBuffer;
+        this.regionName = regionName;
+    }
+
+    @Override
+    public void onEvent(CqEvent aCqEvent) {
+        try {
+            System.out.println("JASON cqEvent and putting into eventBuffer");
+            eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, 
TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+
+            while (true) {
+                try {
+                    if (!eventBuffer.offer(new GeodeEvent(regionName, 
aCqEvent), 2, TimeUnit.SECONDS))
+                        break;
+                } catch (InterruptedException ex) {
+                    ex.printStackTrace();
+                }
+                System.out.println("GeodeKafkaSource Queue is full");
+            }
+        }
+    }
+
+    @Override
+    public void onError(CqEvent aCqEvent) {
+
+    }
+}
diff --git a/src/main/java/kafka/LocatorHostPort.java 
b/src/main/java/kafka/LocatorHostPort.java
new file mode 100644
index 0000000..517bad7
--- /dev/null
+++ b/src/main/java/kafka/LocatorHostPort.java
@@ -0,0 +1,23 @@
+package kafka;
+
+/**
+ * Immutable host name / port pair identifying a locator.
+ */
+public class LocatorHostPort {
+
+    private final String hostName;
+    private final int port;
+
+    /**
+     * @param hostName locator host name
+     * @param port locator port
+     */
+    public LocatorHostPort(String hostName, int port) {
+        this.hostName = hostName;
+        this.port = port;
+    }
+
+    /** @return the host name given to the constructor */
+    public String getHostName() {
+        return hostName;
+    }
+
+    /** @return the port given to the constructor */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * @return the pair formatted as {@code hostName[port]}, e.g. {@code localhost[10334]}
+     */
+    @Override
+    public String toString() {
+        return hostName + "[" + port + "]";
+    }
+}
diff --git a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java 
b/src/test/java/kafka/GeodeKafkaSourceTaskTest.java
new file mode 100644
index 0000000..2c15664
--- /dev/null
+++ b/src/test/java/kafka/GeodeKafkaSourceTaskTest.java
@@ -0,0 +1,114 @@
+package kafka;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static kafka.GeodeConnectorConfig.REGION_NAME;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link GeodeKafkaSourceTask}.
+ *
+ * <p>Test methods with an empty body are placeholders that have not been written yet.
+ */
+public class GeodeKafkaSourceTaskTest {
+
+    @Test
+    public void cqListenerOnEventPopulatesEventsBuffer() {
+
+    }
+
+    @Test
+    public void pollReturnsEventsWhenEventBufferHasValues() {
+
+    }
+
+    @Test
+    public void regionsArePassedCorrectlyToTask() {
+
+    }
+
+    @Test
+    public void installOnGeodeShouldCallCq() {
+        // Only exercises construction for now; nothing about the CQ is asserted yet.
+        new GeodeKafkaSourceTask();
+    }
+
+
+
+    @Test
+    public void parseRegionNamesShouldSplitOnComma() {
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        List<String> regionNames = task.parseNames("region1,region2,region3,region4");
+        assertEquals(4, regionNames.size());
+        assertContainsAll(regionNames, "region1", "region2", "region3", "region4");
+    }
+
+    @Test
+    public void parseRegionNamesShouldChomp() {
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        List<String> regionNames = task.parseNames("region1, region2, region3,region4");
+        assertEquals(4, regionNames.size());
+        assertContainsAll(regionNames, "region1", "region2", "region3", "region4");
+    }
+
+    @Test
+    public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        List<String> regionNames = Arrays.asList("region1", "region2", "region3");
+        Map<String, Map<String, String>> sourcePartitions =
+                task.createSourcePartitionsMap(regionNames);
+        assertEquals(3, sourcePartitions.size());
+        // Each region's partition map must carry that region's name under REGION_NAME.
+        for (String regionName : regionNames) {
+            assertEquals(regionName, sourcePartitions.get(regionName).get(REGION_NAME));
+        }
+    }
+
+    @Test
+    public void shouldBeAbleToParseGeodeLocatorStrings() {
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        String locatorString = "localhost[8888], localhost[8881]";
+        List<LocatorHostPort> locators = task.parseLocators(locatorString);
+        assertEquals(2, locators.size());
+    }
+
+    @Test
+    public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
+
+    }
+
+    @Test
+    public void shouldNotBeDurableIfDurableClientIdIsNull() {
+
+    }
+
+    @Test
+    public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
+
+    }
+
+    //Source properties tests
+    @Test
+    public void propertiesShouldBeCorrectlyTranslatedToConfiguration() {
+        Map<String, String> props = new HashMap<>();
+        props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
+        props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
+
+
+        GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+        task.start(props);
+
+//        assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));
+
+
+    }
+
+    /**
+     * Asserts that {@code actual} contains every name in {@code expected}, naming the first
+     * missing entry in the failure message.
+     */
+    private static void assertContainsAll(List<String> actual, String... expected) {
+        for (String name : expected) {
+            assertThat(name + " missing from " + actual, actual.contains(name), is(true));
+        }
+    }
+
+}
diff --git a/src/test/java/kafka/GeodeKafkaSourceTest.java 
b/src/test/java/kafka/GeodeKafkaSourceTest.java
new file mode 100644
index 0000000..ec6dff8
--- /dev/null
+++ b/src/test/java/kafka/GeodeKafkaSourceTest.java
@@ -0,0 +1,22 @@
+package kafka;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link GeodeKafkaSource}.
+ */
+public class GeodeKafkaSourceTest {
+
+    /** Starts a source with an empty property map; nothing is asserted yet. */
+    @Test
+    public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() {
+        final GeodeKafkaSource connector = new GeodeKafkaSource();
+        final Map<String, String> emptyProps = new HashMap<>();
+        connector.start(emptyProps);
+    }
+
+    /** Placeholder; not implemented yet. */
+    @Test
+    public void cqPrefixShouldBeProperlyCalculatedFromProps() {
+    }
+}
diff --git a/src/test/java/kafka/WorkerAndHerderCluster.java 
b/src/test/java/kafka/WorkerAndHerderCluster.java
new file mode 100644
index 0000000..7357232
--- /dev/null
+++ b/src/test/java/kafka/WorkerAndHerderCluster.java
@@ -0,0 +1,23 @@
+package kafka;
+
+import java.io.IOException;
+
+/**
+ * Test helper that runs {@link WorkerAndHerderWrapper} through a {@code JavaProcess}.
+ */
+public class WorkerAndHerderCluster {
+
+    /** Process handle created for {@link WorkerAndHerderWrapper}. */
+    private JavaProcess workerAndHerder;
+
+    public WorkerAndHerderCluster() {
+        this.workerAndHerder = new JavaProcess(WorkerAndHerderWrapper.class);
+    }
+
+    /** Announces the start on stdout, then calls {@code exec()} on the process handle. */
+    public void start() throws IOException, InterruptedException {
+        final String announcement = "JASON starting worker";
+        System.out.println(announcement);
+        this.workerAndHerder.exec();
+    }
+
+    /** Calls {@code destroy()} on the process handle. */
+    public void stop() {
+        this.workerAndHerder.destroy();
+    }
+}
+
diff --git a/src/test/java/kafka/WorkerAndHerderWrapper.java 
b/src/test/java/kafka/WorkerAndHerderWrapper.java
new file mode 100644
index 0000000..5f8ccd2
--- /dev/null
+++ b/src/test/java/kafka/WorkerAndHerderWrapper.java
@@ -0,0 +1,72 @@
+package kafka;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+import org.apache.kafka.common.utils.SystemTime;
+import 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static kafka.GeodeConnectorConfig.REGIONS;
+import static kafka.GeodeConnectorConfig.TOPICS;
+import static kafka.GeodeKafkaTestCluster.TEST_REGIONS;
+import static kafka.GeodeKafkaTestCluster.TEST_TOPICS;
+
+public class WorkerAndHerderWrapper {
+
+    public static void main(String[] args) throws IOException {
+        Map props = new HashMap();
+        props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        // fast flushing for testing.
+        props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+
+        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
+        props.put("key.converter.schemas.enable", "false");
+        props.put("value.converter.schemas.enable", "false");
+
+        WorkerConfig workerCfg = new StandaloneConfig(props);
+
+        MemoryOffsetBackingStore offBackingStore = new 
MemoryOffsetBackingStore();
+        offBackingStore.configure(workerCfg);
+
+        Worker worker = new Worker("WORKER_ID", new SystemTime(), new 
Plugins(props), workerCfg, offBackingStore, new 
AllConnectorClientConfigOverridePolicy());
+        worker.start();
+
+        Herder herder = new StandaloneHerder(worker, 
ConnectUtils.lookupKafkaClusterId(workerCfg), new 
AllConnectorClientConfigOverridePolicy());
+        herder.start();
+
+        Map<String, String> sourceProps = new HashMap<>();
+        sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
GeodeKafkaSource.class.getName());
+        sourceProps.put(ConnectorConfig.NAME_CONFIG, 
"geode-kafka-source-connector");
+        sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        sourceProps.put(REGIONS, TEST_REGIONS);
+        sourceProps.put(TOPICS, TEST_TOPICS);
+
+        herder.putConnectorConfig(
+                sourceProps.get(ConnectorConfig.NAME_CONFIG),
+                sourceProps, true, (error, result)->{
+                    System.out.println("CALLBACK: " + result + "::: error?" + 
error);
+                });
+
+    }
+}

Reply via email to