This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit a570805518713d82c8819daa01e8fa337d1cf787 Author: Jason Huynh <[email protected]> AuthorDate: Thu Jan 23 15:43:11 2020 -0800 Added sink task and sink class Added end to end sink test --- src/main/java/geode/kafka/sink/GeodeKafkaSink.java | 143 +++++++++++---------- .../java/geode/kafka/source/GeodeKafkaSource.java | 2 +- .../geode/kafka/source/GeodeKafkaSourceTask.java | 4 +- .../java/geode/kafka/GeodeKafkaTestCluster.java | 52 +++++++- .../java/geode/kafka/WorkerAndHerderWrapper.java | 15 ++- 5 files changed, 140 insertions(+), 76 deletions(-) diff --git a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java index 68460e4..08af2ad 100644 --- a/src/main/java/geode/kafka/sink/GeodeKafkaSink.java +++ b/src/main/java/geode/kafka/sink/GeodeKafkaSink.java @@ -1,76 +1,81 @@ package geode.kafka.sink; +import geode.kafka.GeodeConnectorConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.sink.SinkConnector; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; -public class GeodeKafkaSink { - -// -// /** Sink properties. */ -// private Map<String, String> configProps; -// -// /** Expected configurations. */ -// private static final ConfigDef CONFIG_DEF = new ConfigDef(); -// -// /** {@inheritDoc} */ -// @Override public String version() { -// return AppInfoParser.getVersion(); -// } -// -// /** -// * A sink lifecycle method. Validates grid-specific sink properties. -// * -// * @param props Sink properties. -// */ -// @Override public void start(Map<String, String> props) { -// configProps = props; -// -// try { -// A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics"); -// A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name"); -// A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file"); -// } -// catch (IllegalArgumentException e) { -// throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e); -// } -// } -// -// /** -// * Obtains a sink task class to be instantiated for feeding data into grid. -// * -// * @return IgniteSinkTask class. -// */ -// @Override public Class<? extends Task> taskClass() { -// return IgniteSinkTask.class; -// } -// -// /** -// * Builds each config for <tt>maxTasks</tt> tasks. -// * -// * @param maxTasks Max number of tasks. -// * @return Task configs. -// */ -// @Override public List<Map<String, String>> taskConfigs(int maxTasks) { -// List<Map<String, String>> taskConfigs = new ArrayList<>(); -// Map<String, String> taskProps = new HashMap<>(); -// -// taskProps.putAll(configProps); -// -// for (int i = 0; i < maxTasks; i++) -// taskConfigs.add(taskProps); -// -// return taskConfigs; -// } -// -// /** {@inheritDoc} */ -// @Override public void stop() { -// // No-op. -// } -// -// /** {@inheritDoc} */ -// @Override public ConfigDef config() { -// return CONFIG_DEF; -// } +import static geode.kafka.GeodeConnectorConfig.BATCH_SIZE; +import static geode.kafka.GeodeConnectorConfig.CQ_PREFIX; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_BATCH_SIZE; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_CQ_PREFIX; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_ID; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_LOCATOR; +import static geode.kafka.GeodeConnectorConfig.DEFAULT_QUEUE_SIZE; +import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_ID_PREFIX; +import static geode.kafka.GeodeConnectorConfig.DURABLE_CLIENT_TIME_OUT; +import static geode.kafka.GeodeConnectorConfig.LOCATORS; +import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; + +public class GeodeKafkaSink extends SinkConnector { + private static final ConfigDef CONFIG_DEF = new ConfigDef(); + private Map<String, String> sharedProps; + + @Override + public void start(Map<String, String> props) { + sharedProps = computeMissingConfigurations(props); + } + + @Override + public Class<? extends Task> taskClass() { + return GeodeKafkaSinkTask.class; + } + + @Override + public List<Map<String, String>> taskConfigs(int maxTasks) { + List<Map<String, String>> taskConfigs = new ArrayList<>(); + Map<String, String> taskProps = new HashMap<>(); + + taskProps.putAll(sharedProps); + + for (int i = 0; i < maxTasks; i++) { + taskProps.put(GeodeConnectorConfig.TASK_ID, "" + i); + taskConfigs.add(taskProps); + } + + return taskConfigs; + } + + @Override + public void stop() { + + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public String version() { + //TODO + return "unknown"; + } + + + private Map<String, String> computeMissingConfigurations(Map<String, String> props) { + props.computeIfAbsent(LOCATORS, (key)-> DEFAULT_LOCATOR); + props.computeIfAbsent(DURABLE_CLIENT_TIME_OUT, (key) -> DEFAULT_DURABLE_CLIENT_TIMEOUT); + props.computeIfAbsent(DURABLE_CLIENT_ID_PREFIX, (key) -> DEFAULT_DURABLE_CLIENT_ID); + props.computeIfAbsent(BATCH_SIZE, (key) -> DEFAULT_BATCH_SIZE); + props.computeIfAbsent(QUEUE_SIZE, (key) -> DEFAULT_QUEUE_SIZE); + props.computeIfAbsent(CQ_PREFIX, (key) -> DEFAULT_CQ_PREFIX); + return props; + } } diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/geode/kafka/source/GeodeKafkaSource.java index d5da62a..36b9942 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSource.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSource.java @@ -28,6 +28,7 @@ import static geode.kafka.GeodeConnectorConfig.QUEUE_SIZE; public class GeodeKafkaSource extends SourceConnector { private Map<String, String> sharedProps; + //TODO maybe club this into GeodeConnnectorConfig private static final ConfigDef CONFIG_DEF = new ConfigDef(); @@ -40,7 +41,6 @@ public class GeodeKafkaSource extends SourceConnector { public List<Map<String, String>> taskConfigs(int maxTasks) { List<Map<String, String>> taskConfigs = new ArrayList<>(); Map<String, String> taskProps = new HashMap<>(); - taskProps.putAll(sharedProps); for (int i = 0; i < maxTasks; i++) { diff --git a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java index 3f2ac80..829eb29 100644 --- a/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/geode/kafka/source/GeodeKafkaSourceTask.java @@ -69,7 +69,7 @@ public class GeodeKafkaSourceTask extends SourceTask { installOnGeode(geodeConnectorConfig, geodeContext, eventBuffer, cqPrefix); } catch (Exception e) { - logger.error("Unable to start task", e); + logger.error("Unable to start source task", e); throw e; } } @@ -81,7 +81,7 @@ public class GeodeKafkaSourceTask extends SourceTask { if (eventBuffer.drainTo(events, batchSize) > 0) { for (GeodeEvent event : events) { for (String topic : topics) { - records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent())); + records.add(new SourceRecord(sourcePartitions.get(event.getRegionName()), OFFSET_DEFAULT, topic, null, event.getEvent().getNewValue())); } } return records; diff --git a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java index f1a6dff..f0d7f06 100644 --- a/src/test/java/geode/kafka/GeodeKafkaTestCluster.java +++ b/src/test/java/geode/kafka/GeodeKafkaTestCluster.java @@ -13,12 +13,19 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -26,12 +33,15 @@ import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertNotNull; public class GeodeKafkaTestCluster { @@ -54,10 +64,17 @@ public class GeodeKafkaTestCluster { startKafka(); startGeode(); createTopic(); - startWorker(); consumer = createConsumer(); - Thread.sleep(5000); + } + + @Before + public void beforeTests() { + } + + @After + public void afterTests() { + } @AfterClass @@ -86,7 +103,7 @@ public class GeodeKafkaTestCluster { Properties topicProperties = new Properties(); topicProperties.put("flush.messages", "1"); AdminZkClient adminZkClient = new AdminZkClient(zkClient); - adminZkClient.createTopic(TEST_TOPICS,3 + adminZkClient.createTopic(TEST_TOPICS,1 ,1, topicProperties, RackAwareMode.Disabled$.MODULE$); } @@ -161,6 +178,21 @@ public class GeodeKafkaTestCluster { return consumer; } + //consumer props, less important, just for testing? + public static Producer<String,String> createProducer() { + final Properties props = new Properties(); + props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getName()); + + // Create the producer using props. + final Producer<String, String> producer = + new KafkaProducer<>(props); + return producer; + } + @Test public void endToEndSourceTest() { ClientCache client = createGeodeClient(); @@ -179,4 +211,18 @@ public class GeodeKafkaTestCluster { }); } + @Test + public void endToEndSinkTest() { + ClientCache client = createGeodeClient(); + Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create(TEST_REGIONS); + + Producer<String, String> producer = createProducer(); + for (int i = 0; i < 10; i++) { + producer.send(new ProducerRecord(TEST_TOPICS, "KEY" + i, "VALUE" + i)); + } + + int i = 0; + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertNotNull(region.get("KEY" + i))); + } + } diff --git a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java index cc8e27b..d6fc7a6 100644 --- a/src/test/java/geode/kafka/WorkerAndHerderWrapper.java +++ b/src/test/java/geode/kafka/WorkerAndHerderWrapper.java @@ -1,5 +1,6 @@ package geode.kafka; +import geode.kafka.sink.GeodeKafkaSink; import geode.kafka.source.GeodeKafkaSource; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; @@ -62,8 +63,20 @@ public class WorkerAndHerderWrapper { herder.putConnectorConfig( sourceProps.get(ConnectorConfig.NAME_CONFIG), sourceProps, true, (error, result)->{ - System.out.println("CALLBACK: " + result + "::: error?" + error); }); + Map<String, String> sinkProps = new HashMap<>(); + sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName()); + sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector"); + sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); + sinkProps.put(REGIONS, TEST_REGIONS); + sinkProps.put(TOPICS, TEST_TOPICS); + + herder.putConnectorConfig( + sinkProps.get(ConnectorConfig.NAME_CONFIG), + sinkProps, true, (error, result)->{ + }); + + } }
