This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 455192ad84229e8998475e28dc37bff70edb391e
Author: Jason Huynh <[email protected]>
AuthorDate: Wed Jan 22 17:33:38 2020 -0800

    added missing files
---
 src/main/java/kafka/GeodeConnectorConfig.java     |  41 ++++++++
 src/main/java/kafka/GeodeEvent.java               |  25 +++++
 src/main/java/kafka/GeodeKafkaSink.java           |  71 ++++++++++++++
 src/main/java/kafka/GeodeKafkaSourceListener.java |  42 ++++++++
 src/main/java/kafka/LocatorHostPort.java          |  23 +++++
 src/test/java/kafka/GeodeKafkaSourceTaskTest.java | 114 ++++++++++++++++++++++
 src/test/java/kafka/GeodeKafkaSourceTest.java     |  22 +++++
 src/test/java/kafka/WorkerAndHerderCluster.java   |  23 +++++
 src/test/java/kafka/WorkerAndHerderWrapper.java   |  72 ++++++++++++++
 9 files changed, 433 insertions(+)

diff --git a/src/main/java/kafka/GeodeConnectorConfig.java b/src/main/java/kafka/GeodeConnectorConfig.java
new file mode 100644
index 0000000..b7140aa
--- /dev/null
+++ b/src/main/java/kafka/GeodeConnectorConfig.java
@@ -0,0 +1,41 @@
+package kafka;
+
+public class GeodeConnectorConfig {
+
+  //Geode Configuration
+  public static final String DURABLE_CLIENT_ID_PREFIX = "DurableClientId";
+  public static final String DEFAULT_DURABLE_CLIENT_ID = "";
+  public static final String DURABLE_CLIENT_TIME_OUT = "DurableClientTimeout";
+  public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
+
+
+  //GeodeKafka Specific Configuration
+  public static final String CQ_PREFIX = "CqPrefix";
+  public static final String DEFAULT_CQ_PREFIX = "CqForGeodeKafka";
+  /**
+   * Specifies which Locators to connect to Apache Geode
+   */
+  public static final String LOCATORS = "Locators";
+  public static final String DEFAULT_LOCATOR = "localhost[10334]";
+
+  /**
+   * Specifies which Regions to connect in Apache Geode
+   */
+  public static final String REGIONS = "Regions";
+
+  /**
+   * Specifies which Topics to connect in Kafka
+   */
+  public static final String TOPICS = "Topics";
+
+  /**
+   * Property to describe the Source Partition in a record
+   */
+  public static final String REGION_NAME = "RegionName"; //used for Source Partition Events
+
+  public static final String BATCH_SIZE = "GeodeConnectorBatchSize";
+  public static final String DEFAULT_BATCH_SIZE = "100";
+
+  public static final String QUEUE_SIZE = "GeodeConnectorQueueSize";
+  public static final String DEFAULT_QUEUE_SIZE = "100000";
+}
diff --git a/src/main/java/kafka/GeodeEvent.java b/src/main/java/kafka/GeodeEvent.java
new file mode 100644
index 0000000..805f4a0
--- /dev/null
+++ b/src/main/java/kafka/GeodeEvent.java
@@ -0,0 +1,25 @@
+package kafka;
+
+import org.apache.geode.cache.query.CqEvent;
+
+/**
+ * wrapper class to store regionName and cq event so the correct topics can be updated
+ */
+public class GeodeEvent {
+
+  private String regionName;
+  private CqEvent event;
+
+  public GeodeEvent(String regionName, CqEvent event) {
+    this.regionName = regionName;
+    this.event = event;
+  }
+
+  public String getRegionName() {
+    return regionName;
+  }
+
+  public CqEvent getEvent() {
+    return event;
+  }
+}
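GeodeConnectorConfig above only defines string keys and string defaults; nothing in this commit consumes them yet. A rough sketch of how a connector properties map might be assembled from these constants (the region and topic values below are made up for illustration):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: build a source connector config from the constants above.
    static Map<String, String> defaultSourceProps() {
      Map<String, String> props = new HashMap<>();
      props.put(GeodeConnectorConfig.LOCATORS, GeodeConnectorConfig.DEFAULT_LOCATOR); // "localhost[10334]"
      props.put(GeodeConnectorConfig.REGIONS, "someRegion"); // hypothetical Geode region(s), comma-separated
      props.put(GeodeConnectorConfig.TOPICS, "someTopic");   // hypothetical Kafka topic(s), comma-separated
      props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
      props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
      return props;
    }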
diff --git a/src/main/java/kafka/GeodeKafkaSink.java b/src/main/java/kafka/GeodeKafkaSink.java
index af3a22a..67a244e 100644
--- a/src/main/java/kafka/GeodeKafkaSink.java
+++ b/src/main/java/kafka/GeodeKafkaSink.java
@@ -1,5 +1,76 @@
 package kafka;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+
+import java.util.Map;
+
 public class GeodeKafkaSink {
+//
+//    /** Sink properties. */
+//    private Map<String, String> configProps;
+//
+//    /** Expected configurations. */
+//    private static final ConfigDef CONFIG_DEF = new ConfigDef();
+//
+//    /** {@inheritDoc} */
+//    @Override public String version() {
+//        return AppInfoParser.getVersion();
+//    }
+//
+//    /**
+//     * A sink lifecycle method. Validates grid-specific sink properties.
+//     *
+//     * @param props Sink properties.
+//     */
+//    @Override public void start(Map<String, String> props) {
+//        configProps = props;
+//
+//        try {
+//            A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
+//            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
+//            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
+//        }
+//        catch (IllegalArgumentException e) {
+//            throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
+//        }
+//    }
+//
+//    /**
+//     * Obtains a sink task class to be instantiated for feeding data into grid.
+//     *
+//     * @return IgniteSinkTask class.
+//     */
+//    @Override public Class<? extends Task> taskClass() {
+//        return IgniteSinkTask.class;
+//    }
+//
+//    /**
+//     * Builds each config for <tt>maxTasks</tt> tasks.
+//     *
+//     * @param maxTasks Max number of tasks.
+//     * @return Task configs.
+//     */
+//    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+//        List<Map<String, String>> taskConfigs = new ArrayList<>();
+//        Map<String, String> taskProps = new HashMap<>();
+//
+//        taskProps.putAll(configProps);
+//
+//        for (int i = 0; i < maxTasks; i++)
+//            taskConfigs.add(taskProps);
+//
+//        return taskConfigs;
+//    }
+//
+//    /** {@inheritDoc} */
+//    @Override public void stop() {
+//        // No-op.
+//    }
+//
+//    /** {@inheritDoc} */
+//    @Override public ConfigDef config() {
+//        return CONFIG_DEF;
+//    }
 
 }
diff --git a/src/main/java/kafka/GeodeKafkaSourceListener.java b/src/main/java/kafka/GeodeKafkaSourceListener.java
new file mode 100644
index 0000000..ec94ee3
--- /dev/null
+++ b/src/main/java/kafka/GeodeKafkaSourceListener.java
@@ -0,0 +1,42 @@
+package kafka;
+
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+class GeodeKafkaSourceListener implements CqListener {
+
+  public String regionName;
+  private BlockingQueue<GeodeEvent> eventBuffer;
+
+  public GeodeKafkaSourceListener(BlockingQueue<GeodeEvent> eventBuffer, String regionName) {
+    this.eventBuffer = eventBuffer;
+    this.regionName = regionName;
+  }
+
+  @Override
+  public void onEvent(CqEvent aCqEvent) {
+    try {
+      System.out.println("JASON cqEvent and putting into eventBuffer");
+      eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+
+      while (true) {
+        try {
+          if (eventBuffer.offer(new GeodeEvent(regionName, aCqEvent), 2, TimeUnit.SECONDS)) // stop retrying once the event is queued
+            break;
+        } catch (InterruptedException ex) {
+          ex.printStackTrace();
+        }
+        System.out.println("GeodeKafkaSource Queue is full");
+      }
+    }
+  }
+
+  @Override
+  public void onError(CqEvent aCqEvent) {
+
+  }
+}
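GeodeKafkaSourceListener is the producing half of a bounded handoff: the CQ callback thread offers each GeodeEvent into the shared BlockingQueue with a 2-second timeout so a slow consumer cannot block Geode's callback thread indefinitely. The consuming half (GeodeKafkaSourceTask, referenced by the tests further down but not part of this commit) would presumably drain the buffer in batches; a minimal sketch of that pattern, with the method name and batch size purely illustrative:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;

    // Illustrative consumer side: pull up to batchSize buffered CQ events
    // without blocking, leaving the rest for the next poll cycle.
    static List<GeodeEvent> drainBatch(BlockingQueue<GeodeEvent> eventBuffer, int batchSize) {
      List<GeodeEvent> batch = new ArrayList<>(batchSize);
      eventBuffer.drainTo(batch, batchSize);
      return batch;
    }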
diff --git a/src/main/java/kafka/LocatorHostPort.java b/src/main/java/kafka/LocatorHostPort.java
new file mode 100644
index 0000000..517bad7
--- /dev/null
+++ b/src/main/java/kafka/LocatorHostPort.java
@@ -0,0 +1,23 @@
+package kafka;
+
+public class LocatorHostPort {
+
+  private String hostName;
+  private int port;
+
+  public LocatorHostPort(String hostName, int port) {
+    this.hostName = hostName;
+    this.port = port;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public int getPort() {
+    return port;
+  }
+  public String toString() {
+    return hostName + "[" + port + "]";
+  }
+}
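LocatorHostPort.toString() reproduces Geode's host[port] locator syntax, and the task tests below exercise a parseLocators method that goes the other way. That method is not in this commit; a plausible sketch of such parsing, assuming comma-separated host[port] entries as in the tests:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative parser for locator strings such as "localhost[8888], localhost[8881]".
    static List<LocatorHostPort> parseLocators(String locators) {
      List<LocatorHostPort> result = new ArrayList<>();
      for (String locator : locators.split(",")) {
        String trimmed = locator.trim(); // tolerate spaces after commas
        String host = trimmed.substring(0, trimmed.indexOf('['));
        int port = Integer.parseInt(
            trimmed.substring(trimmed.indexOf('[') + 1, trimmed.indexOf(']')));
        result.add(new LocatorHostPort(host, port));
      }
      return result;
    }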
diff --git a/src/test/java/kafka/GeodeKafkaSourceTaskTest.java b/src/test/java/kafka/GeodeKafkaSourceTaskTest.java
new file mode 100644
index 0000000..2c15664
--- /dev/null
+++ b/src/test/java/kafka/GeodeKafkaSourceTaskTest.java
@@ -0,0 +1,114 @@
+package kafka;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static kafka.GeodeConnectorConfig.REGION_NAME;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class GeodeKafkaSourceTaskTest {
+
+  @Test
+  public void cqListenerOnEventPopulatesEventsBuffer() {
+
+  }
+
+  @Test
+  public void pollReturnsEventsWhenEventBufferHasValues() {
+
+  }
+
+  @Test
+  public void regionsArePassedCorrectlyToTask() {
+
+  }
+
+  @Test
+  public void installOnGeodeShouldCallCq() {
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+  }
+
+
+
+  @Test
+  public void parseRegionNamesShouldSplitOnComma() {
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    List<String> regionNames = task.parseNames("region1,region2,region3,region4");
+    assertEquals(4, regionNames.size());
+    assertThat(true, allOf(is(regionNames.contains("region1"))
+        , is(regionNames.contains("region2"))
+        , is(regionNames.contains("region3"))
+        , is(regionNames.contains("region4"))));
+  }
+
+  @Test
+  public void parseRegionNamesShouldChomp() {
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    List<String> regionNames = task.parseNames("region1, region2, region3,region4");
+    assertEquals(4, regionNames.size());
+    assertThat(true, allOf(is(regionNames instanceof List)
+        , is(regionNames.contains("region1"))
+        , is(regionNames.contains("region2"))
+        , is(regionNames.contains("region3"))
+        , is(regionNames.contains("region4"))));
+  }
+
+  @Test
+  public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    List<String> regionNames = Arrays.asList(new String[]{"region1", "region2", "region3"});
+    Map<String, Map<String, String>> sourcePartitions = task.createSourcePartitionsMap(regionNames);
+    assertThat(3, is(sourcePartitions.size()));
+    assertThat(true, is(sourcePartitions.get("region1").get(REGION_NAME).equals("region1")));
+    assertThat(true, is(sourcePartitions.get("region2").get(REGION_NAME).equals("region2")));
+    assertThat(true, is(sourcePartitions.get("region3").get(REGION_NAME).equals("region3")));
+  }
+
+  @Test
+  public void shouldBeAbleToParseGeodeLocatorStrings() {
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    String locatorString = "localhost[8888], localhost[8881]";
+    List<LocatorHostPort> locators = task.parseLocators(locatorString);
+    assertThat(2, is(locators.size()));
+  }
+
+  @Test
+  public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
+
+  }
+
+  @Test
+  public void shouldNotBeDurableIfDurableClientIdIsNull() {
+
+  }
+
+  @Test
+  public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
+
+  }
+
+  //Source properties tests
+  @Test
+  public void propertiesShouldBeCorrectlyTranslatedToConfiguration() {
+    Map<String, String> props = new HashMap<>();
+    props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE);
+    props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE);
+
+
+    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
+    task.start(props);
+
+//    assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE));
+
+
+  }
+
+
+}
diff --git a/src/test/java/kafka/GeodeKafkaSourceTest.java b/src/test/java/kafka/GeodeKafkaSourceTest.java
new file mode 100644
index 0000000..ec6dff8
--- /dev/null
+++ b/src/test/java/kafka/GeodeKafkaSourceTest.java
@@ -0,0 +1,22 @@
+package kafka;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GeodeKafkaSourceTest {
+
+  @Test
+  public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() {
+    GeodeKafkaSource source = new GeodeKafkaSource();
+    Map<String, String> props = new HashMap<>();
+    source.start(props);
+
+  }
+
+  @Test
+  public void cqPrefixShouldBeProperlyCalculatedFromProps() {
+
+  }
+}
diff --git a/src/test/java/kafka/WorkerAndHerderCluster.java b/src/test/java/kafka/WorkerAndHerderCluster.java
new file mode 100644
index 0000000..7357232
--- /dev/null
+++ b/src/test/java/kafka/WorkerAndHerderCluster.java
@@ -0,0 +1,23 @@
+package kafka;
+
+import java.io.IOException;
+
+public class WorkerAndHerderCluster {
+
+  private JavaProcess workerAndHerder;
+
+  public WorkerAndHerderCluster() {
+    workerAndHerder = new JavaProcess(WorkerAndHerderWrapper.class);
+  }
+
+  public void start() throws IOException, InterruptedException {
+    System.out.println("JASON starting worker");
+    workerAndHerder.exec();
+
+  }
+
+  public void stop() {
+    workerAndHerder.destroy();
+  }
+}
+
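WorkerAndHerderCluster forks WorkerAndHerderWrapper (below) into a separate JVM through the JavaProcess helper, which is not included in this commit. Expected test usage, sketched under that assumption:

    // Illustrative harness usage from a test such as GeodeKafkaTestCluster.
    WorkerAndHerderCluster cluster = new WorkerAndHerderCluster();
    cluster.start();   // launches WorkerAndHerderWrapper.main in a child JVM
    try {
      // ... put entries into Geode regions and assert they arrive on Kafka topics ...
    } finally {
      cluster.stop();  // destroys the child process
    }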
diff --git a/src/test/java/kafka/WorkerAndHerderWrapper.java b/src/test/java/kafka/WorkerAndHerderWrapper.java
new file mode 100644
index 0000000..5f8ccd2
--- /dev/null
+++ b/src/test/java/kafka/WorkerAndHerderWrapper.java
@@ -0,0 +1,72 @@
+package kafka;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static kafka.GeodeConnectorConfig.REGIONS;
+import static kafka.GeodeConnectorConfig.TOPICS;
+import static kafka.GeodeKafkaTestCluster.TEST_REGIONS;
+import static kafka.GeodeKafkaTestCluster.TEST_TOPICS;
+
+public class WorkerAndHerderWrapper {
+
+  public static void main(String[] args) throws IOException {
+    Map props = new HashMap();
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+    // fast flushing for testing.
+    props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+
+    props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+    props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+    props.put("internal.key.converter.schemas.enable", "false");
+    props.put("internal.value.converter.schemas.enable", "false");
+    props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+    props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+    props.put("key.converter.schemas.enable", "false");
+    props.put("value.converter.schemas.enable", "false");
+
+    WorkerConfig workerCfg = new StandaloneConfig(props);
+
+    MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
+    offBackingStore.configure(workerCfg);
+
+    Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy());
+    worker.start();
+
+    Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy());
+    herder.start();
+
+    Map<String, String> sourceProps = new HashMap<>();
+    sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
+    sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
+    sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+    sourceProps.put(REGIONS, TEST_REGIONS);
+    sourceProps.put(TOPICS, TEST_TOPICS);
+
+    herder.putConnectorConfig(
+        sourceProps.get(ConnectorConfig.NAME_CONFIG),
+        sourceProps, true, (error, result) -> {
+          System.out.println("CALLBACK: " + result + "::: error?" + error);
+        });
+
+  }
+}
