This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 409e6caa691ee324f0a6d66ceaab5687bc3adebb Author: Jason Huynh <[email protected]> AuthorDate: Thu Jan 16 09:30:10 2020 -0800 Worker and herder added Source tasks being instantiated --- src/main/java/kafka/GeodeKafkaSourceTask.java | 2 +- src/test/java/kafka/GeodeKafkaTestCluster.java | 92 +++++++++++++++++++++++--- 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java index e6aa578..1f74700 100644 --- a/src/main/java/kafka/GeodeKafkaSourceTask.java +++ b/src/main/java/kafka/GeodeKafkaSourceTask.java @@ -42,7 +42,7 @@ public class GeodeKafkaSourceTask extends SourceTask { queueSize = 100000; String regionName = "someRegion"; eventBuffer = new LinkedBlockingQueue<>(queueSize); - topics = new String[] {"default"}; + topics = new String[] {"myTopic"}; sourcePartition = new HashMap<>(); sourcePartition.put(REGION_NAME, regionName); diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/kafka/GeodeKafkaTestCluster.java index 824b148..cb5f9ad 100644 --- a/src/test/java/kafka/GeodeKafkaTestCluster.java +++ b/src/test/java/kafka/GeodeKafkaTestCluster.java @@ -1,10 +1,29 @@ package kafka; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.zk.AdminZkClient; +import kafka.zk.KafkaZkClient; +import kafka.zookeeper.ZooKeeperClient; +import org.I0Itec.zkclient.ZkClient; import org.apache.geode.cache.Region; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.ConnectorConfig; +import 
org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; +import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -13,6 +32,8 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; public class GeodeKafkaTestCluster { @@ -30,14 +51,73 @@ public class GeodeKafkaTestCluster { startZooKeeper(); startKafka(); startGeode(); + createTopic(); + startWorker(); } @AfterClass public static void shutdown() { + KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000, + 15000,10, Time.SYSTEM, "myGroup","myMetricType", null); + AdminZkClient adminZkClient = new AdminZkClient(zkClient); + adminZkClient.deleteTopic("someTopic"); kafkaLocalCluster.stop(); geodeLocalCluster.stop(); } + + private static void startWorker() { + Map props = new HashMap(); + props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888"); + props.put("offset.storage.file.filename", "/tmp/connect.offsets"); + // fast flushing for testing. 
+ props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10"); + + + props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + props.put("internal.key.converter.schemas.enable", "false"); + props.put("internal.value.converter.schemas.enable", "false"); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + props.put("key.converter.schemas.enable", "false"); + props.put("value.converter.schemas.enable", "false"); + + + WorkerConfig workerCfg = new StandaloneConfig(props); + + MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore(); + offBackingStore.configure(workerCfg); + + Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy()); + worker.start(); + + Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy()); + herder.start(); + + + + + Map<String, String> sourceProps = new HashMap<>(); + sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName()); + sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector"); + sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); +// + herder.putConnectorConfig( + sourceProps.get(ConnectorConfig.NAME_CONFIG), + sourceProps, true, (error, result)->{ + System.out.println("CALLBACK: " + result); + }); + System.out.println("Worker and herder started"); + } + + private static void createTopic() { + KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000, + 15000,10, Time.SYSTEM, "myGroup","myMetricType", null); + AdminZkClient adminZkClient = new 
AdminZkClient(zkClient); + adminZkClient.createTopic("someTopic",3,1, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + private ClientCache createGeodeClient() { return new ClientCacheFactory().addPoolLocator("localhost", 10334).create(); } @@ -87,17 +167,7 @@ public class GeodeKafkaTestCluster { //Specifically GeodeKafka connector configs - /* - props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2"); - props.put(ConnectorConfig.NAME_CONFIG, "test-src-connector"); - props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSourceConnectorMock.class.getName()); - props.put(IgniteSourceConstants.CACHE_NAME, "testCache"); - props.put(IgniteSourceConstants.CACHE_CFG_PATH, "example-ignite.xml"); - props.put(IgniteSourceConstants.TOPIC_NAMES, topics); - props.put(IgniteSourceConstants.CACHE_EVENTS, "put"); - props.put(IgniteSourceConstants.CACHE_FILTER_CLASS, TestCacheEventFilter.class.getName()); - props.put(IgniteSourceConstants.INTL_BUF_SIZE, "1000000"); - */ + /* name=file-source
