http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java index 947a215..3a59636 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java @@ -32,10 +32,15 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.kafka.KafkaStreamsFactory; import org.apache.rya.streams.kafka.KafkaStreamsFactory.KafkaStreamsFactoryException; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic; import org.apache.rya.streams.querymanager.QueryExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractIdleService; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -50,6 +55,7 @@ import kafka.consumer.KafkaStream; */ @DefaultAnnotation(NonNull.class) public class LocalQueryExecutor extends AbstractIdleService implements QueryExecutor { + private static final Logger log = LoggerFactory.getLogger(LocalQueryExecutor.class); /** * Provides thread safety when interacting with this class. @@ -72,6 +78,11 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>(); /** + * Used to create the input and output topics for a Kafka Streams job. + */ + private final CreateKafkaTopic createKafkaTopic; + + /** * Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. */ private final KafkaStreamsFactory streamsFactory; @@ -79,23 +90,31 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec /** * Constructs an instance of {@link LocalQueryExecutor}. * + * @param createKafkaTopic - Used to create the input and output topics for a Kafka Streams job. (not null) * @param streamsFactory - Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. (not null) */ - public LocalQueryExecutor(final KafkaStreamsFactory streamsFactory) { + public LocalQueryExecutor( + final CreateKafkaTopic createKafkaTopic, + final KafkaStreamsFactory streamsFactory) { + this.createKafkaTopic = requireNonNull(createKafkaTopic); this.streamsFactory = requireNonNull(streamsFactory); } @Override protected void startUp() throws Exception { - // Nothing to do. + log.info("Local Query Executor starting up."); } @Override protected void shutDown() throws Exception { + log.info("Local Query Executor shutting down. Stopping all jobs..."); + // Stop all of the running queries. for(final KafkaStreams job : byQueryId.values()) { job.close(); } + + log.info("Local Query Executor shut down."); } @Override @@ -106,6 +125,14 @@ public class LocalQueryExecutor extends AbstractIdleService implements QueryExec lock.lock(); try { + // Make sure the Statements topic exists for the query. + final Set<String> topics = Sets.newHashSet( + KafkaTopics.statementsTopic(ryaInstance), + KafkaTopics.queryResultsTopic(query.getQueryId())); + + // Make sure the Query Results topic exists for the query. + createKafkaTopic.createTopics(topics, 1, 1); + // Setup the Kafka Streams job that will execute. final KafkaStreams streams = streamsFactory.make(ryaInstance, query); streams.start();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd index c1285d4..21170bb 100644 --- a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd +++ b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd @@ -30,6 +30,13 @@ under the License. </xs:choice> </xs:complexType> </xs:element> + <xs:element name="queryExecutor"> + <xs:complexType> + <xs:choice> + <xs:element name="localKafkaStreams" type="localKafkaStreams"/> + </xs:choice> + </xs:complexType> + </xs:element> <xs:element name="performanceTunning"> <xs:complexType> <xs:sequence> @@ -65,6 +72,13 @@ under the License. </xs:sequence> </xs:complexType> + <!-- Define what a local Kafka Streams query executor looks like. --> + <xs:complexType name="localKafkaStreams"> + <xs:sequence> + <xs:element name="zookeepers" type="xs:string"/> + </xs:sequence> + </xs:complexType> + <!-- Define the legal range for a TCP port. --> <xs:simpleType name="tcpPort"> <xs:restriction base="xs:int"> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java new file mode 100644 index 0000000..da9be78 --- /dev/null +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.querymanager.QueryManager.LogEvent; +import org.apache.rya.streams.querymanager.QueryManager.LogEvent.LogEventType; +import org.apache.rya.streams.querymanager.QueryManager.LogEventWorkGenerator; +import org.junit.Test; + +/** + * Unit tests the methods of {@link LogEventWorkGenerator}. + */ +public class LogEventWorkGeneratorTest { + + @Test + public void shutdownSignalKillsThread() { + // The signal that will kill the notifying thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue generated work is offered to. + final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1); + + // The listener that will perform the LogEventWorkGenerator work. + final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal); + + // A thread that will attempt to notify the generator with a created change log. + final Thread notifyThread = new Thread(() -> { + generator.notifyCreate("rya", mock(QueryChangeLog.class)); + }); + + // Fill the queue so that nothing may be offered to it. + queue.offer(LogEvent.delete("rya")); + + // Start the thread and show that it is still alive after the offer period. + notifyThread.start(); + assertTrue( ThreadUtil.stillAlive(notifyThread, 200) ); + + // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down. + shutdownSignal.set(true); + assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) ); + } + + @Test + public void notifyCreate() throws Exception { + // The signal that will kill the notifying thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue generated work is offered to. + final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1); + + // The listener that will perform the LogEventWorkGenerator work. + final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal); + + // A thread that will attempt to notify the generator with a created change log. + final CountDownLatch notified = new CountDownLatch(1); + final Thread notifyThread = new Thread(() -> { + generator.notifyCreate("rya", mock(QueryChangeLog.class)); + notified.countDown(); + }); + + try { + // Start the thread that performs the notification. + notifyThread.start(); + + // Wait for the thread to indicate it has notified and check the queue for the value. + notified.await(200, TimeUnit.MILLISECONDS); + final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS); + assertEquals(LogEventType.CREATE, event.getEventType()); + assertEquals("rya", event.getRyaInstanceName()); + } finally { + shutdownSignal.set(true); + notifyThread.join(); + } + } + + @Test + public void notifyDelete() throws Exception { + // The signal that will kill the notifying thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue generated work is offered to. + final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1); + + // The listener that will perform the LogEventWorkGenerator work. + final LogEventWorkGenerator generator = new LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal); + + // A thread that will attempt to notify the generator with a deleted change log. + final CountDownLatch notified = new CountDownLatch(1); + final Thread notifyThread = new Thread(() -> { + generator.notifyDelete("rya"); + notified.countDown(); + }); + + try { + // Start the thread that performs the notification. + notifyThread.start(); + + // Wait for the thread to indicate it has notified and check the queue for the value. + notified.await(200, TimeUnit.MILLISECONDS); + final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS); + assertEquals(LogEventType.DELETE, event.getEventType()); + assertEquals("rya", event.getRyaInstanceName()); + } finally { + shutdownSignal.set(true); + notifyThread.join(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java new file mode 100644 index 0000000..cb708ed --- /dev/null +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.querymanager.QueryManager.LogEvent; +import org.apache.rya.streams.querymanager.QueryManager.LogEventWorker; +import org.apache.rya.streams.querymanager.QueryManager.QueryEvent; +import org.junit.Test; + +/** + * Unit tests the methods of {@link LogEventWorker}. + */ +public class LogEventWorkerTest { + + @Test + public void shutdownSignalKillsThread() { + // The signal that will kill the working thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The thread that will perform the LogEventWorker task. + final Thread logEventWorker = new Thread(new LogEventWorker(new ArrayBlockingQueue<>(1), + new ArrayBlockingQueue<>(1), 50, TimeUnit.MILLISECONDS, shutdownSignal)); + logEventWorker.start(); + + // Wait longer than the poll time to see if the thread died. Show that it is still running. + assertTrue(ThreadUtil.stillAlive(logEventWorker, 200)); + + // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down. + shutdownSignal.set(true); + assertFalse(ThreadUtil.stillAlive(logEventWorker, 500)); + } + + @Test + public void nofity_logCreated_doesNotExist() throws Exception { + // The signal that will kill the working thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue used to feed work. + final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10); + + // The queue work is written to. + final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10); + + // The Query Change Log that will be watched. + final QueryChangeLog changeLog = new InMemoryQueryChangeLog(); + + // Write a message that indicates a new query should be active. + final UUID firstQueryId = UUID.randomUUID(); + changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true)); + + // Write a message that adds an active query, but then makes it inactive. Because both of these + // events are written to the log before the worker subscribes to the repository for updates, they + // must result in a single query stopped event. + final UUID secondQueryId = UUID.randomUUID(); + changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d ?e ?f . }", true)); + changeLog.write(QueryChange.update(secondQueryId, false)); + + // Start the worker that will be tested. + final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue, + queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal)); + logEventWorker.start(); + + try { + // Write a unit of work that indicates a log was created. + final LogEvent createLogEvent = LogEvent.create("rya", changeLog); + logEventQueue.offer(createLogEvent); + + // We must see the following Query Events added to the work queue. + // Query 1, executing. + // Query 2, stopped. + Set<QueryEvent> expectedEvents = new HashSet<>(); + expectedEvents.add(QueryEvent.executing("rya", + new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true))); + expectedEvents.add(QueryEvent.stopped("rya", secondQueryId)); + + Set<QueryEvent> queryEvents = new HashSet<>(); + queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) ); + queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) ); + + assertEquals(expectedEvents, queryEvents); + + // Write an event to the change log that stops the first query. + changeLog.write(QueryChange.update(firstQueryId, false)); + + // Show it was also reflected in the changes. + // Query 1, stopped. + expectedEvents = new HashSet<>(); + expectedEvents.add(QueryEvent.stopped("rya", firstQueryId)); + + queryEvents = new HashSet<>(); + queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) ); + + assertEquals(expectedEvents, queryEvents); + } finally { + shutdownSignal.set(true); + logEventWorker.join(); + } + } + + @Test + public void nofity_logCreated_exists() throws Exception { + // The signal that will kill the working thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue used to feed work. + final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10); + + // The queue work is written to. + final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10); + + // The Query Change Log that will be watched. + final QueryChangeLog changeLog = new InMemoryQueryChangeLog(); + + // Write a message that indicates a new query should be active. + final UUID firstQueryId = UUID.randomUUID(); + changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a ?b ?c . }", true)); + + // Start the worker that will be tested. + final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue, + queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal)); + logEventWorker.start(); + + try { + // Write a unit of work that indicates a log was created. + final LogEvent createLogEvent = LogEvent.create("rya", changeLog); + logEventQueue.offer(createLogEvent); + + // Say the same log was created a second time. + logEventQueue.offer(createLogEvent); + + // Show that only a single unit of work was added for the log. This indicates the + // second message was effectively skipped as it would have add its work added twice otherwise. + final Set<QueryEvent> expectedEvents = new HashSet<>(); + expectedEvents.add(QueryEvent.executing("rya", + new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c . }", true))); + + final Set<QueryEvent> queryEvents = new HashSet<>(); + queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) ); + + assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS)); + assertEquals(expectedEvents, queryEvents); + } finally { + shutdownSignal.set(true); + logEventWorker.join(); + } + } + + @Test + public void notify_logDeleted_exists() throws Exception { + // The signal that will kill the working thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue used to feed work. + final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10); + + // The queue work is written to. + final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10); + + // The Query Change Log that will be watched. + final QueryChangeLog changeLog = new InMemoryQueryChangeLog(); + + // Start the worker that will be tested. + final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue, + queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal)); + logEventWorker.start(); + + try { + // Write a unit of work that indicates a log was created. + final LogEvent createLogEvent = LogEvent.create("rya", changeLog); + logEventQueue.offer(createLogEvent); + + // Write a unit of work that indicates a log was deleted. + logEventQueue.offer(LogEvent.delete("rya")); + + // Show that a single unit of work was created for deleting everything for "rya". + assertEquals(QueryEvent.stopALL("rya"), queryEventQueue.poll(500, TimeUnit.MILLISECONDS)); + assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS)); + } finally { + shutdownSignal.set(true); + logEventWorker.join(); + } + } + + @Test + public void notify_logDeleted_doesNotExist() throws Exception { + // The signal that will kill the working thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue used to feed work. + final BlockingQueue<LogEvent> logEventQueue = new ArrayBlockingQueue<>(10); + + // The queue work is written to. + final BlockingQueue<QueryEvent> queryEventQueue = new ArrayBlockingQueue<>(10); + + // Start the worker that will be tested. + final Thread logEventWorker = new Thread(new LogEventWorker(logEventQueue, + queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal)); + logEventWorker.start(); + + try { + // Write a unit of work that indicates a log was deleted. Since it was never created, + // this will not cause anything to be written to the QueryEvent queue. + logEventQueue.offer(LogEvent.delete("rya")); + + // Show that a single unit of work was created for deleting everything for "rya". + assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS)); + } finally { + shutdownSignal.set(true); + logEventWorker.join(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java new file mode 100644 index 0000000..4495e19 --- /dev/null +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.querymanager.QueryManager.QueryEvent; +import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorkGenerator; +import org.junit.Test; + +/** + * Unit tests the methods of {@link QueryEventWorkGenerator}. + */ +public class QueryEventWorkGeneratorTest { + + @Test + public void shutdownSignalKillsThread() throws Exception { + // The signal that will kill the notifying thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue generated work is offered to. + final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1); + + // The listener that will perform the QueryEventWorkGenerator work. + final QueryEventWorkGenerator generator = + new QueryEventWorkGenerator("rya", new CountDownLatch(1), queue, 50, TimeUnit.MILLISECONDS, shutdownSignal); + + // A thread that will attempt to notify the generator with a created query. + final Thread notifyThread = new Thread(() -> { + generator.notify(mock(ChangeLogEntry.class), Optional.empty()); + }); + + // Fill the queue so that nothing may be offered to it. + queue.offer(QueryEvent.stopALL("rya")); + + // Start the thread and show that it is still alive after the offer period. + notifyThread.start(); + assertTrue( ThreadUtil.stillAlive(notifyThread, 200) ); + + // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down. + shutdownSignal.set(true); + assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) ); + } + + @Test + public void waitsForSubscriptionWork() throws Exception { + // The signal that will kill the notifying thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue generated work is offered to. + final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1); + + // The listener that will perform the QueryEventWorkGenerator work. + final CountDownLatch latch = new CountDownLatch(1); + final QueryEventWorkGenerator generator = + new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal); + + // A thread that will attempt to notify the generator with a created query. + final UUID queryId = UUID.randomUUID(); + final StreamsQuery query = new StreamsQuery(queryId, "query", true); + final Thread notifyThread = new Thread(() -> { + final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive()); + final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change); + generator.notify(entry, Optional.of(query)); + }); + + // Start the thread. + notifyThread.start(); + + try { + // Wait longer than the blocking period and show the thread is still alive and nothing has been added + // to the work queue. + Thread.sleep(150); + assertTrue( notifyThread.isAlive() ); + + // Count down the latch. + latch.countDown(); + + // Show work was added to the queue and the notifying thread died. + final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS); + final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive())); + assertEquals(expected, event); + } finally { + shutdownSignal.set(true); + notifyThread.join(); + } + } + + @Test + public void notifyCreate() throws Exception { + // The signal that will kill the notifying thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue generated work is offered to. + final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1); + + // The listener that will perform the QueryEventWorkGenerator work. + final CountDownLatch latch = new CountDownLatch(1); + latch.countDown(); + final QueryEventWorkGenerator generator = + new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal); + + // A thread that will attempt to notify the generator with a created query. + final UUID queryId = UUID.randomUUID(); + final StreamsQuery query = new StreamsQuery(queryId, "query", true); + final Thread notifyThread = new Thread(() -> { + final QueryChange change = QueryChange.create(queryId, query.getSparql(), query.isActive()); + final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change); + generator.notify(entry, Optional.of(query)); + }); + + // Start the thread. + notifyThread.start(); + + try { + // Show work was added to the queue and the notifying thread died. + final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS); + final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive())); + assertEquals(expected, event); + } finally { + shutdownSignal.set(true); + notifyThread.join(); + } + } + + @Test + public void notifyDelete() throws Exception { + // The signal that will kill the notifying thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue generated work is offered to. + final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1); + + // The listener that will perform the QueryEventWorkGenerator work. + final CountDownLatch latch = new CountDownLatch(1); + latch.countDown(); + final QueryEventWorkGenerator generator = + new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal); + + // A thread that will attempt to notify the generator with a deleted query. + final UUID queryId = UUID.randomUUID(); + final Thread notifyThread = new Thread(() -> { + final QueryChange change = QueryChange.delete(queryId); + final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change); + generator.notify(entry, Optional.empty()); + }); + + // Start the thread. + notifyThread.start(); + + try { + // Show work was added to the queue and the notifying thread died. + final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS); + final QueryEvent expected = QueryEvent.stopped("rya", queryId); + assertEquals(expected, event); + } finally { + shutdownSignal.set(true); + notifyThread.join(); + } + } + + @Test + public void notifyUpdate_isActive() throws Exception { + // The signal that will kill the notifying thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue generated work is offered to. + final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1); + + // The listener that will perform the QueryEventWorkGenerator work. + final CountDownLatch latch = new CountDownLatch(1); + latch.countDown(); + final QueryEventWorkGenerator generator = + new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal); + + // A thread that will attempt to notify the generator with an update query change. + final UUID queryId = UUID.randomUUID(); + final StreamsQuery query = new StreamsQuery(queryId, "query", true); + final Thread notifyThread = new Thread(() -> { + final QueryChange change = QueryChange.update(queryId, true); + final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change); + generator.notify(entry, Optional.of(query)); + }); + + // Start the thread. + notifyThread.start(); + + try { + // Show work was added to the queue and the notifying thread died. + final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS); + final QueryEvent expected = QueryEvent.executing("rya", new StreamsQuery(queryId, query.getSparql(), query.isActive())); + assertEquals(expected, event); + } finally { + shutdownSignal.set(true); + notifyThread.join(); + } + } + + @Test + public void notifyUpdate_isNotActive() throws Exception { + // The signal that will kill the notifying thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue generated work is offered to. + final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1); + + // The listener that will perform the QueryEventWorkGenerator work. + final CountDownLatch latch = new CountDownLatch(1); + latch.countDown(); + final QueryEventWorkGenerator generator = + new QueryEventWorkGenerator("rya", latch, queue, 50, TimeUnit.MILLISECONDS, shutdownSignal); + + // A thread that will attempt to notify the generator with an update query change. + final UUID queryId = UUID.randomUUID(); + final StreamsQuery query = new StreamsQuery(queryId, "query", false); + final Thread notifyThread = new Thread(() -> { + final QueryChange change = QueryChange.update(queryId, false); + final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, change); + generator.notify(entry, Optional.of(query)); + }); + + // Start the thread. + notifyThread.start(); + + try { + // Show work was added to the queue and the notifying thread died. + final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS); + final QueryEvent expected = QueryEvent.stopped("rya", queryId); + assertEquals(expected, event); + } finally { + shutdownSignal.set(true); + notifyThread.join(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java new file mode 100644 index 0000000..33c0719 --- /dev/null +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.querymanager.QueryManager.QueryEvent; +import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorker; +import org.junit.Test; + +/** + * Unit tests the methods of {@link QueryManager.QueryEventWorker}. + */ +public class QueryEventWorkerTest { + + @Test + public void shutdownSignalKillsThread() { + // The signal that will kill the working thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The thread that will perform the QueryEventWorker task. + final Thread queryEventWorker = new Thread(new QueryEventWorker(new ArrayBlockingQueue<>(1), + mock(QueryExecutor.class), 50, TimeUnit.MILLISECONDS, shutdownSignal)); + queryEventWorker.start(); + + // Wait longer than the poll time to see if the thread died. Show that it is still running. + assertTrue(ThreadUtil.stillAlive(queryEventWorker, 200)); + + // Set the shutdown signal to true and join the thread. If we were able to join, then it shut down. + shutdownSignal.set(true); + assertFalse(ThreadUtil.stillAlive(queryEventWorker, 1000)); + } + + @Test + public void executingWork() throws Exception { + // The signal that will kill the working thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue used to send the execute work to the thread. + final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1); + + // The message that indicates a query needs to be executed. + final String ryaInstance = "rya"; + final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "sparql", true); + final QueryEvent executingEvent = QueryEvent.executing(ryaInstance, query); + + // Release a latch if the startQuery method on the queryExecutor is invoked with the correct values. + final CountDownLatch startQueryInvoked = new CountDownLatch(1); + final QueryExecutor queryExecutor = mock(QueryExecutor.class); + doAnswer(invocation -> { + startQueryInvoked.countDown(); + return null; + }).when(queryExecutor).startQuery(ryaInstance, query); + + // The thread that will perform the QueryEventWorker task. + final Thread queryEventWorker = new Thread(new QueryEventWorker(queue, + queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal)); + try { + queryEventWorker.start(); + + // Provide a message indicating a query needs to be executing. + queue.put(executingEvent); + + // Verify the Query Executor was told to start the query. + assertTrue( startQueryInvoked.await(150, TimeUnit.MILLISECONDS) ); + } finally { + shutdownSignal.set(true); + queryEventWorker.join(); + } + } + + @Test + public void stoppedWork() throws Exception { + // The signal that will kill the working thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue used to send the execute work to the thread. + final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1); + + // The message that indicates a query needs to be stopped. + final UUID queryId = UUID.randomUUID(); + final QueryEvent stoppedEvent = QueryEvent.stopped("rya", queryId); + + // Release a latch if the stopQuery method on the queryExecutor is invoked with the correct values. + final CountDownLatch stopQueryInvoked = new CountDownLatch(1); + final QueryExecutor queryExecutor = mock(QueryExecutor.class); + doAnswer(invocation -> { + stopQueryInvoked.countDown(); + return null; + }).when(queryExecutor).stopQuery(queryId); + + final Thread queryEventWorker = new Thread(new QueryEventWorker(queue, + queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal)); + try { + // The thread that will perform the QueryEventWorker task. + queryEventWorker.start(); + + // Provide a message indicating a query needs to be executing. + queue.put(stoppedEvent); + + // Verify the Query Executor was told to stop the query. + assertTrue( stopQueryInvoked.await(150, TimeUnit.MILLISECONDS) ); + } finally { + shutdownSignal.set(true); + queryEventWorker.join(); + } + } + + @Test + public void stopAllWork() throws Exception { + // The signal that will kill the working thread. + final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + // The queue used to send the execute work to the thread. + final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1); + + // The message that indicates all queries for a rya instance need to be stopped. + final String ryaInstance = "rya"; + final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance); + + // Release a latch if the stopQuery method on the queryExecutor is invoked with the correct values. + final CountDownLatch testMethodInvoked = new CountDownLatch(1); + final QueryExecutor queryExecutor = mock(QueryExecutor.class); + doAnswer(invocation -> { + testMethodInvoked.countDown(); + return null; + }).when(queryExecutor).stopAll(ryaInstance); + + final Thread queryEventWorker = new Thread(new QueryEventWorker(queue, + queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal)); + try { + // The thread that will perform the QueryEventWorker task. + queryEventWorker.start(); + + // Provide a message indicating a query needs to be executing. + queue.put(stopAllEvent); + + // Verify the Query Executor was told to stop all the queries. + assertTrue( testMethodInvoked.await(150, TimeUnit.MILLISECONDS) ); + } finally { + shutdownSignal.set(true); + queryEventWorker.join(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java index a1203a0..04e70c0 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java @@ -1,18 +1,20 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.rya.streams.querymanager; @@ -34,13 +36,10 @@ import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; import org.junit.Test; -import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; - /** - * Test for the {@link QueryManager} + * Unit tests the methods of {@link QueryManager}. */ public class QueryManagerTest { - private static final Scheduler TEST_SCHEDULER = Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS); /** * Tests when the query manager is notified to create a new query, the query @@ -74,7 +73,7 @@ public class QueryManagerTest { return null; }).when(source).subscribe(any(SourceListener.class)); - final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER); + final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS); try { qm.startAndWait(); queryStarted.await(5, TimeUnit.SECONDS); @@ -128,7 +127,7 @@ public class QueryManagerTest { return null; }).when(source).subscribe(any(SourceListener.class)); - final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER); + final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS); try { qm.startAndWait(); queryDeleted.await(5, TimeUnit.SECONDS); @@ -183,7 +182,7 @@ public class QueryManagerTest { return null; }).when(source).subscribe(any(SourceListener.class)); - final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER); + final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS); try { qm.startAndWait(); queryDeleted.await(10, TimeUnit.SECONDS); @@ -192,4 +191,4 @@ public class QueryManagerTest { qm.stopAndWait(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java new file mode 100644 index 0000000..9896e31 --- /dev/null +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +/** + * Utilities that are useful for interacting with {@link Thread}s while testing. + */ +public class ThreadUtil { + + /** + * Private constructor to prevent instantiation. + */ + private ThreadUtil() { } + + /** + * A utility function that returns whether a thread is alive or not after waiting + * some specified period of time to join it. + * + * @param thread - The thread that will be joined. (not null) + * @param millis - How long to wait to join the thread. + * @return {@code true} if the thread is still alive, otherwise {@code false}. + */ + public static boolean stillAlive(final Thread thread, final long millis) { + requireNonNull(thread); + try { + thread.join(millis); + } catch (final InterruptedException e) { } + return thread.isAlive(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java index 3cbe894..f9c8a03 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java @@ -35,6 +35,7 @@ import org.apache.rya.streams.api.interactor.LoadStatements; import org.apache.rya.streams.kafka.KafkaStreamsFactory; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory; +import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic; import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; @@ -119,9 +120,10 @@ public class LocalQueryExecutorIT { expected.add(new VisibilityBindingSet(bs, "a")); // Start the executor that will be tested. + final CreateKafkaTopic createKafkaTopic = new CreateKafkaTopic( kafka.getZookeeperServers() ); final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort(); final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers); - final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + final QueryExecutor executor = new LocalQueryExecutor(createKafkaTopic, jobFactory); executor.startAndWait(); try { // Start the query. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java index 0df5794..c0f888e 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java @@ -32,6 +32,7 @@ import java.util.UUID; import org.apache.kafka.streams.KafkaStreams; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic; import org.apache.rya.streams.querymanager.QueryExecutor; import org.junit.Test; @@ -44,7 +45,7 @@ public class LocalQueryExecutorTest { @Test(expected = IllegalStateException.class) public void startQuery_serviceNotStarted() throws Exception { - final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class)); executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true)); } @@ -60,7 +61,7 @@ public class LocalQueryExecutorTest { when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob); // Start the executor that will be tested. - final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); executor.startAndWait(); try { // Tell the executor to start the query. @@ -75,14 +76,14 @@ public class LocalQueryExecutorTest { @Test(expected = IllegalStateException.class) public void stopQuery_serviceNotStarted() throws Exception { - final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class)); executor.stopQuery(UUID.randomUUID()); } @Test public void stopQuery_queryNotRunning() throws Exception { // Start an executor. - final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class)); executor.startAndWait(); try { // Try to stop a query that was never stareted. @@ -104,7 +105,7 @@ public class LocalQueryExecutorTest { when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob); // Start the executor that will be tested. - final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); executor.startAndWait(); try { // Tell the executor to start the query. @@ -122,7 +123,7 @@ public class LocalQueryExecutorTest { @Test(expected = IllegalStateException.class) public void stopAll_serviceNotStarted() throws Exception { - final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class)); executor.stopAll("rya"); } @@ -141,7 +142,7 @@ public class LocalQueryExecutorTest { when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(queryJob2); // Start the executor that will be tested. - final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); executor.startAndWait(); try { // Tell the executor to start the queries. @@ -180,7 +181,7 @@ public class LocalQueryExecutorTest { when(jobFactory.make(eq(ryaInstance2), eq(query2))).thenReturn(queryJob2); // Start the executor that will be tested. - final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); executor.startAndWait(); try { // Tell the executor to start the queries. @@ -205,14 +206,14 @@ public class LocalQueryExecutorTest { @Test(expected = IllegalStateException.class) public void getRunningQueryIds_serviceNotStarted() throws Exception { - final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class)); executor.getRunningQueryIds(); } @Test public void getRunningQueryIds_noneStarted() throws Exception { // Start an executor. - final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class)); executor.startAndWait(); try { // Get the list of running queries. @@ -240,7 +241,7 @@ public class LocalQueryExecutorTest { when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class)); // Start the executor that will be tested. - final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); executor.startAndWait(); try { // Start the queries. @@ -275,7 +276,7 @@ public class LocalQueryExecutorTest { when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class)); // Start the executor that will be tested. - final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); executor.startAndWait(); try { // Start the queries. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java index f2b50ab..de6b9f3 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java @@ -45,6 +45,11 @@ public class QueryManagerConfigMarshallerTest { " <port>6</port>\n" + " </kafka>\n" + " </queryChangeLogSource>\n" + + " <queryExecutor>\n" + + " <localKafkaStreams>\n" + + " <zookeepers>zoo1,zoo2,zoo3</zookeepers>\n" + + " </localKafkaStreams>\n" + + " </queryExecutor>\n" + " <performanceTunning>\n" + " <queryChanngeLogDiscoveryPeriod>\n" + " <value>1</value>\n" +