Rya 452 Updated QueryRepository

Updated QueryRepository to be a Service
Updated InMemoryQueryRepository to be an AbstractScheduledService
Added listeners to InMemoryQueryRepository

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/36af1153
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/36af1153
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/36af1153

Branch: refs/heads/master
Commit: 36af1153758c943e08808b720adece86278de41f
Parents: eb07bf6
Author: Andrew Smith <smith...@gmail.com>
Authored: Tue Jan 23 15:20:50 2018 -0500
Committer: Valiyil <puja.vali...@parsons.com>
Committed: Fri Mar 9 12:59:37 2018 -0500

----------------------------------------------------------------------
 .../api/queries/InMemoryQueryRepository.java    |  90 ++++++++-
 .../api/queries/QueryChangeLogListener.java     |  41 ++++
 .../streams/api/queries/QueryRepository.java    |  38 +++-
 .../queries/InMemoryQueryRepositoryTest.java    | 194 ++++++++++++++-----
 .../streams/client/command/AddQueryCommand.java |   8 +-
 .../client/command/DeleteQueryCommand.java      |   7 +-
 .../client/command/ListQueriesCommand.java      |   7 +-
 .../streams/client/command/RunQueryCommand.java |   9 +-
 .../client/command/StreamResultsCommand.java    |   7 +-
 .../client/command/AddQueryCommandIT.java       |  11 +-
 .../client/command/DeleteQueryCommandIT.java    |  11 +-
 .../client/command/ListQueryCommandIT.java      |  11 +-
 .../client/command/RunQueryCommandIT.java       |   5 +-
 .../kafka/interactor/KafkaRunQueryIT.java       |   4 +-
 14 files changed, 349 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
index f4b7b25..dca040f 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
@@ -20,7 +20,9 @@ package org.apache.rya.streams.api.queries;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -33,6 +35,8 @@ import 
org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.AbstractScheduledService;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import info.aduna.iteration.CloseableIteration;
@@ -46,7 +50,7 @@ import info.aduna.iteration.CloseableIteration;
  * Thread safe.
  */
 @DefaultAnnotation(NonNull.class)
