This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 94f18cb9e753a21efe4efc8ee7964267e4476c17 Author: Jason Huynh <[email protected]> AuthorDate: Fri Jan 17 15:34:55 2020 -0800 geode as source to kafka hack test successful --- build.gradle | 1 + src/main/java/kafka/GeodeKafkaSourceTask.java | 23 ++++++- src/test/java/kafka/GeodeKafkaTestCluster.java | 91 ++++++++++++++------------ src/test/java/kafka/GeodeLocalCluster.java | 4 +- 4 files changed, 71 insertions(+), 48 deletions(-) diff --git a/build.gradle b/build.gradle index 9e23d87..c9500b0 100644 --- a/build.gradle +++ b/build.gradle @@ -14,6 +14,7 @@ repositories { dependencies { compile 'org.apache.geode:geode-core:1.11.0' + compile 'org.apache.geode:geode-cq:1.11.0' compile(group: 'org.apache.kafka', name: 'connect-api', version: '2.3.1') compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.13.0' compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.13.0' diff --git a/src/main/java/kafka/GeodeKafkaSourceTask.java b/src/main/java/kafka/GeodeKafkaSourceTask.java index 1f74700..9608f07 100644 --- a/src/main/java/kafka/GeodeKafkaSourceTask.java +++ b/src/main/java/kafka/GeodeKafkaSourceTask.java @@ -8,6 +8,7 @@ import org.apache.geode.cache.query.CqEvent; import org.apache.geode.cache.query.CqException; import org.apache.geode.cache.query.CqExistsException; import org.apache.geode.cache.query.CqListener; +import org.apache.geode.cache.query.RegionNotFoundException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -38,11 +39,12 @@ public class GeodeKafkaSourceTask extends SourceTask { @Override public void start(Map<String, String> props) { + System.out.println("JASON task start"); batchSize = 100; queueSize = 100000; String regionName = "someRegion"; eventBuffer = new LinkedBlockingQueue<>(queueSize); - topics = new String[] {"myTopic"}; + topics = new String[] {"someTopic"}; sourcePartition = new HashMap<>(); sourcePartition.put(REGION_NAME, regionName); @@ -50,10 +52,12 @@ public class GeodeKafkaSourceTask extends SourceTask { offset.put("OFFSET", 0L); installOnGeode("localHost", 10334, "someRegion"); + System.out.println("JASON task start end"); } @Override public List<SourceRecord> poll() throws InterruptedException { +// System.out.println("JASON polling"); ArrayList<SourceRecord> records = new ArrayList<>(batchSize); ArrayList<CqEvent> events = new ArrayList<>(batchSize); if (eventBuffer.drainTo(events, batchSize) > 0) { @@ -63,9 +67,11 @@ public class GeodeKafkaSourceTask extends SourceTask { records.add(new SourceRecord(sourcePartition, offset, topic, null, event)); } + System.out.println("JASON we polled and returning records" + records.size()); return records; } +// System.out.println("JASON we didn't poll any records"); return null; } @@ -80,16 +86,26 @@ public class GeodeKafkaSourceTask extends SourceTask { .setPoolSubscriptionEnabled(true).addPoolLocator(locatorHost, locatorPort).create(); CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); cqAttributesFactory.addCqListener(new GeodeKafkaSourceListener()); + System.out.println("JASON installing on Geode"); CqAttributes cqAttributes = cqAttributesFactory.create(); try { + System.out.println("JASON installing new cq"); clientCache.getQueryService().newCq("kafkaCQFor" + regionName, "select * from /" + regionName, cqAttributes, - true); + true).execute(); + System.out.println("JASON finished installing cq"); } catch (CqExistsException e) { + System.out.println("UHH"); e.printStackTrace(); - } catch (CqException e) { + } catch (CqException | RegionNotFoundException e) { + System.out.println("UHH e"); e.printStackTrace(); } + catch (Exception e) { + System.out.println("UHHHHHH " + e); + } + System.out.println("JASON task calling ready for events"); clientCache.readyForEvents(); + System.out.println("JASON task ready for events"); } private static class GeodeKafkaSourceListener implements CqListener { @@ -97,6 +113,7 @@ public class GeodeKafkaSourceTask extends SourceTask { @Override public void onEvent(CqEvent aCqEvent) { try { + System.out.println("JASON cqEvent and putting into eventBuffer"); eventBuffer.offer(aCqEvent, 2, TimeUnit.SECONDS); } catch (InterruptedException e) { diff --git a/src/test/java/kafka/GeodeKafkaTestCluster.java b/src/test/java/kafka/GeodeKafkaTestCluster.java index cb5f9ad..2b0293f 100644 --- a/src/test/java/kafka/GeodeKafkaTestCluster.java +++ b/src/test/java/kafka/GeodeKafkaTestCluster.java @@ -10,6 +10,12 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; @@ -32,7 +38,10 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.IOException; +import java.time.Duration; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Properties; @@ -45,6 +54,8 @@ public class GeodeKafkaTestCluster { private static ZooKeeperLocalCluster zooKeeperLocalCluster; private static KafkaLocalCluster kafkaLocalCluster; private static GeodeLocalCluster geodeLocalCluster; + private static WorkerAndHerderCluster workerAndHerderCluster; + private static Consumer<String, String> consumer; @BeforeClass public static void setup() throws IOException, QuorumPeerConfig.ConfigException, InterruptedException { @@ -52,63 +63,30 @@ public class GeodeKafkaTestCluster { startKafka(); startGeode(); createTopic(); + Thread.sleep(5000); startWorker(); + consumer = createConsumer(); + Thread.sleep(5000); } @AfterClass public static void shutdown() { + workerAndHerderCluster.stop(); KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",false,200000, 15000,10, Time.SYSTEM, "myGroup","myMetricType", null); + AdminZkClient adminZkClient = new AdminZkClient(zkClient); adminZkClient.deleteTopic("someTopic"); + kafkaLocalCluster.stop(); geodeLocalCluster.stop(); } - private static void startWorker() { - Map props = new HashMap(); - props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888"); - props.put("offset.storage.file.filename", "/tmp/connect.offsets"); - // fast flushing for testing. - props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10"); - - - props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); - props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); - props.put("internal.key.converter.schemas.enable", "false"); - props.put("internal.value.converter.schemas.enable", "false"); - props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); - props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); - props.put("key.converter.schemas.enable", "false"); - props.put("value.converter.schemas.enable", "false"); - - - WorkerConfig workerCfg = new StandaloneConfig(props); - - MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore(); - offBackingStore.configure(workerCfg); - - Worker worker = new Worker("WORKER_ID", new SystemTime(), new Plugins(props), workerCfg, offBackingStore, new AllConnectorClientConfigOverridePolicy()); - worker.start(); - - Herder herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg), new AllConnectorClientConfigOverridePolicy()); - herder.start(); - - - - - Map<String, String> sourceProps = new HashMap<>(); - sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName()); - sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector"); - sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); -// - herder.putConnectorConfig( - sourceProps.get(ConnectorConfig.NAME_CONFIG), - sourceProps, true, (error, result)->{ - System.out.println("CALLBACK: " + result); - }); - System.out.println("Worker and herder started"); + private static void startWorker() throws IOException, InterruptedException { + workerAndHerderCluster = new WorkerAndHerderCluster(); + workerAndHerderCluster.start(); + Thread.sleep(20000); } private static void createTopic() { @@ -186,13 +164,40 @@ topic=connect-test } + //consumer props, less important, just for testing? + public static Consumer<String,String> createConsumer() { + final Properties props = new Properties(); + props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8888"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, + "myGroup"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getName()); + // Create the consumer using props. + final Consumer<String, String> consumer = + new KafkaConsumer<>(props); + // Subscribe to the topic. + consumer.subscribe(Collections.singletonList("someTopic")); + return consumer; + } @Test public void testX() throws InterruptedException { ClientCache client = createGeodeClient(); Region region = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create("someRegion"); region.put("JASON KEY", "JASON VALUE"); + System.out.println("PUT COMPLETE!"); + region.get("JASON KEY"); +// client.close(); + + + ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); + for (ConsumerRecord<String, String> record: records) { + System.out.println("JASON we consumed a record:" + record); + } System.out.println("TEST COMPLETE!"); + } } diff --git a/src/test/java/kafka/GeodeLocalCluster.java b/src/test/java/kafka/GeodeLocalCluster.java index fd72dec..43ac8f5 100644 --- a/src/test/java/kafka/GeodeLocalCluster.java +++ b/src/test/java/kafka/GeodeLocalCluster.java @@ -13,12 +13,12 @@ public class GeodeLocalCluster { } public void start() throws IOException, InterruptedException { + System.out.println("starting locator"); locatorProcess.exec("10334"); - Thread.sleep(30000); + Thread.sleep(15000); System.out.println("is alive?" + locatorProcess.process.isAlive()); serverProcess.exec("40404"); Thread.sleep(30000); - } public void stop() {
