This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit d4cf91517e2880ff5636144dccded9f183363ce6 Author: Jason Huynh <[email protected]> AuthorDate: Wed Jan 29 16:38:29 2020 -0800 Reworked region to topic and topic to region parsing Tasks are assigned region to topic or topic to region groups A task can handle multiple regions or multiple topics based on configuration --- build.gradle | 1 + .../java/geode/kafka/GeodeConnectorConfig.java | 79 ++++++++++++---- src/main/java/geode/kafka/sink/GeodeKafkaSink.java | 14 +-- .../java/geode/kafka/source/GeodeKafkaSource.java | 13 +-- .../geode/kafka/source/GeodeKafkaSourceTask.java | 30 +++---- .../java/geode/kafka/GeodeConnectorConfigTest.java | 100 +++++++++++++++++++-- .../java/geode/kafka/GeodeKafkaTestCluster.java | 17 ++-- src/test/java/geode/kafka/GeodeLocalCluster.java | 1 - .../java/geode/kafka/ServerLauncherWrapper.java | 7 +- .../java/geode/kafka/WorkerAndHerderWrapper.java | 13 ++- .../kafka/source/GeodeKafkaSourceTaskTest.java | 92 ++++++++++++++----- 11 files changed, 267 insertions(+), 100 deletions(-) diff --git a/build.gradle b/build.gradle index 515967d..a131d22 100644 --- a/build.gradle +++ b/build.gradle @@ -27,6 +27,7 @@ dependencies { testCompile group: 'junit', name: 'junit', version: '4.12' testCompile 'org.mockito:mockito-core:3.2.4' + testCompile 'pl.pragmatists:JUnitParams:1.1.1' testImplementation 'org.awaitility:awaitility:4.0.2' } diff --git a/src/main/java/geode/kafka/GeodeConnectorConfig.java b/src/main/java/geode/kafka/GeodeConnectorConfig.java index 11eff62..dcc479e 100644 --- a/src/main/java/geode/kafka/GeodeConnectorConfig.java +++ b/src/main/java/geode/kafka/GeodeConnectorConfig.java @@ -25,16 +25,19 @@ public class GeodeConnectorConfig { public static final String LOCATORS = "locators"; public static final String DEFAULT_LOCATOR = "localhost[10334]"; - /** - * Specifies which Regions to connect in Apache Geode - */ - public static final String REGIONS = "regions"; /** - * Specifies which Topics to connect in Kafka + * Specifies which Topics to connect in Kafka, uses the variable name with Kafka Sink Configuration + * Only used in sink configuration */ public static final String TOPICS = "topics"; + //Used by sink + public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegion"; + + //Used by source + public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopic"; + /** * Property to describe the Source Partition in a record */ @@ -54,8 +57,9 @@ public class GeodeConnectorConfig { private final String durableClientId; private final String durableClientIdPrefix; private final String durableClientTimeout; - private List<String> regionNames; - private List<String> topics; + + private Map<String, List<String>> regionToTopics; + private Map<String, List<String>> topicToRegions; private List<LocatorHostPort> locatorHostPorts; //just for tests @@ -71,22 +75,59 @@ public class GeodeConnectorConfig { durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX); if (isDurable(durableClientIdPrefix)) { durableClientId = durableClientIdPrefix + taskId; - } - else { + } else { durableClientId = ""; } durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT); - regionNames = parseNames(connectorProperties.get(GeodeConnectorConfig.REGIONS)); - topics = parseNames(connectorProperties.get(GeodeConnectorConfig.TOPICS)); + regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS)); + topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS)); locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS)); } + + public static Map<String, List<String>> parseTopicToRegions(String combinedBindings) { + //It's the same formatting, so parsing is the same going topic to region or region to topic + return parseRegionToTopics(combinedBindings); + } + + /** + * Given a string of the form [region:topic,...] will produce a map where the key is the + * regionName and the value is a list of topicNames to push values to + * + * @param combinedBindings a string with similar form to "[region:topic,...], [region2:topic2,...] + * @return mapping of regionName to list of topics to update + */ + public static Map<String, List<String>> parseRegionToTopics(String combinedBindings) { + if (combinedBindings == "" || combinedBindings == null){ + return null; + } + List<String> bindings = parseBindings(combinedBindings); + return bindings.stream().map(binding -> { + String[] regionToTopicsArray = parseBinding(binding); + return regionToTopicsArray; + }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], regionToTopicsArray -> parseNames(regionToTopicsArray[1]))); + } + + public static List<String> parseBindings(String bindings) { + return Arrays.stream(bindings.split("](\\s)*,")).map((s) -> { + s = s.replaceAll("\\[", ""); + s = s.replaceAll("\\]", ""); + s = s.trim(); + return s; + }).collect(Collectors.toList()); + } + + private static String[] parseBinding(String binding) { + return binding.split(":"); + } + + //Used to parse a string of topics or regions public static List<String> parseNames(String names) { return Arrays.stream(names.split(",")).map((s) -> s.trim()).collect(Collectors.toList()); } public static String reconstructString(Collection<String> strings) { - return strings.stream().collect(Collectors.joining(",")); + return strings.stream().collect(Collectors.joining("],[")) + "]"; } List<LocatorHostPort> parseLocators(String locators) { @@ -115,7 +156,6 @@ public class GeodeConnectorConfig { return !durableClientId.equals(""); } - public int getTaskId() { return taskId; } @@ -128,15 +168,16 @@ public class GeodeConnectorConfig { return durableClientTimeout; } - public List<String> getRegionNames() { - return regionNames; + public List<LocatorHostPort> getLocatorHostPorts() { + return locatorHostPorts; } - public List<String> getTopics() { - return topics; + public Map<String, List<String>> getRegionToTopics() { + return regionToTopics; } - public List<LocatorHostPort> getLocatorHostPorts() { - return locatorHostPorts; + public Map<String, List<String>> getTopicToRegions() { + return topicToRegions; } + } diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java index c7afb96..99f9b9d 100644 --- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java +++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java @@ -23,8 +23,7 @@ import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; import static geode.kafka.GeodeConnectorConfig.LOCATORS; import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; -import static geode.kafka.GeodeConnectorConfig.REGIONS; -import static geode.kafka.GeodeConnectorConfig.TOPICS; +import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS; public class GeodeKafkaSink extends SinkConnector { private static final ConfigDef CONFIG_DEF = new ConfigDef(); @@ -45,17 +44,12 @@ public class GeodeKafkaSink extends SinkConnector { List<Map<String, String>> taskConfigs = new ArrayList<>(); Map<String, String> taskProps = new HashMap<>(); taskProps.putAll(sharedProps); - - List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS)); - List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks); - - List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS)); - List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks); + List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(TOPIC_TO_REGION_BINDINGS)); + List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks); for (int i = 0; i < maxTasks; i++) { taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i); - taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i))); - taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i))); + taskProps.put(TOPIC_TO_REGION_BINDINGS, GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i))); taskConfigs.add(taskProps); } diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java index ca3f2c8..91e6203 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java @@ -26,8 +26,7 @@ import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; import static geode.kafka.GeodeConnectorConfig.LOAD_ENTIRE_REGION; import static geode.kafka.GeodeConnectorConfig.LOCATORS; import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; -import static geode.kafka.GeodeConnectorConfig.REGIONS; -import static geode.kafka.GeodeConnectorConfig.TOPICS; +import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS; public class GeodeKafkaSource extends SourceConnector { @@ -48,16 +47,12 @@ public class GeodeKafkaSource extends SourceConnector { Map<String, String> taskProps = new HashMap<>(); taskProps.putAll(sharedProps); - List<String> topics = GeodeConnectorConfig.parseNames(taskProps.get(TOPICS)); - List<List<String>> topicsPerTask = ConnectorUtils.groupPartitions(topics, maxTasks); - - List<String> regions = GeodeConnectorConfig.parseNames(taskProps.get(REGIONS)); - List<List<String>> regionsPerTask = ConnectorUtils.groupPartitions(regions, maxTasks); + List<String> bindings = GeodeConnectorConfig.parseNames(taskProps.get(REGION_TO_TOPIC_BINDINGS)); + List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks); for (int i = 0; i < maxTasks; i++) { taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i); - taskProps.put(TOPICS, GeodeConnectorConfig.reconstructString(topicsPerTask.get(i))); - taskProps.put(REGIONS, GeodeConnectorConfig.reconstructString(regionsPerTask.get(i))); + taskProps.put(REGION_TO_TOPIC_BINDINGS, GeodeConnectorConfig.reconstructString(bindingsPerTask.get(i))); taskConfigs.add(taskProps); } return taskConfigs; diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java index 8080179..dadc8ba 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java @@ -38,7 +38,7 @@ public class GeodeKafkaSourceTask extends SourceTask { private GeodeContext geodeContext; private GeodeConnectorConfig geodeConnectorConfig; - private List<String> topics; + private Map<String, List<String>> regionToTopics; private Map<String, Map<String, String>> sourcePartitions; private BlockingQueue<GeodeEvent> eventBuffer; private int batchSize; @@ -55,12 +55,6 @@ public class GeodeKafkaSourceTask extends SourceTask { return null; } - void startForTesting(BlockingQueue eventBuffer, List<String> topics, int batchSize) { - this.eventBuffer = eventBuffer; - this.topics = topics; - this.batchSize = batchSize; - } - @Override public void start(Map<String, String> props) { try { @@ -72,15 +66,14 @@ public class GeodeKafkaSourceTask extends SourceTask { int queueSize = Integer.parseInt(props.get(QUEUE_SIZE)); eventBuffer = new LinkedBlockingQueue<>(queueSize); - sourcePartitions = createSourcePartitionsMap(geodeConnectorConfig.getRegionNames()); - topics = geodeConnectorConfig.getTopics(); + regionToTopics = geodeConnectorConfig.getRegionToTopics(); + sourcePartitions = createSourcePartitionsMap(regionToTopics.keySet()); String cqPrefix = props.get(CQ_PREFIX); boolean loadEntireRegion = Boolean.parseBoolean(props.get(LOAD_ENTIRE_REGION)); installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix, loadEntireRegion); - } - catch (Exception e) { + } catch (Exception e) { logger.error("Unable to start source task", e); throw e; } @@ -92,8 +85,10 @@ public class GeodeKafkaSourceTask extends SourceTask { ArrayList<GeodeEvent> events = new ArrayList<>(batchSize); if (eventBuffer.drainTo(events, batchSize) > 0) { for (GeodeEvent event : events) { + String regionName = event.getRegionName(); + List<String> topics = regionToTopics.get(regionName); for (String topic : topics) { - records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue())); + records.add(new SourceRecord(sourcePartitions.get(regionName), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue())); } } return records; @@ -108,9 +103,9 @@ public class GeodeKafkaSourceTask extends SourceTask { } void installOnGeode(GeodeConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, BlockingQueue eventBuffer, String cqPrefix, boolean loadEntireRegion) { - boolean isDurable = geodeConnectorConfig.isDurable(); - int taskId = geodeConnectorConfig.getTaskId(); - for (String region : geodeConnectorConfig.getRegionNames()) { + boolean isDurable = geodeConnectorConfig.isDurable(); + int taskId = geodeConnectorConfig.getTaskId(); + for (String region : geodeConnectorConfig.getRegionToTopics().keySet()) { installListenersToRegion(geodeContext, taskId, eventBuffer, region, cqPrefix, loadEntireRegion, isDurable); } if (isDurable) { @@ -132,8 +127,7 @@ public class GeodeKafkaSourceTask extends SourceTask { geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, isDurable); } - } - finally { + } finally { listener.signalInitialResultsLoaded(); } return listener; @@ -145,7 +139,7 @@ public class GeodeKafkaSourceTask extends SourceTask { * @param regionNames list of regionNames * @return Map<String, Map < String, String>> a map of source partitions, keyed by region name */ - Map<String, Map<String, String>> createSourcePartitionsMap(List<String> regionNames) { + Map<String, Map<String, String>> createSourcePartitionsMap(Collection<String> regionNames) { return regionNames.stream().map(regionName -> { Map<String, String> sourcePartition = new HashMap<>(); sourcePartition.put(REGION_NAME, regionName); diff --git a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java index 8f06f61..6a39c5d 100644 --- a/src/test/java/geode/kafka/GeodeConnectorConfigTest.java +++ b/src/test/java/geode/kafka/GeodeConnectorConfigTest.java @@ -1,16 +1,26 @@ package geode.kafka; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; import org.junit.Test; +import org.junit.runner.RunWith; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; +import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; +import static geode.kafka.GeodeConnectorConfig.LOCATORS; +import static geode.kafka.GeodeConnectorConfig.TASK_ID; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +@RunWith(JUnitParamsRunner.class) public class GeodeConnectorConfigTest { @Test @@ -45,14 +55,94 @@ public class GeodeConnectorConfigTest { } @Test - public void durableClientIdShouldNotBeSetIfPropertyIsNotSet() { - Map<String, String> props = new HashMap<>(); - GeodeConnectorConfig config = new GeodeConnectorConfig(props); - assertEquals("", config.getDurableClientId()); + @Parameters(method="oneToOneBindings") + public void parseBindingsCanSplitOneToOneBindings(String value) { + List<String> splitBindings = GeodeConnectorConfig.parseBindings(value); + assertEquals(2, splitBindings.size()); + } + + @Test + public void parseBindingsCanSplitASingleOneToOneBindings() { + String binding = "[region1:topic1]"; + List<String> splitBindings = GeodeConnectorConfig.parseBindings(binding); + assertEquals(1, splitBindings.size()); + assertEquals(binding.replaceAll("\\[", "").replaceAll("\\]", ""), splitBindings.get(0)); + } + + public List<String> oneToOneBindings() { + return Arrays.asList(new String[]{"[region1:topic1],[region2:topic2]" + ,"[region1:topic1] , [region2:topic2]" + ,"[region1:topic1], [region2:topic2] ," + ,"[region1: topic1], [region2 :topic2]"}); + } + + @Test + @Parameters(method="oneToManyBindings") + public void parseBindingsCanSplitOneToManyBindings(String value) { + List<String> splitBindings = GeodeConnectorConfig.parseBindings(value); + assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size()); + } + + public List<String> oneToManyBindings() { + return Arrays.asList(new String[]{"[region1:topic1,topic2],[region2:topic2,topic3]" + ,"[region1:topic1 , topic2] , [region2:topic2 , topic3]" + ,"[region1:topic1 ,], [region2:topic2 ,] ," + ,"[region1: topic1 ,topic3], [region2 :topic2]"}); + } + + + @Test + @Parameters(method="oneToManyBindings") + public void reconstructBindingsToStringShouldReformAParsableString(String value) { + List<String> splitBindings = GeodeConnectorConfig.parseBindings(value); + String reconstructString = GeodeConnectorConfig.reconstructString(splitBindings); + splitBindings = GeodeConnectorConfig.parseBindings(reconstructString); + assertEquals(Arrays.toString(splitBindings.toArray()), 2, splitBindings.size()); } @Test - public void cqPrefixShouldBeProperlyCalculatedFromProps() { + @Parameters(method="oneToOneBindings") + public void configurationShouldReturnRegionToTopicsMappingWhenParseRegionToTopics(String value) { + Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics(value); + assertEquals(2, regionToTopics.size()); + assertTrue(regionToTopics.get("region1") != null); + assertEquals(1, regionToTopics.get("region1").size()); + assertTrue(regionToTopics.get("region1").contains("topic1")); + } + @Test + public void regionToTopicParsingShouldParseCorrectlyWithASingleBinding() { + Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics("[region1:topic1]"); + assertTrue(regionToTopics.get("region1") != null); + assertEquals(1, regionToTopics.get("region1").size()); + assertTrue(regionToTopics.get("region1").contains("topic1")); } + + + /* + taskId = Integer.parseInt(connectorProperties.get(TASK_ID)); + durableClientIdPrefix = connectorProperties.get(DURABLE_CLIENT_ID_PREFIX); + if (isDurable(durableClientIdPrefix)) { + durableClientId = durableClientIdPrefix + taskId; + } else { + durableClientId = ""; + } + durableClientTimeout = connectorProperties.get(DURABLE_CLIENT_TIME_OUT); + regionToTopics = parseRegionToTopics(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS)); + topicToRegions = parseTopicToRegions(connectorProperties.get(GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS)); + locatorHostPorts = parseLocators(connectorProperties.get(GeodeConnectorConfig.LOCATORS)); + + */ + + + @Test + public void durableClientIdShouldNotBeSetIfPrefixIsEmpty() { + Map<String, String> props = new HashMap<>(); + props.put(TASK_ID, "0"); + props.put(DURABLE_CLIENT_ID_PREFIX, ""); + props.put(LOCATORS, "localhost[10334]"); + GeodeConnectorConfig config = new GeodeConnectorConfig(props); + assertEquals("", config.getDurableClientId()); + } + } diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java index ce26354..c6cb832 100644 --- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java +++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java @@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; public class GeodeKafkaTestCluster { @@ -48,11 +47,13 @@ public class GeodeKafkaTestCluster { public static TemporaryFolder temporaryFolder = new TemporaryFolder(); private static boolean debug = true; - public static String TEST_TOPIC = "someTopic"; - public static String TEST_REGION = "someRegion"; + public static String TEST_REGION_TO_TOPIC_BINDINGS = "[someRegionForSource:someTopicForSource]"; + public static String TEST_TOPIC_TO_REGION_BINDINGS = "[someTopicForSink:someRegionForSink]"; + public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource"; + public static String TEST_REGION_FOR_SOURCE = "someRegionForSource"; public static String TEST_TOPIC_FOR_SINK = "someTopicForSink"; - public static String TEST_REGION_FOR_SINK = "someTopicForSink"; + public static String TEST_REGION_FOR_SINK = "someRegionForSink"; private static ZooKeeperLocalCluster zooKeeperLocalCluster; private static KafkaLocalCluster kafkaLocalCluster; @@ -85,7 +86,7 @@ public class GeodeKafkaTestCluster { KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000, 15000,10, Time.SYSTEM, "myGroup","myMetricType", null); AdminZkClient adminZkClient = new AdminZkClient(zkClient); - adminZkClient.deleteTopic(TEST_TOPIC); + adminZkClient.deleteTopic(TEST_TOPIC_FOR_SOURCE); adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK); kafkaLocalCluster.stop(); @@ -106,7 +107,7 @@ public class GeodeKafkaTestCluster { Properties topicProperties = new Properties(); topicProperties.put("flush.messages", "1"); AdminZkClient adminZkClient = new AdminZkClient(zkClient); - adminZkClient.createTopic(TEST_TOPIC,1 + adminZkClient.createTopic(TEST_TOPIC_FOR_SOURCE,1 ,1, topicProperties, RackAwareMode.Disabled$.MODULE$); adminZkClient.createTopic(TEST_TOPIC_FOR_SINK,1 ,1, topicProperties, RackAwareMode.Disabled$.MODULE$); @@ -179,7 +180,7 @@ public class GeodeKafkaTestCluster { final Consumer<String, String> consumer = new KafkaConsumer<>(props); // Subscribe to the topic. - consumer.subscribe(Collections.singletonList(TEST_TOPIC)); + consumer.subscribe(Collections.singletonList(TEST_TOPIC_FOR_SOURCE)); return consumer; } @@ -201,7 +202,7 @@ public class GeodeKafkaTestCluster { @Test public void endToEndSourceTest() { ClientCache client = createGeodeClient(); - Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION); + Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SOURCE); //right now just verify something makes it end to end AtomicInteger valueReceived = new AtomicInteger(0); diff --git a/src/test/java/geode/kafka/GeodeLocalCluster.java b/src/test/java/geode/kafka/GeodeLocalCluster.java index afd3b9d..276dc05 100644 --- a/src/test/java/geode/kafka/GeodeLocalCluster.java +++ b/src/test/java/geode/kafka/GeodeLocalCluster.java @@ -16,7 +16,6 @@ public class GeodeLocalCluster { System.out.println("starting locator"); locatorProcess.exec("10334"); Thread.sleep(15000); - System.out.println("is alive?" + locatorProcess.process.isAlive()); serverProcess.exec("40404"); Thread.sleep(30000); } diff --git a/src/test/java/geode/kafka/ServerLauncherWrapper.java b/src/test/java/geode/kafka/ServerLauncherWrapper.java index b36a3aa..7ef9db3 100644 --- a/src/test/java/geode/kafka/ServerLauncherWrapper.java +++ b/src/test/java/geode/kafka/ServerLauncherWrapper.java @@ -11,6 +11,9 @@ import org.apache.geode.pdx.ReflectionBasedAutoSerializer; import java.io.IOException; import java.util.Properties; +import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK; +import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SOURCE; + public class ServerLauncherWrapper { public static void main(String... args) throws IOException { @@ -43,11 +46,11 @@ public class ServerLauncherWrapper { .create(); CacheServer cacheServer = cache.addCacheServer(); cacheServer.setPort(0); -// cacheServer.setMaxConnections(Integer.MAX_VALUE); cacheServer.start(); //create the region - cache.createRegionFactory(RegionShortcut.PARTITION).create("someRegion"); + cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SINK); + cache.createRegionFactory(RegionShortcut.PARTITION).create(TEST_REGION_FOR_SOURCE); System.out.println("starting cacheserver"); while (true) { diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java index a33c135..5e8f074 100644 --- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java +++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java @@ -18,12 +18,12 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import static geode.kafka.GeodeConnectorConfig.REGIONS; +import static geode.kafka.GeodeConnectorConfig.REGION_TO_TOPIC_BINDINGS; import static geode.kafka.GeodeConnectorConfig.TOPICS; -import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION; -import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK; -import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC; +import static geode.kafka.GeodeConnectorConfig.TOPIC_TO_REGION_BINDINGS; +import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS; import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK; +import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS; public class WorkerAndHerderWrapper { @@ -59,8 +59,7 @@ public class WorkerAndHerderWrapper { sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName()); sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector"); sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); - sourceProps.put(REGIONS, TEST_REGION); - sourceProps.put(TOPICS, TEST_TOPIC); + sourceProps.put(REGION_TO_TOPIC_BINDINGS, TEST_REGION_TO_TOPIC_BINDINGS); herder.putConnectorConfig( sourceProps.get(ConnectorConfig.NAME_CONFIG), @@ -71,7 +70,7 @@ public class WorkerAndHerderWrapper { sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName()); sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector"); sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); - sinkProps.put(REGIONS, TEST_REGION_FOR_SINK); + sinkProps.put(TOPIC_TO_REGION_BINDINGS, TEST_TOPIC_TO_REGION_BINDINGS); sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK); herder.putConnectorConfig( diff --git a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java index 041adc8..d4149db 100644 --- a/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java +++ b/src/test/java/geode/kafka/source/GeodeKafkaSourceTaskTest.java @@ -2,8 +2,8 @@ package geode.kafka.source; import geode.kafka.GeodeConnectorConfig; import geode.kafka.GeodeContext; +import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.query.CqEvent; -import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; import java.util.ArrayList; @@ -15,15 +15,8 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; -import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR; -import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; -import static geode.kafka.GeodeConnectorConfig.LOCATORS; -import static geode.kafka.GeodeConnectorConfig.REGIONS; import static geode.kafka.GeodeConnectorConfig.REGION_NAME; -import static geode.kafka.GeodeConnectorConfig.TASK_ID; -import static geode.kafka.GeodeConnectorConfig.TOPICS; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -31,6 +24,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -87,6 +82,70 @@ public class GeodeKafkaSourceTaskTest { } @Test + public void readyForEventsIsCalledIfDurable() { + ClientCache clientCache = mock(ClientCache.class); + + GeodeContext geodeContext = mock(GeodeContext.class); + when(geodeContext.getClientCache()).thenReturn(clientCache); + + GeodeConnectorConfig config = mock(GeodeConnectorConfig.class); + when (config.isDurable()).thenReturn(true); + GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); + task.installOnGeode(config, geodeContext, null, "", false); + verify(clientCache, times(1)).readyForEvents(); + } + + @Test + public void cqIsInvokedForEveryRegionWithATopic() { + ClientCache clientCache = mock(ClientCache.class); + + GeodeContext geodeContext = mock(GeodeContext.class); + when(geodeContext.getClientCache()).thenReturn(clientCache); + + Map<String, List<String>> regionToTopicsMap = new HashMap<>(); + regionToTopicsMap.put("region1", new ArrayList()); + + GeodeConnectorConfig config = mock(GeodeConnectorConfig.class); + when (config.getRegionToTopics()).thenReturn(regionToTopicsMap); + + GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); + task.installOnGeode(config, geodeContext, null, "someCqPrefix", false); + verify(geodeContext, times(1)).newCq(anyString(), anyString(), any(), anyBoolean()); + } + + @Test + public void cqWithInitialResultsIsInvokedForEveryRegionWithATopicIfLoadEntireIsSet() { + ClientCache clientCache = mock(ClientCache.class); + + GeodeContext geodeContext = mock(GeodeContext.class); + when(geodeContext.getClientCache()).thenReturn(clientCache); + + Map<String, List<String>> regionToTopicsMap = new HashMap<>(); + regionToTopicsMap.put("region1", new ArrayList()); + + GeodeConnectorConfig config = mock(GeodeConnectorConfig.class); + when (config.getRegionToTopics()).thenReturn(regionToTopicsMap); + + GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); + task.installOnGeode(config, geodeContext, new LinkedBlockingQueue(), "someCqPrefix", true); + verify(geodeContext, times(1)).newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()); + } + + @Test + public void readyForEventsIsNotCalledIfNotDurable() { + ClientCache clientCache = mock(ClientCache.class); + + GeodeContext geodeContext = mock(GeodeContext.class); + when(geodeContext.getClientCache()).thenReturn(clientCache); + + GeodeConnectorConfig config = mock(GeodeConnectorConfig.class); + when (config.isDurable()).thenReturn(false); + GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); + task.installOnGeode(config, geodeContext, null, "", false); + verify(clientCache, times(0)).readyForEvents(); + } + + @Test public void pollReturnsEventsWhenEventBufferHasValues() throws Exception { // BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100); // CqEvent cqEvent = mock(CqEvent.class); @@ -139,20 +198,11 @@ public class GeodeKafkaSourceTaskTest { } - //Source properties tests - @Test - public void propertiesShouldBeCorrectlyTranslatedToConfiguration() { - Map<String, String> props = new HashMap<>(); - props.put(GeodeConnectorConfig.QUEUE_SIZE, GeodeConnectorConfig.DEFAULT_QUEUE_SIZE); - props.put(GeodeConnectorConfig.BATCH_SIZE, GeodeConnectorConfig.DEFAULT_BATCH_SIZE); - - GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); -// task.start(props); - -// assertThat(task.getQueueSize(GeodeConnectorConfig.QUEUE_SIZE)); - + @Test + public void cqPrefixShouldBeProperlyCalculatedFromProps() { +// GeodeContext geodeContext = mock(GeodeContext.class); +// GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); } - }