-public class InMemoryQueryRepository implements QueryRepository {
+public class InMemoryQueryRepository extends AbstractScheduledService 
implements QueryRepository {
     private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryQueryRepository.class);
 
     private final ReentrantLock lock = new ReentrantLock();
@@ -67,20 +71,34 @@ public class InMemoryQueryRepository implements 
QueryRepository {
     private final Map<UUID, StreamsQuery> queriesCache = new HashMap<>();
 
     /**
+     * The listeners to be notified when new QueryChangeLogs come in.
+     */
+    private final List<QueryChangeLogListener> listeners = new ArrayList<>();
+
+    /**
+     * The {@link Scheduler} the repository uses to periodically poll for 
query updates.
+     */
+    private final Scheduler scheduler;
+
+    /**
      * Constructs an instance of {@link InMemoryQueryRepository}.
      *
      * @param changeLog - The change log that this repository will maintain 
and be based on. (not null)
+     * @param scheduler - The {@link Scheduler} this service uses to 
periodically check for query updates. (not null)
      */
-    public InMemoryQueryRepository(final QueryChangeLog changeLog) {
+    public InMemoryQueryRepository(final QueryChangeLog changeLog, final 
Scheduler scheduler) {
         this.changeLog = requireNonNull(changeLog);
+        this.scheduler = requireNonNull(scheduler);
     }
 
     @Override
-    public StreamsQuery add(final String query, final boolean isActive) throws 
QueryRepositoryException {
+    public StreamsQuery add(final String query, final boolean isActive)
+            throws QueryRepositoryException, IllegalStateException {
         requireNonNull(query);
 
         lock.lock();
         try {
+            checkState();
             // First record the change to the log.
             final UUID queryId = UUID.randomUUID();
             final QueryChange change = QueryChange.create(queryId, query, 
isActive);
@@ -100,11 +118,12 @@ public class InMemoryQueryRepository implements 
QueryRepository {
     }
 
     @Override
-    public Optional<StreamsQuery> get(final UUID queryId) throws 
QueryRepositoryException {
+    public Optional<StreamsQuery> get(final UUID queryId) throws 
QueryRepositoryException, IllegalStateException {
         requireNonNull(queryId);
 
         lock.lock();
         try {
+            checkState();
             // Update the cache to represent what is currently in the log.
             updateCache();
 
@@ -115,11 +134,13 @@ public class InMemoryQueryRepository implements 
QueryRepository {
     }
 
     @Override
-    public void updateIsActive(final UUID queryId, final boolean isActive) 
throws QueryRepositoryException {
+    public void updateIsActive(final UUID queryId, final boolean isActive)
+            throws QueryRepositoryException, IllegalStateException {
         requireNonNull(queryId);
 
         lock.lock();
         try {
+            checkState();
             // Update the cache to represent what is currently in the log.
             updateCache();
 
@@ -140,11 +161,12 @@ public class InMemoryQueryRepository implements 
QueryRepository {
     }
 
     @Override
-    public void delete(final UUID queryId) throws QueryRepositoryException {
+    public void delete(final UUID queryId) throws QueryRepositoryException, 
IllegalStateException {
         requireNonNull(queryId);
 
         lock.lock();
         try {
+            checkState();
             // First record the change to the log.
             final QueryChange change = QueryChange.delete(queryId);
             changeLog.write(change);
@@ -157,9 +179,10 @@ public class InMemoryQueryRepository implements 
QueryRepository {
     }
 
     @Override
-    public Set<StreamsQuery> list() throws QueryRepositoryException {
+    public Set<StreamsQuery> list() throws QueryRepositoryException, 
IllegalStateException {
         lock.lock();
         try {
+            checkState();
             // Update the cache to represent what is currently in the log.
             updateCache();
 
@@ -174,7 +197,8 @@ public class InMemoryQueryRepository implements 
QueryRepository {
     }
 
     @Override
-    public void close() throws Exception {
+    protected void shutDown() throws Exception {
+        super.shutDown();
         lock.lock();
         try {
             changeLog.close();
@@ -229,6 +253,8 @@ public class InMemoryQueryRepository implements 
QueryRepository {
                         break;
                 }
 
+                listeners.forEach(listener -> listener.notify(entry));
+
                 cachePosition = Optional.of( entry.getPosition() );
             }
 
@@ -247,4 +273,52 @@ public class InMemoryQueryRepository implements 
QueryRepository {
             }
         }
     }
+
+    @Override
+    protected void runOneIteration() throws Exception {
+        lock.lock();
+        try {
+            updateCache();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    protected Scheduler scheduler() {
+        return scheduler;
+    }
+
+    @Override
+    public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) {
+        //locks to prevent the current state from changing while subscribing.
+        lock.lock();
+        try {
+            listeners.add(listener);
+
+            //return the current state of the query repository
+            return queriesCache.values()
+                    .stream()
+                    .collect(Collectors.toSet());
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void unsubscribe(final QueryChangeLogListener listener) {
+        lock.lock();
+        try {
+            listeners.remove(listener);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void checkState() {
+        if (!super.isRunning() && !listeners.isEmpty()) {
+            throw new IllegalStateException(
+                    "The Query Repository is subscribed to, but the service has not been started.");
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
new file mode 100644
index 0000000..2b61227
--- /dev/null
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.queries;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Listener to be notified when {@link QueryChange}s occur on a {@link QueryChangeLog}.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface QueryChangeLogListener {
+    /**
+     * Notifies the listener that a query change event has occurred in the change log.
+     * <p>
+     * <b>Note:</b>
+     * <p>
+     * The QueryRepository blocks when notifying this listener.  Long lasting operations
+     * should not be performed within this function.  Doing so will block all operations
+     * on the repository.
+     *
+     * @param queryChangeEvent - The event that occurred. (not null)
+     */
+    public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
index fd51b2f..4d8b2db 100644
--- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
+++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java
@@ -25,14 +25,20 @@ import java.util.UUID;
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 
+import com.google.common.util.concurrent.Service;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Repository for adding, deleting, and listing active queries in Rya Streams.
+ *
+ * This service only needs to be started if it is being subscribed to. An
+ * {@link IllegalStateException} will be thrown if the service is subscribed to
+ * and used without being started.
  */
 @DefaultAnnotation(NonNull.class)
-public interface QueryRepository extends AutoCloseable {
+public interface QueryRepository extends Service {
 
     /**
      * Adds a new query to Rya Streams.
@@ -42,8 +48,9 @@ public interface QueryRepository extends AutoCloseable {
      *   otherwise {@code false}.
      * @return The {@link StreamsQuery} used in Rya Streams.
      * @throws QueryRepositoryException Could not add the query.
+     * @throws IllegalStateException The Service has not been started, but has 
been subscribed to.
      */
-    public StreamsQuery add(final String query, boolean isActive) throws 
QueryRepositoryException;
+    public StreamsQuery add(final String query, boolean isActive) throws 
QueryRepositoryException, IllegalStateException;
 
     /**
      * Updates the isActive state of a {@link StreamsQuery}. Setting this 
value to {@code true}
@@ -53,8 +60,9 @@ public interface QueryRepository extends AutoCloseable {
      * @param queryId - Identifies which query will be updated. (not null)
      * @param isActive - The new isActive state for the query.
      * @throws QueryRepositoryException If the query does not exist or 
something else caused the change to fail.
+     * @throws IllegalStateException The Service has not been started, but has 
been subscribed to.
      */
-    public void updateIsActive(UUID queryId, boolean isActive) throws 
QueryRepositoryException;
+    public void updateIsActive(UUID queryId, boolean isActive) throws 
QueryRepositoryException, IllegalStateException;
 
     /**
      * Get an existing query from Rya Streams.
@@ -62,24 +70,42 @@ public interface QueryRepository extends AutoCloseable {
      * @param queryId - Identifies which query will be fetched.
      * @return the {@link StreamsQuery} for the id if one exists; otherwise 
empty.
      * @throws QueryRepositoryException The query could not be fetched.
+     * @throws IllegalStateException The Service has not been started, but has 
been subscribed to.
      */
-    public Optional<StreamsQuery> get(UUID queryId) throws 
QueryRepositoryException;
+    public Optional<StreamsQuery> get(UUID queryId) throws 
QueryRepositoryException, IllegalStateException;
 
     /**
      * Removes an existing query from Rya Streams.
      *
      * @param queryID - The {@link UUID} of the query to remove. (not null)
      * @throws QueryRepositoryException Could not delete the query.
+     * @throws IllegalStateException The Service has not been started, but has 
been subscribed to.
      */
-    public void delete(UUID queryID) throws QueryRepositoryException;
+    public void delete(UUID queryID) throws QueryRepositoryException, 
IllegalStateException;
 
     /**
      * Lists all existing queries in Rya Streams.
      *
      * @return - A List of the current {@link StreamsQuery}s
      * @throws QueryRepositoryException The {@link StreamsQuery}s could not be 
listed.
+     * @throws IllegalStateException The Service has not been started, but has 
been subscribed to.
+     */
+    public Set<StreamsQuery> list() throws QueryRepositoryException, 
IllegalStateException;
+
+    /**
+     * Subscribes a {@link QueryChangeLogListener} to the {@link 
QueryRepository}.
+     *
+     * @param listener - The {@link QueryChangeLogListener} to subscribe to 
this {@link QueryRepository}. (not null)
+     * @return The current state of the repository in the form of {@link 
StreamsQuery}s.
+     */
+    public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener);
+
+    /**
+     * Unsubscribe a {@link QueryChangeLogListener} from the {@link 
QueryRepository}.
+     *
+     * @param listener - The {@link QueryChangeLogListener} to unsubscribe 
from this {@link QueryRepository}. (not null)
      */
-    public Set<StreamsQuery> list() throws QueryRepositoryException;
+    public void unsubscribe(final QueryChangeLogListener listener);
 
     /**
      * A function of {@link QueryRepository} was unable to perform a function.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 22e616d..76c3216 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -20,6 +20,7 @@ package org.apache.rya.streams.api.queries;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -27,56 +28,62 @@ import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import 
org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
 /**
  * Unit tests the methods of {@link InMemoryQueryRepository}.
  */
 public class InMemoryQueryRepositoryTest {
+    private static final Scheduler SCHEDULE = 
Scheduler.newFixedRateSchedule(0L, 100, TimeUnit.MILLISECONDS);
 
     @Test
     public void canReadAddedQueries() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog() )) {
-            // Add some queries to it.
-            final Set<StreamsQuery> expected = new HashSet<>();
-            expected.add( queries.add("query 1", true) );
-            expected.add( queries.add("query 2", false) );
-            expected.add( queries.add("query 3", true) );
-
-            // Show they are in the list of all queries.
-            final Set<StreamsQuery> stored = queries.list();
-            assertEquals(expected, stored);
-        }
+        final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog(), SCHEDULE );
+        // Add some queries to it.
+        final Set<StreamsQuery> expected = new HashSet<>();
+        expected.add( queries.add("query 1", true) );
+        expected.add( queries.add("query 2", false) );
+        expected.add( queries.add("query 3", true) );
+
+        // Show they are in the list of all queries.
+        final Set<StreamsQuery> stored = queries.list();
+        assertEquals(expected, stored);
     }
 
     @Test
     public void deletedQueriesDisappear() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog() )) {
-            // Add some queries to it. The second one we will delete.
-            final Set<StreamsQuery> expected = new HashSet<>();
-            expected.add( queries.add("query 1", true) );
-            final UUID deletedMeId = queries.add("query 2", 
false).getQueryId();
-            expected.add( queries.add("query 3", true) );
-
-            // Delete the second query.
-            queries.delete( deletedMeId );
-
-            // Show only queries 1 and 3 are in the list.
-            final Set<StreamsQuery> stored = queries.list();
-            assertEquals(expected, stored);
-        }
+        final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog(), SCHEDULE );
+        // Add some queries to it. The second one we will delete.
+        final Set<StreamsQuery> expected = new HashSet<>();
+        expected.add( queries.add("query 1", true) );
+        final UUID deletedMeId = queries.add("query 2", false).getQueryId();
+        expected.add( queries.add("query 3", true) );
+
+        // Delete the second query.
+        queries.delete( deletedMeId );
+
+        // Show only queries 1 and 3 are in the list.
+        final Set<StreamsQuery> stored = queries.list();
+        assertEquals(expected, stored);
     }
 
     @Test
     public void initializedWithPopulatedChangeLog() throws Exception {
         // Setup a totally in memory QueryRepository. Hold onto the change log 
so that we can use it again later.
         final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
-        try(final QueryRepository queries = new InMemoryQueryRepository( 
changeLog )) {
+        final QueryRepository queries = new InMemoryQueryRepository( 
changeLog, SCHEDULE );
+        try {
+            queries.startAndWait();
             // Add some queries and deletes to it.
             final Set<StreamsQuery> expected = new HashSet<>();
             expected.add( queries.add("query 1", true) );
@@ -85,11 +92,16 @@ public class InMemoryQueryRepositoryTest {
             queries.delete( deletedMeId );
 
             // Create a new totally in memory QueryRepository.
-            try(final QueryRepository initializedQueries = new 
InMemoryQueryRepository( changeLog )) {
+            final QueryRepository initializedQueries = new 
InMemoryQueryRepository( changeLog, SCHEDULE );
+            try {
                 // Listing the queries should work using an initialized change 
log.
                 final Set<StreamsQuery> stored = initializedQueries.list();
                 assertEquals(expected, stored);
+            } finally {
+                queries.stop();
             }
+        } finally {
+            queries.stop();
         }
     }
 
@@ -100,50 +112,132 @@ public class InMemoryQueryRepositoryTest {
         when(changeLog.readFromStart()).thenThrow(new 
QueryChangeLogException("Mocked exception."));
 
         // Create the QueryRepository and invoke one of the methods.
-        try(final QueryRepository queries = new InMemoryQueryRepository( 
changeLog )) {
-            queries.list();
-        }
+        final QueryRepository queries = new InMemoryQueryRepository( 
changeLog, SCHEDULE );
+        queries.list();
     }
 
     @Test
     public void get_present() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog() )) {
-            // Add a query to it.
-            final StreamsQuery query = queries.add("query 1", true);
+        final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog(), SCHEDULE );
+        // Add a query to it.
+        final StreamsQuery query = queries.add("query 1", true);
 
-            // Show the fetched query matches the expected ones.
-            final Optional<StreamsQuery> fetched = 
queries.get(query.getQueryId());
-            assertEquals(query, fetched.get());
-        }
+        // Show the fetched query matches the expected ones.
+        final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+        assertEquals(query, fetched.get());
     }
 
     @Test
     public void get_notPresent() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog() )) {
-            // Fetch a query that was never added to the repository.
-            final Optional<StreamsQuery> query = 
queries.get(UUID.randomUUID());
+        final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog(), SCHEDULE );
+        // Fetch a query that was never added to the repository.
+        final Optional<StreamsQuery> query = queries.get(UUID.randomUUID());
 
-            // Show it could not be found.
-            assertFalse(query.isPresent());
-        }
+        // Show it could not be found.
+        assertFalse(query.isPresent());
     }
 
     @Test
     public void update() throws Exception {
         // Setup a totally in memory QueryRepository.
-        try(final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog() )) {
+        final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog(), SCHEDULE );
+        // Add a query to it.
+        final StreamsQuery query = queries.add("query 1", true);
+
+        // Change the isActive state of that query.
+        queries.updateIsActive(query.getQueryId(), false);
+
+        // Show the fetched query matches the expected one.
+        final Optional<StreamsQuery> fetched = queries.get(query.getQueryId());
+        final StreamsQuery expected = new StreamsQuery(query.getQueryId(), 
query.getSparql(), false);
+        assertEquals(expected, fetched.get());
+    }
+
+    @Test
+    public void updateListenerNotify() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog(), SCHEDULE );
+        try {
+            queries.startAndWait();
+
             // Add a query to it.
             final StreamsQuery query = queries.add("query 1", true);
 
-            // Change the isActive state of that query.
-            queries.updateIsActive(query.getQueryId(), false);
+            final Set<StreamsQuery> existing = queries.subscribe(new 
QueryChangeLogListener() {
+                @Override
+                public void notify(final ChangeLogEntry<QueryChange> 
queryChangeEvent) {
+                    final ChangeLogEntry<QueryChange> expected = new 
ChangeLogEntry<QueryChange>(1L,
+                            
QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                    assertEquals(expected, queryChangeEvent);
+                }
+            });
+
+            assertEquals(Sets.newHashSet(query), existing);
+
+            queries.add("query 2", true);
+        } finally {
+            queries.stop();
+        }
+    }
 
-            // Show the fetched query matches the expected one.
-            final Optional<StreamsQuery> fetched = 
queries.get(query.getQueryId());
-            final StreamsQuery expected = new StreamsQuery(query.getQueryId(), 
query.getSparql(), false);
-            assertEquals(expected, fetched.get());
+    @Test
+    public void updateListenerNotify_multiClient() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
+        final QueryRepository queries = new InMemoryQueryRepository( 
changeLog, SCHEDULE );
+        final QueryRepository queries2 = new InMemoryQueryRepository( 
changeLog, SCHEDULE );
+
+        try {
+            queries.startAndWait();
+            queries2.startAndWait();
+
+            //show listener on repo that query was added to is being notified 
of the new query.
+            final CountDownLatch repo1Latch = new CountDownLatch(1);
+            queries.subscribe(new QueryChangeLogListener() {
+                @Override
+                public void notify(final ChangeLogEntry<QueryChange> 
queryChangeEvent) {
+                    final ChangeLogEntry<QueryChange> expected = new 
ChangeLogEntry<QueryChange>(0L,
+                            
QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                    assertEquals(expected, queryChangeEvent);
+                    repo1Latch.countDown();
+                }
+            });
+
+            //show listener not on the repo that query was added to is being 
notified as well.
+            final CountDownLatch repo2Latch = new CountDownLatch(1);
+            queries2.subscribe(new QueryChangeLogListener() {
+                @Override
+                public void notify(final ChangeLogEntry<QueryChange> 
queryChangeEvent) {
+                    final ChangeLogEntry<QueryChange> expected = new 
ChangeLogEntry<QueryChange>(0L,
+                            
QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true));
+                    assertEquals(expected, queryChangeEvent);
+                    repo2Latch.countDown();
+                }
+            });
+
+            queries.add("query 2", true);
+
+            assertTrue(repo1Latch.await(5, TimeUnit.SECONDS));
+            assertTrue(repo2Latch.await(5, TimeUnit.SECONDS));
+        } catch(final InterruptedException e ) {
+            System.out.println("PING");
+        } finally {
+            queries.stop();
+            queries2.stop();
         }
     }
+
+    @Test(expected = IllegalStateException.class)
+    public void subscribe_notStarted() throws Exception {
+        // Setup a totally in memory QueryRepository.
+        final QueryRepository queries = new InMemoryQueryRepository(new 
InMemoryQueryChangeLog(), SCHEDULE);
+        queries.subscribe(new QueryChangeLogListener() {
+            @Override
+            public void notify(final ChangeLogEntry<QueryChange> 
queryChangeEvent) {}
+        });
+
+        queries.add("query 2", true);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
index 275a975..9273c33 100644
--- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
+++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java
@@ -20,6 +20,8 @@ package org.apache.rya.streams.client.command;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.AddQuery;
@@ -35,6 +37,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -115,8 +118,11 @@ public class AddQueryCommand implements RyaStreamsCommand {
         final String topic = 
KafkaTopics.queryChangeLogTopic(params.ryaInstance);
         final QueryChangeLog queryChangeLog = 
KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
+        //The AddQuery command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, 
TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog, scheduler);
         // Execute the add query command.
-        try(QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog)) {
+        try {
             final AddQuery addQuery = new DefaultAddQuery(queryRepo);
             try {
                 final StreamsQuery query = addQuery.addQuery(params.query, 
Boolean.parseBoolean(params.isActive));

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
index 2aeb90c..0d96df0 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java
@@ -21,6 +21,7 @@ package org.apache.rya.streams.client.command;
 import static java.util.Objects.requireNonNull;
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rya.streams.api.exception.RyaStreamsException;
 import org.apache.rya.streams.api.interactor.DeleteQuery;
@@ -36,6 +37,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -113,8 +115,11 @@ public class DeleteQueryCommand implements 
RyaStreamsCommand {
         final String topic = 
KafkaTopics.queryChangeLogTopic(params.ryaInstance);
         final QueryChangeLog queryChangeLog = 
KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
+        //The DeleteQuery command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, 
TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog, scheduler);
         // Execute the delete query command.
-        try(QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog)) {
+        try {
             final DeleteQuery deleteQuery = new DefaultDeleteQuery(queryRepo);
             try {
                 deleteQuery.delete(UUID.fromString(params.queryId));

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
index 670007b..cd78975 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
@@ -21,6 +21,7 @@ package org.apache.rya.streams.client.command;
 import static java.util.Objects.requireNonNull;
 
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
@@ -35,6 +36,7 @@ import 
org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.ParameterException;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -83,8 +85,11 @@ public class ListQueriesCommand implements RyaStreamsCommand 
{
         final String topic = 
KafkaTopics.queryChangeLogTopic(params.ryaInstance);
         final QueryChangeLog queryChangeLog = 
KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
+        //The ListQueries command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, 
TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog, scheduler);
         // Execute the list queries command.
-        try(QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog)) {
+        try {
             final ListQueries listQueries = new DefaultListQueries(queryRepo);
             try {
                 final Set<StreamsQuery> queries = listQueries.all();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
index 8f7f162..ddaf647 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.rya.streams.api.entity.StreamsQuery;
 import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
@@ -39,6 +40,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -117,8 +119,11 @@ public class RunQueryCommand implements RyaStreamsCommand {
         final String topic = 
KafkaTopics.queryChangeLogTopic(params.ryaInstance);
         final QueryChangeLog queryChangeLog = 
KafkaQueryChangeLogFactory.make(bootstrapServers, topic);
 
+        //The RunQuery command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, 
TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog, scheduler);
         // Look up the query to be executed from the change log.
-        try(QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog)) {
+        try {
             try {
                 final UUID queryId = UUID.fromString( params.queryId );
                 final Optional<StreamsQuery> query = queryRepo.get(queryId);
@@ -145,7 +150,7 @@ public class RunQueryCommand implements RyaStreamsCommand {
             } catch(final Exception e) {
                 throw new ExecutionException("Could not execute the Run Query 
command.", e);
             }
-        } catch(final ArgumentsException | ExecutionException e) {
+        } catch(final ExecutionException e) {
             // Rethrow the exceptions that are advertised by execute.
             throw e;
         } catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
index 7c548f1..3612dd0 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java
@@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull;
 
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.rya.streams.api.entity.QueryResultStream;
@@ -45,6 +46,7 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -132,9 +134,12 @@ public class StreamResultsCommand implements 
RyaStreamsCommand {
             throw new ArgumentsException("Invalid Query ID " + params.queryId);
         }
 
+        //The StreamResults command doesn't use the scheduled service feature.
+        final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, 
TimeUnit.SECONDS);
+        final QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog, scheduler);
         // Fetch the SPARQL of the query whose results will be streamed.
         final String sparql;
-        try(QueryRepository queryRepo = new 
InMemoryQueryRepository(queryChangeLog)) {
+        try {
             final Optional<StreamsQuery> sQuery = queryRepo.get(queryId);
             if(!sQuery.isPresent()) {
                 throw new ExecutionException("Could not read the results for 
query with ID " + queryId +

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
index 8b4f074..3bfbadc 100644
--- 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
+++ 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -38,11 +39,12 @@ import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.apache.rya.test.kafka.KafkaTestUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
 /**
  * integration Test for adding a new query through a command.
  */
@@ -64,12 +66,7 @@ public class AddQueryCommandIT {
         final Producer<?, QueryChange> queryProducer = 
KafkaTestUtil.makeProducer(kafka, StringSerializer.class, 
QueryChangeSerializer.class);
         final Consumer<?, QueryChange>queryConsumer = 
KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, 
QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new 
KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
-        queryRepo = new InMemoryQueryRepository(changeLog);
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        queryRepo.close();
+        queryRepo = new InMemoryQueryRepository(changeLog, 
Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
index 6083543..7bec080 100644
--- 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
+++ 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotEquals;
 
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -39,11 +40,12 @@ import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.apache.rya.test.kafka.KafkaTestUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
 /**
  * Integration Test for deleting a query from Rya Streams through a command.
  */
@@ -66,12 +68,7 @@ public class DeleteQueryCommandIT {
         final Producer<?, QueryChange> queryProducer = 
KafkaTestUtil.makeProducer(kafka, StringSerializer.class, 
QueryChangeSerializer.class);
         final Consumer<?, QueryChange>queryConsumer = 
KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, 
QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new 
KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
-        queryRepo = new InMemoryQueryRepository(changeLog);
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        queryRepo.close();
+        queryRepo = new InMemoryQueryRepository(changeLog, 
Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
index 1399142..f6ceb75 100644
--- 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
+++ 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java
@@ -19,6 +19,7 @@
 package org.apache.rya.streams.client.command;
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -34,11 +35,12 @@ import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.apache.rya.test.kafka.KafkaTestUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
 /**
  * integration Test for listing queries through a command.
  */
@@ -60,12 +62,7 @@ public class ListQueryCommandIT {
         final Producer<?, QueryChange> queryProducer = 
KafkaTestUtil.makeProducer(kafka, StringSerializer.class, 
QueryChangeSerializer.class);
         final Consumer<?, QueryChange>queryConsumer = 
KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, 
QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new 
KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
-        queryRepo = new InMemoryQueryRepository(changeLog);
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        queryRepo.close();
+        queryRepo = new InMemoryQueryRepository(changeLog, 
Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 3389d6b..7e3b8bc 100644
--- 
a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ 
b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -56,6 +57,7 @@ import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 /**
  * Integration tests the methods of {@link RunQueryCommand}.
@@ -81,7 +83,7 @@ public class RunQueryCommandIT {
         final Producer<?, QueryChange> queryProducer = 
KafkaTestUtil.makeProducer(kafka, StringSerializer.class, 
QueryChangeSerializer.class);
         final Consumer<?, QueryChange>queryConsumer = 
KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, 
QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new 
KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
-        queryRepo = new InMemoryQueryRepository(changeLog);
+        queryRepo = new InMemoryQueryRepository(changeLog, 
Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
 
         // Initialize the Statements Producer and the Results Consumer.
         stmtProducer = KafkaTestUtil.makeProducer(kafka, 
StringSerializer.class, VisibilityStatementSerializer.class);
@@ -92,7 +94,6 @@ public class RunQueryCommandIT {
     public void cleanup() throws Exception{
         stmtProducer.close();
         resultConsumer.close();
-        queryRepo.close();
     }
 
     @Test(expected = ExecutionException.class)

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
index 9a773f0..5dbd27f 100644
--- 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
@@ -52,6 +53,7 @@ import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
 
 /**
  * Integration tests the methods of {@link KafkaRunQuery}.
@@ -83,7 +85,7 @@ public class KafkaRunQueryIT {
         final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
 
         // This query is completely in memory, so it doesn't need to be closed.
-        final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog() );
+        final QueryRepository queries = new InMemoryQueryRepository( new 
InMemoryQueryChangeLog(), Scheduler.newFixedRateSchedule(0L, 5, 
TimeUnit.SECONDS) );
 
         // Add the query to the query repository.
         final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person 
<urn:worksAt> ?business . }", true);

Reply via email to