RYA-450 Implemented a Kafka backed QueryChangeLogSource.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/ee29450d Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/ee29450d Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/ee29450d Branch: refs/heads/master Commit: ee29450da821e3d61455322a046fa9a271df1d02 Parents: c1ccde8 Author: kchilton2 <[email protected]> Authored: Tue Jan 23 16:17:06 2018 -0500 Committer: Valiyil <[email protected]> Committed: Fri Mar 9 12:59:43 2018 -0500 ---------------------------------------------------------------------- .../apache/rya/streams/kafka/KafkaTopics.java | 28 +- .../rya/streams/kafka/KafkaTopicsTest.java | 53 ++++ extras/rya.streams/query-manager/pom.xml | 23 +- .../kafka/KafkaQueryChangeLogSource.java | 209 ++++++++++++++ .../kafka/KafkaQueryChangeLogSourceIT.java | 277 +++++++++++++++++++ .../rya/test/kafka/EmbeddedKafkaInstance.java | 1 + .../rya/test/kafka/KafkaTestInstanceRule.java | 52 ++++ 7 files changed, 638 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ee29450d/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java index 3e0df50..095465c 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,6 +20,7 @@ package org.apache.rya.streams.kafka; import static java.util.Objects.requireNonNull; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.UUID; @@ -40,6 +41,8 @@ import kafka.utils.ZkUtils; @DefaultAnnotation(NonNull.class) public class KafkaTopics { + private static final String QUERY_CHANGE_LOG_TOPIC_SUFFIX = "-QueryChangeLog"; + /** * Creates the Kafka topic name that is used for a specific instance of Rya's {@link QueryChangeLog}. * @@ -48,7 +51,28 @@ public class KafkaTopics { */ public static String queryChangeLogTopic(final String ryaInstance) { requireNonNull(ryaInstance); - return ryaInstance + "-QueryChangeLog"; + return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX; + } + + /** + * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}. + * <p/> + * This is the inverse function of {@link #queryChangeLogTopic(String)}. + * + * @param changeLogTopic - The topic to evaluate. (not null) + * @return If the topic is well formatted, then the Rya instance name that was part of the topic name. + */ + public static Optional<String> getRyaInstance(final String changeLogTopic) { + requireNonNull(changeLogTopic); + + // Return absent if the provided topic does not represent a query change log topic. + if(!changeLogTopic.endsWith(QUERY_CHANGE_LOG_TOPIC_SUFFIX)) { + return Optional.empty(); + } + + // Everything before the suffix is the Rya instance name. + final int endIndex = changeLogTopic.length() - QUERY_CHANGE_LOG_TOPIC_SUFFIX.length(); + return Optional.of( changeLogTopic.substring(0, endIndex) ); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ee29450d/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java new file mode 100644 index 0000000..a057de7 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTopicsTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.Optional; + +import org.junit.Test; + +/** + * Unit tests the methods of {@link KafkaTopics}. + */ +public class KafkaTopicsTest { + + @Test + public void getRyaInstance_wellFormattedTopic() { + // Make a topic name using a Rya instance name. + final String ryaInstance = "test"; + final String topicName = KafkaTopics.queryChangeLogTopic(ryaInstance); + + // Show the rya instance name is able to be extracted from the topic. + final Optional<String> resolvedRyaInstance = KafkaTopics.getRyaInstance(topicName); + assertEquals(ryaInstance, resolvedRyaInstance.get()); + } + + @Test + public void getRyaInstance_invalidTopicName() { + // Make up an invalid topic name. + final String invalidTopic = "thisIsABadTopicName"; + + // Show there is no Rya Instance name in it. + final Optional<String> ryaInstance = KafkaTopics.getRyaInstance(invalidTopic); + assertFalse( ryaInstance.isPresent() ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ee29450d/extras/rya.streams/query-manager/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/pom.xml b/extras/rya.streams/query-manager/pom.xml index 76e521d..d321ab5 100644 --- a/extras/rya.streams/query-manager/pom.xml +++ b/extras/rya.streams/query-manager/pom.xml @@ -37,15 +37,32 @@ under the License. <!-- Rya dependencies --> <dependency> <groupId>org.apache.rya</groupId> - <artifactId>rya.streams.client</artifactId> + <artifactId>rya.streams.kafka</artifactId> </dependency> - + <!-- Apache Daemon dependencies --> <dependency> <groupId>commons-daemon</groupId> <artifactId>commons-daemon</artifactId> <version>1.1.0</version> </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.kafka</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <!-- Add the XSD directory as a resource so that it will be packaged in the jar. @@ -121,4 +138,4 @@ under the License. </plugin> </plugins> </build> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ee29450d/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java new file mode 100644 index 0000000..32305f5 --- /dev/null +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; +import org.apache.rya.streams.querymanager.QueryChangeLogSource; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractScheduledService; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Represents a Kafka Server that manages {@link KafkaQueryChangeLog}s. + * <p/> + * Thread safe. + */ +@DefaultAnnotation(NonNull.class) +public class KafkaQueryChangeLogSource extends AbstractScheduledService implements QueryChangeLogSource { + + /** + * Ensures thread safe interactions with this object. + */ + private final ReentrantLock lock = new ReentrantLock(); + + /** + * Used by the service to configure how often it polls the Kafka Server for topics. + */ + private final Scheduler scheduler; + + /** + * Which Kafka Server this source represents. + */ + private final String kafkaBootstrapServer; + + /** + * Listeners that need to be notified when logs are created/deleted. + */ + private final Set<SourceListener> listeners = new HashSet<>(); + + /** + * Maps Rya instance name to a Query Change Log for that instance. This map is used to keep + * track of how the change logs change over time within the Kafka Server. + */ + private final HashMap<String, QueryChangeLog> knownChangeLogs = new HashMap<>(); + + /** + * A consumer that is used to poll the Kafka Server for topics. + */ + private KafkaConsumer<String, String> listTopicsConsumer; + + /** + * Constructs an instance of {@link KafkaQueryChangeLogSource}. + * + * @param kafkaHostname - The hostname of the Kafka Server that is the source. (not null) + * @param kafkaPort - The port of the Kafka Server that is the source. (not null) + * @param scheduler - How frequently this source will poll the Kafka Server for topics. (not null) + */ + public KafkaQueryChangeLogSource( + final String kafkaHostname, + final int kafkaPort, + final Scheduler scheduler) { + kafkaBootstrapServer = requireNonNull(kafkaHostname) + ":" + kafkaPort; + this.scheduler = requireNonNull(scheduler); + } + + @Override + protected void startUp() throws Exception { + // Setup the consumer that is used to list topics for the source. + final Properties consumerProperties = new Properties(); + consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer); + consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + listTopicsConsumer = new KafkaConsumer<>(consumerProperties); + } + + @Override + protected void shutDown() throws Exception { + // Shut down the consumer that's used to list topics. + listTopicsConsumer.close(); + + // Shut down all of the change logs that were created within this class. + for(final QueryChangeLog changeLog : knownChangeLogs.values()) { + changeLog.close(); + } + } + + @Override + public void subscribe(final SourceListener listener) { + requireNonNull(listener); + lock.lock(); + try { + // Add the listener to the list of known listeners. + listeners.add(listener); + + // Notify it with everything that already exists. + for(final Entry<String, QueryChangeLog> entry : knownChangeLogs.entrySet()) { + listener.notifyCreate(entry.getKey(), entry.getValue()); + } + } finally { + lock.unlock(); + } + } + + @Override + public void unsubscribe(final SourceListener listener) { + requireNonNull(listener); + lock.lock(); + try { + // Remove the listener from the list of known listeners. + listeners.remove(listener); + } finally { + lock.unlock(); + } + } + + @Override + protected void runOneIteration() throws Exception { + lock.lock(); + try { + // Get the list of topics from the Kafka Server. + final Set<String> changeLogTopics = new HashSet<>( listTopicsConsumer.listTopics().keySet() ); + + // Remove all topics that are not valid Rya Query Change Log topic names. + changeLogTopics.removeIf( topic -> !KafkaTopics.getRyaInstance(topic).isPresent() ); + + // Extract the Rya instance names from the change log topics. + final Set<String> ryaInstances = changeLogTopics.stream() + .map(topic -> KafkaTopics.getRyaInstance(topic).get() ) + .collect(Collectors.toSet()); + + // Any Rya instances that are in the old set of topics, but not the new one, have been deleted. + final Set<String> deletedRyaInstances = new HashSet<>( Sets.difference(knownChangeLogs.keySet(), ryaInstances) ); + + // Any Rya instances that are in the new set of topics, but not the old set, have been created. + final Set<String> createdRyaInstances = new HashSet<>( Sets.difference(ryaInstances, knownChangeLogs.keySet()) ); + + // Handle the deletes. + for(final String deletedRyaInstance : deletedRyaInstances) { + // Remove the change log from the set of known logs. + final QueryChangeLog removed = knownChangeLogs.remove(deletedRyaInstance); + + // Notify the listeners of the update. + for(final SourceListener listener : listeners) { + listener.notifyDelete(deletedRyaInstance); + } + + // Ensure the change log is closed. + removed.close(); + } + + // Handle the adds. + for(final String createdRyaInstance : createdRyaInstances) { + // Create and store the ChangeLog. + final String changeLogTopic = KafkaTopics.queryChangeLogTopic(createdRyaInstance); + final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic); + knownChangeLogs.put(createdRyaInstance, changeLog); + + // Notify the listeners of the update. + for(final SourceListener listener : listeners) { + listener.notifyCreate(createdRyaInstance, changeLog); + } + } + } finally { + lock.unlock(); + } + } + + @Override + protected Scheduler scheduler() { + return scheduler; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ee29450d/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java new file mode 100644 index 0000000..5914b78 --- /dev/null +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.querymanager.QueryChangeLogSource; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + +/** + * Integration tests the methods of {@link KafkaQueryChangeLogSource}. + */ +public class KafkaQueryChangeLogSourceIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false); + + @Before + public void clearTopics() throws InterruptedException { + kafka.deleteAllTopics(); + } + + @Test + public void discoverExistingLogs() throws Exception { + // Create a valid Query Change Log topic. + final String ryaInstance = UUID.randomUUID().toString(); + final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance); + kafka.createTopic(topic); + + // Create the source. + final QueryChangeLogSource source = new KafkaQueryChangeLogSource( + kafka.getKafkaHostname(), + Integer.parseInt( kafka.getKafkaPort() ), + Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS)); + + // Register a listener that counts down a latch if it sees the new topic. + final CountDownLatch created = new CountDownLatch(1); + source.subscribe(new SourceListener() { + @Override + public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) { + assertEquals(ryaInstance, ryaInstanceName); + created.countDown(); + } + + @Override + public void notifyDelete(final String ryaInstanceName) { } + }); + + try { + // Start the source. + source.startAndWait(); + + // If the latch isn't counted down, then fail the test. + assertTrue( created.await(5, TimeUnit.SECONDS) ); + + } finally { + source.stopAndWait(); + } + } + + @Test + public void discoverNewLogs() throws Exception { + // Create the source. + final QueryChangeLogSource source = new KafkaQueryChangeLogSource( + kafka.getKafkaHostname(), + Integer.parseInt( kafka.getKafkaPort() ), + Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS)); + + // Register a listener that counts down a latch if it sees the new topic. + final String ryaInstance = UUID.randomUUID().toString(); + final CountDownLatch created = new CountDownLatch(1); + source.subscribe(new SourceListener() { + @Override + public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) { + assertEquals(ryaInstance, ryaInstanceName); + created.countDown(); + } + + @Override + public void notifyDelete(final String ryaInstanceName) { } + }); + + try { + // Start the source. + source.startAndWait(); + + // Wait twice the polling duration to ensure it iterates at least once. + Thread.sleep(200); + + // Create a valid Query Change Log topic. + final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance); + kafka.createTopic(topic); + + // If the latch isn't counted down, then fail the test. + assertTrue( created.await(5, TimeUnit.SECONDS) ); + } finally { + source.stopAndWait(); + } + } + + @Test + public void discoverLogDeletions() throws Exception { + // Create a valid Query Change Log topic. + final String ryaInstance = UUID.randomUUID().toString(); + final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance); + kafka.createTopic(topic); + + // Create the source. + final QueryChangeLogSource source = new KafkaQueryChangeLogSource( + kafka.getKafkaHostname(), + Integer.parseInt( kafka.getKafkaPort() ), + Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS)); + + // Register a listener that uses latches to indicate when the topic is created and deleted. + final CountDownLatch created = new CountDownLatch(1); + final CountDownLatch deleted = new CountDownLatch(1); + source.subscribe(new SourceListener() { + @Override + public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) { + assertEquals(ryaInstance, ryaInstanceName); + created.countDown(); + } + + @Override + public void notifyDelete(final String ryaInstanceName) { + assertEquals(ryaInstance, ryaInstanceName); + deleted.countDown(); + } + }); + + try { + // Start the source + source.startAndWait(); + + // Wait for it to indicate the topic was created. + assertTrue( created.await(5, TimeUnit.SECONDS) ); + + // Delete the topic. + kafka.deleteTopic(topic); + + // If the latch isn't counted down, then fail the test. + assertTrue( deleted.await(5, TimeUnit.SECONDS) ); + + } finally { + source.stopAndWait(); + } + } + + @Test + public void newListenerReceivesAllKnownLogs() throws Exception { + // Create a valid Query Change Log topic. + final String ryaInstance = UUID.randomUUID().toString(); + final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance); + kafka.createTopic(topic); + + // Create the source. + final QueryChangeLogSource source = new KafkaQueryChangeLogSource( + kafka.getKafkaHostname(), + Integer.parseInt( kafka.getKafkaPort() ), + Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS)); + + // Register a listener that counts down a latch if it sees the new topic. + final CountDownLatch created = new CountDownLatch(1); + source.subscribe(new SourceListener() { + @Override + public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) { + assertEquals(ryaInstance, ryaInstanceName); + created.countDown(); + } + + @Override + public void notifyDelete(final String ryaInstanceName) { } + }); + + try { + // Start the source + source.startAndWait(); + + // Wait for that first listener to indicate the topic was created. This means that one has been cached. + assertTrue( created.await(5, TimeUnit.SECONDS) ); + + // Register a second listener that counts down when that same topic is encountered. This means the + // newly subscribed listener was notified with the already known change log. + final CountDownLatch newListenerCreated = new CountDownLatch(1); + source.subscribe(new SourceListener() { + @Override + public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) { + assertEquals(ryaInstance, ryaInstanceName); + newListenerCreated.countDown(); + } + + @Override + public void notifyDelete(final String ryaInstanceName) { } + }); + assertTrue( newListenerCreated.await(5, TimeUnit.SECONDS) ); + + } finally { + source.stopAndWait(); + } + } + + @Test + public void unsubscribedDoesNotReceiveNotifications() throws Exception { + // Create the source. + final QueryChangeLogSource source = new KafkaQueryChangeLogSource( + kafka.getKafkaHostname(), + Integer.parseInt( kafka.getKafkaPort() ), + Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS)); + + try { + // Start the source. + source.startAndWait(); + + // Create a listener that flips a boolean to true when it is notified. + final AtomicBoolean notified = new AtomicBoolean(false); + final SourceListener listener = new SourceListener() { + @Override + public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) { + notified.set(true); + } + + @Override + public void notifyDelete(final String ryaInstanceName) { + notified.set(true); + } + }; + + // Register and then unregister it. + source.subscribe(listener); + source.unsubscribe(listener); + + // Create a topic. + final String ryaInstance = UUID.randomUUID().toString(); + final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance); + kafka.createTopic(topic); + + //Wait longer than the polling time for the listener to be notified. + Thread.sleep(300); + + // Show the boolean was never flipped to true. + assertFalse(notified.get()); + } finally { + source.stopAndWait(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ee29450d/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java index c7c5929..3810d4f 100644 --- a/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/EmbeddedKafkaInstance.java @@ -75,6 +75,7 @@ public class EmbeddedKafkaInstance { brokerProps.setProperty(KafkaConfig$.MODULE$.PortProp(), brokerPort); brokerProps.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), zookeperConnect); brokerProps.setProperty(KafkaConfig$.MODULE$.LogDirsProp(), Files.createTempDirectory(getClass().getSimpleName() + "-").toAbsolutePath().toString()); + brokerProps.setProperty(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), "true"); final KafkaConfig config = new KafkaConfig(brokerProps); final Time mock = new MockTime(); kafkaServer = TestUtils.createServer(config, mock); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/ee29450d/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java index 252c288..50ba4ea 100644 --- a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java @@ -19,9 +19,15 @@ package org.apache.rya.test.kafka; import java.util.Properties; +import java.util.Set; +import java.util.UUID; import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,6 +103,52 @@ public class KafkaTestInstanceRule extends ExternalResource { } /** + * Marks a topic for deletion. You may have to wait some time for the delete to actually complete. + * + * @param topicName - The topic that will be deleted. (not null) + */ + public void deleteTopic(final String topicName) { + ZkUtils zkUtils = null; + try { + logger.info("Deleting Kafka Topic: '{}'", topicName); + zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false); + AdminUtils.deleteTopic(zkUtils, topicName); + } + finally { + if(zkUtils != null) { + zkUtils.close(); + } + } + } + + /** + * Delete all of the topics that are in the embedded Kafka instance. + * + * @throws InterruptedException Interrupted while waiting for the topics to be deleted. + */ + public void deleteAllTopics() throws InterruptedException { + // Setup the consumer that is used to list topics for the source. + final Properties consumerProperties = createBootstrapServerConfig(); + consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try(final Consumer<String, String> listTopicsConsumer = new KafkaConsumer<>(consumerProperties)) { + // Mark all existing topics for deletion. + Set<String> topics = listTopicsConsumer.listTopics().keySet(); + for(final String topic : topics) { + deleteTopic(topic); + } + + // Loop and wait until they are all gone. + while(!topics.isEmpty()) { + Thread.sleep(100); + topics = listTopicsConsumer.listTopics().keySet(); + } + } + } + + /** * @return A new Property object containing the correct value for Kafka's * {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}. */
