http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
 
b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
index 947a215..3a59636 100644
--- 
a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
+++ 
b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java
@@ -32,10 +32,15 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
 import 
org.apache.rya.streams.kafka.KafkaStreamsFactory.KafkaStreamsFactoryException;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.querymanager.QueryExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractIdleService;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -50,6 +55,7 @@ import kafka.consumer.KafkaStream;
  */
 @DefaultAnnotation(NonNull.class)
 public class LocalQueryExecutor extends AbstractIdleService implements 
QueryExecutor {
+    private static final Logger log = 
LoggerFactory.getLogger(LocalQueryExecutor.class);
 
     /**
      * Provides thread safety when interacting with this class.
@@ -72,6 +78,11 @@ public class LocalQueryExecutor extends AbstractIdleService 
implements QueryExec
     private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>();
 
     /**
+     * Used to create the input and output topics for a Kafka Streams job.
+     */
+    private final CreateKafkaTopic createKafkaTopic;
+
+    /**
      * Builds the {@link KafkaStreams} objects that execute {@link 
KafkaStream}s.
      */
     private final KafkaStreamsFactory streamsFactory;
@@ -79,23 +90,31 @@ public class LocalQueryExecutor extends AbstractIdleService 
implements QueryExec
     /**
      * Constructs an instance of {@link LocalQueryExecutor}.
      *
+     * @param createKafkaTopic - Used to create the input and output topics 
for a Kafka Streams job. (not null)
      * @param streamsFactory - Builds the {@link KafkaStreams} objects that 
execute {@link KafkaStream}s. (not null)
      */
-    public LocalQueryExecutor(final KafkaStreamsFactory streamsFactory) {
+    public LocalQueryExecutor(
+            final CreateKafkaTopic createKafkaTopic,
+            final KafkaStreamsFactory streamsFactory) {
+        this.createKafkaTopic = requireNonNull(createKafkaTopic);
         this.streamsFactory = requireNonNull(streamsFactory);
     }
 
     @Override
     protected void startUp() throws Exception {
-        // Nothing to do.
+        log.info("Local Query Executor starting up.");
     }
 
     @Override
     protected void shutDown() throws Exception {
+        log.info("Local Query Executor shutting down. Stopping all jobs...");
+
         // Stop all of the running queries.
         for(final KafkaStreams job : byQueryId.values()) {
             job.close();
         }
+
+        log.info("Local Query Executor shut down.");
     }
 
     @Override
@@ -106,6 +125,14 @@ public class LocalQueryExecutor extends 
AbstractIdleService implements QueryExec
 
         lock.lock();
         try {
+            // Make sure the Statements topic exists for the query.
+            final Set<String> topics = Sets.newHashSet(
+                    KafkaTopics.statementsTopic(ryaInstance),
+                    KafkaTopics.queryResultsTopic(query.getQueryId()));
+
+            // Make sure the Query Results topic exists for the query.
+            createKafkaTopic.createTopics(topics, 1, 1);
+
             // Setup the Kafka Streams job that will execute.
             final KafkaStreams streams = streamsFactory.make(ryaInstance, 
query);
             streams.start();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd 
b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
index c1285d4..21170bb 100644
--- a/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
+++ b/extras/rya.streams/query-manager/src/main/xsd/QueryManagerConfig.xsd
@@ -30,6 +30,13 @@ under the License.
             </xs:choice>
           </xs:complexType>
         </xs:element>
+        <xs:element name="queryExecutor">
+            <xs:complexType>
+              <xs:choice>
+                <xs:element name="localKafkaStreams" type="localKafkaStreams"/>
+              </xs:choice>
+            </xs:complexType>
+        </xs:element>
         <xs:element name="performanceTunning">
           <xs:complexType>
             <xs:sequence>
@@ -65,6 +72,13 @@ under the License.
     </xs:sequence>
   </xs:complexType>
   
+  <!-- Define what a local Kafka Streams query executor looks like. -->
+  <xs:complexType name="localKafkaStreams">
+    <xs:sequence>
+      <xs:element name="zookeepers" type="xs:string"/>
+    </xs:sequence>
+  </xs:complexType>
+  
   <!-- Define the legal range for a TCP port. -->
   <xs:simpleType name="tcpPort">
     <xs:restriction base="xs:int">

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
new file mode 100644
index 0000000..da9be78
--- /dev/null
+++ 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkGeneratorTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent.LogEventType;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorkGenerator}.
+ */
+public class LogEventWorkGeneratorTest {
+
+    @Test
+    public void shutdownSignalKillsThread() {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the LogEventWorkGenerator work.
+        final LogEventWorkGenerator generator = new 
LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created 
change log.
+        final Thread notifyThread = new Thread(() -> {
+            generator.notifyCreate("rya", mock(QueryChangeLog.class));
+        });
+
+        // Fill the queue so that nothing may be offered to it.
+        queue.offer(LogEvent.delete("rya"));
+
+        // Start the thread and show that it is still alive after the offer 
period.
+        notifyThread.start();
+        assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+        // Set the shutdown signal to true and join the thread. If we were 
able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+    }
+
+    @Test
+    public void notifyCreate() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the LogEventWorkGenerator work.
+        final LogEventWorkGenerator generator = new 
LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created 
change log.
+        final CountDownLatch notified = new CountDownLatch(1);
+        final Thread notifyThread = new Thread(() -> {
+            generator.notifyCreate("rya", mock(QueryChangeLog.class));
+            notified.countDown();
+        });
+
+        try {
+            // Start the thread that performs the notification.
+            notifyThread.start();
+
+            // Wait for the thread to indicate it has notified and check the 
queue for the value.
+            notified.await(200, TimeUnit.MILLISECONDS);
+            final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+            assertEquals(LogEventType.CREATE, event.getEventType());
+            assertEquals("rya", event.getRyaInstanceName());
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyDelete() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the LogEventWorkGenerator work.
+        final LogEventWorkGenerator generator = new 
LogEventWorkGenerator(queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a deleted 
change log.
+        final CountDownLatch notified = new CountDownLatch(1);
+        final Thread notifyThread = new Thread(() -> {
+            generator.notifyDelete("rya");
+            notified.countDown();
+        });
+
+        try {
+            // Start the thread that performs the notification.
+            notifyThread.start();
+
+            // Wait for the thread to indicate it has notified and check the 
queue for the value.
+            notified.await(200, TimeUnit.MILLISECONDS);
+            final LogEvent event = queue.poll(200, TimeUnit.MILLISECONDS);
+            assertEquals(LogEventType.DELETE, event.getEventType());
+            assertEquals("rya", event.getRyaInstanceName());
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
new file mode 100644
index 0000000..cb708ed
--- /dev/null
+++ 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/LogEventWorkerTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.querymanager.QueryManager.LogEvent;
+import org.apache.rya.streams.querymanager.QueryManager.LogEventWorker;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link LogEventWorker}.
+ */
+public class LogEventWorkerTest {
+
+    @Test
+    public void shutdownSignalKillsThread() {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The thread that will perform the LogEventWorker task.
+        final Thread logEventWorker = new Thread(new LogEventWorker(new 
ArrayBlockingQueue<>(1),
+                new ArrayBlockingQueue<>(1), 50, TimeUnit.MILLISECONDS, 
shutdownSignal));
+        logEventWorker.start();
+
+        // Wait longer than the poll time to see if the thread died. Show that 
it is still running.
+        assertTrue(ThreadUtil.stillAlive(logEventWorker, 200));
+
+        // Set the shutdown signal to true and join the thread. If we were 
able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse(ThreadUtil.stillAlive(logEventWorker, 500));
+    }
+
+    @Test
+    public void nofity_logCreated_doesNotExist() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new 
ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new 
ArrayBlockingQueue<>(10);
+
+        // The Query Change Log that will be watched.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+        // Write a message that indicates a new query should be active.
+        final UUID firstQueryId = UUID.randomUUID();
+        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a 
?b ?c . }", true));
+
+        // Write a message that adds an active query, but then makes it 
inactive. Because both of these
+        // events are written to the log before the worker subscribes to the 
repository for updates, they
+        // must result in a single query stopped event.
+        final UUID secondQueryId = UUID.randomUUID();
+        changeLog.write(QueryChange.create(secondQueryId, "select * where { ?d 
?e ?f . }", true));
+        changeLog.write(QueryChange.update(secondQueryId, false));
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new 
LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was created.
+            final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+            logEventQueue.offer(createLogEvent);
+
+            // We must see the following Query Events added to the work queue.
+            // Query 1, executing.
+            // Query 2, stopped.
+            Set<QueryEvent> expectedEvents = new HashSet<>();
+            expectedEvents.add(QueryEvent.executing("rya",
+                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c 
. }", true)));
+            expectedEvents.add(QueryEvent.stopped("rya", secondQueryId));
+
+            Set<QueryEvent> queryEvents = new HashSet<>();
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) 
);
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) 
);
+
+            assertEquals(expectedEvents, queryEvents);
+
+            // Write an event to the change log that stops the first query.
+            changeLog.write(QueryChange.update(firstQueryId, false));
+
+            // Show it was also reflected in the changes.
+            // Query 1, stopped.
+            expectedEvents = new HashSet<>();
+            expectedEvents.add(QueryEvent.stopped("rya", firstQueryId));
+
+            queryEvents = new HashSet<>();
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) 
);
+
+            assertEquals(expectedEvents, queryEvents);
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+
+    @Test
+    public void nofity_logCreated_exists() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new 
ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new 
ArrayBlockingQueue<>(10);
+
+        // The Query Change Log that will be watched.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+        // Write a message that indicates a new query should be active.
+        final UUID firstQueryId = UUID.randomUUID();
+        changeLog.write(QueryChange.create(firstQueryId, "select * where { ?a 
?b ?c . }", true));
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new 
LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was created.
+            final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+            logEventQueue.offer(createLogEvent);
+
+            // Say the same log was created a second time.
+            logEventQueue.offer(createLogEvent);
+
+            // Show that only a single unit of work was added for the log. 
This indicates the
+            // second message was effectively skipped as it would have add its 
work added twice otherwise.
+            final Set<QueryEvent> expectedEvents = new HashSet<>();
+            expectedEvents.add(QueryEvent.executing("rya",
+                    new StreamsQuery(firstQueryId, "select * where { ?a ?b ?c 
. }", true)));
+
+            final Set<QueryEvent> queryEvents = new HashSet<>();
+            queryEvents.add( queryEventQueue.poll(500, TimeUnit.MILLISECONDS) 
);
+
+            assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+            assertEquals(expectedEvents, queryEvents);
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+
+    @Test
+    public void notify_logDeleted_exists() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new 
ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new 
ArrayBlockingQueue<>(10);
+
+        // The Query Change Log that will be watched.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new 
LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was created.
+            final LogEvent createLogEvent = LogEvent.create("rya", changeLog);
+            logEventQueue.offer(createLogEvent);
+
+            // Write a unit of work that indicates a log was deleted.
+            logEventQueue.offer(LogEvent.delete("rya"));
+
+            // Show that a single unit of work was created for deleting 
everything for "rya".
+            assertEquals(QueryEvent.stopALL("rya"),  queryEventQueue.poll(500, 
TimeUnit.MILLISECONDS));
+            assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+
+    @Test
+    public void notify_logDeleted_doesNotExist() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to feed work.
+        final BlockingQueue<LogEvent> logEventQueue = new 
ArrayBlockingQueue<>(10);
+
+        // The queue work is written to.
+        final BlockingQueue<QueryEvent> queryEventQueue = new 
ArrayBlockingQueue<>(10);
+
+        // Start the worker that will be tested.
+        final Thread logEventWorker = new Thread(new 
LogEventWorker(logEventQueue,
+                queryEventQueue, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        logEventWorker.start();
+
+        try {
+            // Write a unit of work that indicates a log was deleted. Since it 
was never created,
+            // this will not cause anything to be written to the QueryEvent 
queue.
+            logEventQueue.offer(LogEvent.delete("rya"));
+
+            // Show that a single unit of work was created for deleting 
everything for "rya".
+            assertNull(queryEventQueue.poll(500, TimeUnit.MILLISECONDS));
+        } finally {
+            shutdownSignal.set(true);
+            logEventWorker.join();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
new file mode 100644
index 0000000..4495e19
--- /dev/null
+++ 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkGeneratorTest.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import 
org.apache.rya.streams.querymanager.QueryManager.QueryEventWorkGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryEventWorkGenerator}.
+ */
+public class QueryEventWorkGeneratorTest {
+
+    @Test
+    public void shutdownSignalKillsThread() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", new CountDownLatch(1), 
queue, 50, TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created 
query.
+        final Thread notifyThread = new Thread(() -> {
+            generator.notify(mock(ChangeLogEntry.class), Optional.empty());
+        });
+
+        // Fill the queue so that nothing may be offered to it.
+        queue.offer(QueryEvent.stopALL("rya"));
+
+        // Start the thread and show that it is still alive after the offer 
period.
+        notifyThread.start();
+        assertTrue( ThreadUtil.stillAlive(notifyThread, 200) );
+
+        // Set the shutdown signal to true and join the thread. If we were 
able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse( ThreadUtil.stillAlive(notifyThread, 1000) );
+    }
+
+    @Test
+    public void waitsForSubscriptionWork() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, 
TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created 
query.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.create(queryId, 
query.getSparql(), query.isActive());
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, 
change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Wait longer than the blocking period and show the thread is 
still alive and nothing has been added
+            // to the work queue.
+            Thread.sleep(150);
+            assertTrue( notifyThread.isAlive() );
+
+            // Count down the latch.
+            latch.countDown();
+
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.executing("rya", new 
StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyCreate() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, 
TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a created 
query.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.create(queryId, 
query.getSparql(), query.isActive());
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, 
change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.executing("rya", new 
StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyDelete() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, 
TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with a deleted 
query.
+        final UUID queryId = UUID.randomUUID();
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.delete(queryId);
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, 
change);
+            generator.notify(entry, Optional.empty());
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.stopped("rya", queryId);
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyUpdate_isActive() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, 
TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with an update 
query change.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", true);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.update(queryId, true);
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, 
change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.executing("rya", new 
StreamsQuery(queryId, query.getSparql(), query.isActive()));
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+
+    @Test
+    public void notifyUpdate_isNotActive() throws Exception {
+        // The signal that will kill the notifying thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue generated work is offered to.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The listener that will perform the QueryEventWorkGenerator work.
+        final CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        final QueryEventWorkGenerator generator =
+                new QueryEventWorkGenerator("rya", latch, queue, 50, 
TimeUnit.MILLISECONDS, shutdownSignal);
+
+        // A thread that will attempt to notify the generator with an update 
query change.
+        final UUID queryId = UUID.randomUUID();
+        final StreamsQuery query = new StreamsQuery(queryId, "query", false);
+        final Thread notifyThread = new Thread(() -> {
+            final QueryChange change = QueryChange.update(queryId, false);
+            final ChangeLogEntry<QueryChange> entry = new ChangeLogEntry<>(0, 
change);
+            generator.notify(entry, Optional.of(query));
+        });
+
+        // Start the thread.
+        notifyThread.start();
+
+        try {
+            // Show work was added to the queue and the notifying thread died.
+            final QueryEvent event = queue.poll(500, TimeUnit.MILLISECONDS);
+            final QueryEvent expected = QueryEvent.stopped("rya", queryId);
+            assertEquals(expected, event);
+        } finally {
+            shutdownSignal.set(true);
+            notifyThread.join();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
new file mode 100644
index 0000000..33c0719
--- /dev/null
+++ 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryEventWorkerTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEvent;
+import org.apache.rya.streams.querymanager.QueryManager.QueryEventWorker;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link QueryManager.QueryEventWorker}.
+ */
+public class QueryEventWorkerTest {
+
+    @Test
+    public void shutdownSignalKillsThread() {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The thread that will perform the QueryEventWorker task.
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(new 
ArrayBlockingQueue<>(1),
+                mock(QueryExecutor.class), 50, TimeUnit.MILLISECONDS, 
shutdownSignal));
+        queryEventWorker.start();
+
+        // Wait longer than the poll time to see if the thread died. Show that 
it is still running.
+        assertTrue(ThreadUtil.stillAlive(queryEventWorker, 200));
+
+        // Set the shutdown signal to true and join the thread. If we were 
able to join, then it shut down.
+        shutdownSignal.set(true);
+        assertFalse(ThreadUtil.stillAlive(queryEventWorker, 1000));
+    }
+
+    @Test
+    public void executingWork() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to send the execute work to the thread.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The message that indicates a query needs to be executed.
+        final String ryaInstance = "rya";
+        final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), 
"sparql", true);
+        final QueryEvent executingEvent = QueryEvent.executing(ryaInstance, 
query);
+
+        // Release a latch if the startQuery method on the queryExecutor is 
invoked with the correct values.
+        final CountDownLatch startQueryInvoked = new CountDownLatch(1);
+        final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+        doAnswer(invocation -> {
+            startQueryInvoked.countDown();
+            return null;
+        }).when(queryExecutor).startQuery(ryaInstance, query);
+
+        // The thread that will perform the QueryEventWorker task.
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+                queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        try {
+            queryEventWorker.start();
+
+            // Provide a message indicating a query needs to be executing.
+            queue.put(executingEvent);
+
+            // Verify the Query Executor was told to start the query.
+            assertTrue( startQueryInvoked.await(150, TimeUnit.MILLISECONDS) );
+        } finally {
+            shutdownSignal.set(true);
+            queryEventWorker.join();
+        }
+    }
+
+    @Test
+    public void stoppedWork() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to send the execute work to the thread.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The message that indicates a query needs to be stopped.
+        final UUID queryId = UUID.randomUUID();
+        final QueryEvent stoppedEvent = QueryEvent.stopped("rya", queryId);
+
+        // Release a latch if the stopQuery method on the queryExecutor is 
invoked with the correct values.
+        final CountDownLatch stopQueryInvoked = new CountDownLatch(1);
+        final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+        doAnswer(invocation -> {
+            stopQueryInvoked.countDown();
+            return null;
+        }).when(queryExecutor).stopQuery(queryId);
+
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+                queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        try {
+            // The thread that will perform the QueryEventWorker task.
+            queryEventWorker.start();
+
+            // Provide a message indicating a query needs to be executing.
+            queue.put(stoppedEvent);
+
+            // Verify the Query Executor was told to stop the query.
+            assertTrue( stopQueryInvoked.await(150, TimeUnit.MILLISECONDS) );
+        } finally {
+            shutdownSignal.set(true);
+            queryEventWorker.join();
+        }
+    }
+
+    @Test
+    public void stopAllWork() throws Exception {
+        // The signal that will kill the working thread.
+        final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+        // The queue used to send the execute work to the thread.
+        final BlockingQueue<QueryEvent> queue = new ArrayBlockingQueue<>(1);
+
+        // The message that indicates all queries for a rya instance need to 
be stopped.
+        final String ryaInstance = "rya";
+        final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
+
+        // Release a latch if the stopQuery method on the queryExecutor is 
invoked with the correct values.
+        final CountDownLatch testMethodInvoked = new CountDownLatch(1);
+        final QueryExecutor queryExecutor = mock(QueryExecutor.class);
+        doAnswer(invocation -> {
+            testMethodInvoked.countDown();
+            return null;
+        }).when(queryExecutor).stopAll(ryaInstance);
+
+        final Thread queryEventWorker = new Thread(new QueryEventWorker(queue,
+                queryExecutor, 50, TimeUnit.MILLISECONDS, shutdownSignal));
+        try {
+            // The thread that will perform the QueryEventWorker task.
+            queryEventWorker.start();
+
+            // Provide a message indicating a query needs to be executing.
+            queue.put(stopAllEvent);
+
+            // Verify the Query Executor was told to stop all the queries.
+            assertTrue( testMethodInvoked.await(150, TimeUnit.MILLISECONDS) );
+        } finally {
+            shutdownSignal.set(true);
+            queryEventWorker.join();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
index a1203a0..04e70c0 100644
--- 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
+++ 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -1,18 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.rya.streams.querymanager;
 
@@ -34,13 +36,10 @@ import org.apache.rya.streams.api.queries.QueryChangeLog;
 import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
 import org.junit.Test;
 
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
-
 /**
- * Test for the {@link QueryManager}
+ * Unit tests the methods of {@link QueryManager}.
  */
 public class QueryManagerTest {
-    private static final Scheduler TEST_SCHEDULER = 
Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS);
 
     /**
      * Tests when the query manager is notified to create a new query, the 
query
@@ -74,7 +73,7 @@ public class QueryManagerTest {
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
-        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        final QueryManager qm = new QueryManager(qe, source, 50, 
TimeUnit.MILLISECONDS);
         try {
             qm.startAndWait();
             queryStarted.await(5, TimeUnit.SECONDS);
@@ -128,7 +127,7 @@ public class QueryManagerTest {
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
-        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        final QueryManager qm = new QueryManager(qe, source, 50, 
TimeUnit.MILLISECONDS);
         try {
             qm.startAndWait();
             queryDeleted.await(5, TimeUnit.SECONDS);
@@ -183,7 +182,7 @@ public class QueryManagerTest {
             return null;
         }).when(source).subscribe(any(SourceListener.class));
 
-        final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER);
+        final QueryManager qm = new QueryManager(qe, source, 50, 
TimeUnit.MILLISECONDS);
         try {
             qm.startAndWait();
             queryDeleted.await(10, TimeUnit.SECONDS);
@@ -192,4 +191,4 @@ public class QueryManagerTest {
             qm.stopAndWait();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
new file mode 100644
index 0000000..9896e31
--- /dev/null
+++ 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Utilities that are useful for interacting with {@link Thread}s while 
testing.
+ */
+public class ThreadUtil {
+
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private ThreadUtil() { }
+
+    /**
+     * A utility function that returns whether a thread is alive or not after 
waiting
+     * some specified period of time to join it.
+     *
+     * @param thread - The thread that will be joined. (not null)
+     * @param millis - How long to wait to join the thread.
+     * @return {@code true} if the thread is still alive, otherwise {@code 
false}.
+     */
+    public static boolean stillAlive(final Thread thread, final long millis) {
+        requireNonNull(thread);
+        try {
+            thread.join(millis);
+        } catch (final InterruptedException e) { }
+        return thread.isAlive();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index 3cbe894..f9c8a03 100644
--- 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -35,6 +35,7 @@ import org.apache.rya.streams.api.interactor.LoadStatements;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
@@ -119,9 +120,10 @@ public class LocalQueryExecutorIT {
         expected.add(new VisibilityBindingSet(bs, "a"));
 
         // Start the executor that will be tested.
+        final CreateKafkaTopic createKafkaTopic = new CreateKafkaTopic( 
kafka.getZookeeperServers() );
         final String kafkaServers = kafka.getKafkaHostname() + ":" + 
kafka.getKafkaPort();
         final KafkaStreamsFactory jobFactory = new 
SingleThreadKafkaStreamsFactory(kafkaServers);
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new 
LocalQueryExecutor(createKafkaTopic, jobFactory);
         executor.startAndWait();
         try {
             // Start the query.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
index 0df5794..c0f888e 100644
--- 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
+++ 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
 import org.apache.rya.streams.querymanager.QueryExecutor;
 import org.junit.Test;
 
@@ -44,7 +45,7 @@ public class LocalQueryExecutorTest {
 
     @Test(expected = IllegalStateException.class)
     public void startQuery_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new 
LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), 
mock(KafkaStreamsFactory.class));
         executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), 
"query", true));
     }
 
@@ -60,7 +61,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the query.
@@ -75,14 +76,14 @@ public class LocalQueryExecutorTest {
 
     @Test(expected = IllegalStateException.class)
     public void stopQuery_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new 
LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), 
mock(KafkaStreamsFactory.class));
         executor.stopQuery(UUID.randomUUID());
     }
 
     @Test
     public void stopQuery_queryNotRunning() throws Exception {
         // Start an executor.
-        final QueryExecutor executor = new 
LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), 
mock(KafkaStreamsFactory.class));
         executor.startAndWait();
         try {
             // Try to stop a query that was never stareted.
@@ -104,7 +105,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the query.
@@ -122,7 +123,7 @@ public class LocalQueryExecutorTest {
 
     @Test(expected = IllegalStateException.class)
     public void stopAll_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new 
LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), 
mock(KafkaStreamsFactory.class));
         executor.stopAll("rya");
     }
 
@@ -141,7 +142,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), 
eq(query2))).thenReturn(queryJob2);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the queries.
@@ -180,7 +181,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance2), 
eq(query2))).thenReturn(queryJob2);
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Tell the executor to start the queries.
@@ -205,14 +206,14 @@ public class LocalQueryExecutorTest {
 
     @Test(expected = IllegalStateException.class)
     public void getRunningQueryIds_serviceNotStarted() throws Exception {
-        final QueryExecutor executor = new 
LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), 
mock(KafkaStreamsFactory.class));
         executor.getRunningQueryIds();
     }
 
     @Test
     public void getRunningQueryIds_noneStarted() throws Exception {
         // Start an executor.
-        final QueryExecutor executor = new 
LocalQueryExecutor(mock(KafkaStreamsFactory.class));
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), 
mock(KafkaStreamsFactory.class));
         executor.startAndWait();
         try {
             // Get the list of running queries.
@@ -240,7 +241,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), 
eq(query3))).thenReturn(mock(KafkaStreams.class));
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Start the queries.
@@ -275,7 +276,7 @@ public class LocalQueryExecutorTest {
         when(jobFactory.make(eq(ryaInstance), 
eq(query3))).thenReturn(mock(KafkaStreams.class));
 
         // Start the executor that will be tested.
-        final QueryExecutor executor = new LocalQueryExecutor(jobFactory);
+        final QueryExecutor executor = new 
LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
         executor.startAndWait();
         try {
             // Start the queries.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
index f2b50ab..de6b9f3 100644
--- 
a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
+++ 
b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/xml/QueryManagerConfigMarshallerTest.java
@@ -45,6 +45,11 @@ public class QueryManagerConfigMarshallerTest {
                 "            <port>6</port>\n" +
                 "        </kafka>\n" +
                 "    </queryChangeLogSource>\n" +
+                "    <queryExecutor>\n" +
+                "        <localKafkaStreams>\n" +
+                "            <zookeepers>zoo1,zoo2,zoo3</zookeepers>\n" +
+                "        </localKafkaStreams>\n" +
+                "    </queryExecutor>\n" +
                 "    <performanceTunning>\n" +
                 "        <queryChanngeLogDiscoveryPeriod>\n" +
                 "            <value>1</value>\n" +

Reply via email to