This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 04bdfc9e4aa3abd336ff98e47d832367de851291 Author: Jason Huynh <[email protected]> AuthorDate: Thu Jan 23 16:43:49 2020 -0800 end to end tests use unique topic/regions, incase of cyclical/endless looping of events --- .../java/geode/kafka/GeodeKafkaTestCluster.java | 30 ++++++++++++---------- .../java/geode/kafka/WorkerAndHerderWrapper.java | 14 +++++----- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java index f0d7f06..ce26354 100644 --- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java +++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java @@ -33,14 +33,13 @@ import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; public class GeodeKafkaTestCluster { @@ -49,8 +48,11 @@ public class GeodeKafkaTestCluster { public static TemporaryFolder temporaryFolder = new TemporaryFolder(); private static boolean debug = true; - public static String TEST_TOPICS = "someTopic"; - public static String TEST_REGIONS = "someRegion"; + public static String TEST_TOPIC = "someTopic"; + public static String TEST_REGION = "someRegion"; + + public static String TEST_TOPIC_FOR_SINK = "someTopicForSink"; + public static String TEST_REGION_FOR_SINK = "someTopicForSink"; private static ZooKeeperLocalCluster zooKeeperLocalCluster; private static KafkaLocalCluster kafkaLocalCluster; @@ -83,7 +85,8 @@ public class GeodeKafkaTestCluster { KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000, 15000,10, Time.SYSTEM, "myGroup","myMetricType", null); AdminZkClient adminZkClient = new AdminZkClient(zkClient); - adminZkClient.deleteTopic(TEST_TOPICS); + adminZkClient.deleteTopic(TEST_TOPIC); + adminZkClient.deleteTopic(TEST_TOPIC_FOR_SINK); kafkaLocalCluster.stop(); geodeLocalCluster.stop(); @@ -103,7 +106,9 @@ public class GeodeKafkaTestCluster { Properties topicProperties = new Properties(); topicProperties.put("flush.messages", "1"); AdminZkClient adminZkClient = new AdminZkClient(zkClient); - adminZkClient.createTopic(TEST_TOPICS,1 + adminZkClient.createTopic(TEST_TOPIC,1 + ,1, topicProperties, RackAwareMode.Disabled$.MODULE$); + adminZkClient.createTopic(TEST_TOPIC_FOR_SINK,1 ,1, topicProperties, RackAwareMode.Disabled$.MODULE$); } @@ -174,7 +179,7 @@ public class GeodeKafkaTestCluster { final Consumer<String, String> consumer = new KafkaConsumer<>(props); // Subscribe to the topic. - consumer.subscribe(Collections.singletonList(TEST_TOPICS)); + consumer.subscribe(Collections.singletonList(TEST_TOPIC)); return consumer; } @@ -196,7 +201,7 @@ public class GeodeKafkaTestCluster { @Test public void endToEndSourceTest() { ClientCache client = createGeodeClient(); - Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS); + Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION); //right now just verify something makes it end to end AtomicInteger valueReceived = new AtomicInteger(0); @@ -204,25 +209,24 @@ public class GeodeKafkaTestCluster { region.put("KEY", "VALUE" + System.currentTimeMillis()); ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(4)); for (ConsumerRecord<String, String> record: records) { -// System.out.println("WE consumed a record:" + record); valueReceived.incrementAndGet(); } - return valueReceived.get() > 0; + return valueReceived.get() == 10; }); } @Test public void endToEndSinkTest() { ClientCache client = createGeodeClient(); - Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS); + Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGION_FOR_SINK); Producer<String, String> producer = createProducer(); for (int i = 0; i < 10; i++) { - producer.send(new ProducerRecord(TEST_TOPICS, "KEY" + i, "VALUE" + i)); + producer.send(new ProducerRecord(TEST_TOPIC_FOR_SINK, "KEY" + i, "VALUE" + i)); } int i = 0; - await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertNotNull(region.get("KEY" + i))); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, region.sizeOnServer())); } } diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java index d6fc7a6..a33c135 100644 --- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java +++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java @@ -20,8 +20,10 @@ import java.util.Map; import static geode.kafka.GeodeConnectorConfig.REGIONS; import static geode.kafka.GeodeConnectorConfig.TOPICS; -import static geode.kafka.GeodeKafkaTestCluster.TEST_REGIONS; -import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPICS; +import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION; +import static geode.kafka.GeodeKafkaTestCluster.TEST_REGION_FOR_SINK; +import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC; +import static geode.kafka.GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK; public class WorkerAndHerderWrapper { @@ -57,8 +59,8 @@ public class WorkerAndHerderWrapper { sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName()); sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector"); sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); - sourceProps.put(REGIONS, TEST_REGIONS); - sourceProps.put(TOPICS, TEST_TOPICS); + sourceProps.put(REGIONS, TEST_REGION); + sourceProps.put(TOPICS, TEST_TOPIC); herder.putConnectorConfig( sourceProps.get(ConnectorConfig.NAME_CONFIG), @@ -69,8 +71,8 @@ public class WorkerAndHerderWrapper { sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName()); sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector"); sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); - sinkProps.put(REGIONS, TEST_REGIONS); - sinkProps.put(TOPICS, TEST_TOPICS); + sinkProps.put(REGIONS, TEST_REGION_FOR_SINK); + sinkProps.put(TOPICS, TEST_TOPIC_FOR_SINK); herder.putConnectorConfig( sinkProps.get(ConnectorConfig.NAME_CONFIG),
