Rya 452: Updated QueryRepository. Updated QueryRepository to be a Service. Updated InMemoryQueryRepository to be an AbstractScheduledService. Added listeners to InMemoryQueryRepository.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/36af1153 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/36af1153 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/36af1153 Branch: refs/heads/master Commit: 36af1153758c943e08808b720adece86278de41f Parents: eb07bf6 Author: Andrew Smith <smith...@gmail.com> Authored: Tue Jan 23 15:20:50 2018 -0500 Committer: Valiyil <puja.vali...@parsons.com> Committed: Fri Mar 9 12:59:37 2018 -0500 ---------------------------------------------------------------------- .../api/queries/InMemoryQueryRepository.java | 90 ++++++++- .../api/queries/QueryChangeLogListener.java | 41 ++++ .../streams/api/queries/QueryRepository.java | 38 +++- .../queries/InMemoryQueryRepositoryTest.java | 194 ++++++++++++++----- .../streams/client/command/AddQueryCommand.java | 8 +- .../client/command/DeleteQueryCommand.java | 7 +- .../client/command/ListQueriesCommand.java | 7 +- .../streams/client/command/RunQueryCommand.java | 9 +- .../client/command/StreamResultsCommand.java | 7 +- .../client/command/AddQueryCommandIT.java | 11 +- .../client/command/DeleteQueryCommandIT.java | 11 +- .../client/command/ListQueryCommandIT.java | 11 +- .../client/command/RunQueryCommandIT.java | 5 +- .../kafka/interactor/KafkaRunQueryIT.java | 4 +- 14 files changed, 349 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java index f4b7b25..dca040f 100644 --- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java @@ -20,7 +20,9 @@ package org.apache.rya.streams.api.queries; import static java.util.Objects.requireNonNull; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -33,6 +35,8 @@ import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.AbstractScheduledService; + import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; import info.aduna.iteration.CloseableIteration; @@ -46,7 +50,7 @@ import info.aduna.iteration.CloseableIteration; * Thread safe. */ @DefaultAnnotation(NonNull.class) -public class InMemoryQueryRepository implements QueryRepository { +public class InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository { private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class); private final ReentrantLock lock = new ReentrantLock(); @@ -67,20 +71,34 @@ public class InMemoryQueryRepository implements QueryRepository { private final Map<UUID, StreamsQuery> queriesCache = new HashMap<>(); /** + * The listeners to be notified when new QueryChangeLogs come in. + */ + private final List<QueryChangeLogListener> listeners = new ArrayList<>(); + + /** + * The {@link Scheduler} the repository uses to periodically poll for query updates. + */ + private final Scheduler scheduler; + + /** * Constructs an instance of {@link InMemoryQueryRepository}. * * @param changeLog - The change log that this repository will maintain and be based on. (not null) + * @param scheduler - The {@link Scheduler} this service uses to periodically check for query updates. 
(not null) */ - public InMemoryQueryRepository(final QueryChangeLog changeLog) { + public InMemoryQueryRepository(final QueryChangeLog changeLog, final Scheduler scheduler) { this.changeLog = requireNonNull(changeLog); + this.scheduler = requireNonNull(scheduler); } @Override - public StreamsQuery add(final String query, final boolean isActive) throws QueryRepositoryException { + public StreamsQuery add(final String query, final boolean isActive) + throws QueryRepositoryException, IllegalStateException { requireNonNull(query); lock.lock(); try { + checkState(); // First record the change to the log. final UUID queryId = UUID.randomUUID(); final QueryChange change = QueryChange.create(queryId, query, isActive); @@ -100,11 +118,12 @@ public class InMemoryQueryRepository implements QueryRepository { } @Override - public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException { + public Optional<StreamsQuery> get(final UUID queryId) throws QueryRepositoryException, IllegalStateException { requireNonNull(queryId); lock.lock(); try { + checkState(); // Update the cache to represent what is currently in the log. updateCache(); @@ -115,11 +134,13 @@ public class InMemoryQueryRepository implements QueryRepository { } @Override - public void updateIsActive(final UUID queryId, final boolean isActive) throws QueryRepositoryException { + public void updateIsActive(final UUID queryId, final boolean isActive) + throws QueryRepositoryException, IllegalStateException { requireNonNull(queryId); lock.lock(); try { + checkState(); // Update the cache to represent what is currently in the log. 
updateCache(); @@ -140,11 +161,12 @@ public class InMemoryQueryRepository implements QueryRepository { } @Override - public void delete(final UUID queryId) throws QueryRepositoryException { + public void delete(final UUID queryId) throws QueryRepositoryException, IllegalStateException { requireNonNull(queryId); lock.lock(); try { + checkState(); // First record the change to the log. final QueryChange change = QueryChange.delete(queryId); changeLog.write(change); @@ -157,9 +179,10 @@ public class InMemoryQueryRepository implements QueryRepository { } @Override - public Set<StreamsQuery> list() throws QueryRepositoryException { + public Set<StreamsQuery> list() throws QueryRepositoryException, IllegalStateException { lock.lock(); try { + checkState(); // Update the cache to represent what is currently in the log. updateCache(); @@ -174,7 +197,8 @@ public class InMemoryQueryRepository implements QueryRepository { } @Override - public void close() throws Exception { + protected void shutDown() throws Exception { + super.shutDown(); lock.lock(); try { changeLog.close(); @@ -229,6 +253,8 @@ public class InMemoryQueryRepository implements QueryRepository { break; } + listeners.forEach(listener -> listener.notify(entry)); + cachePosition = Optional.of( entry.getPosition() ); } @@ -247,4 +273,52 @@ public class InMemoryQueryRepository implements QueryRepository { } } } + + @Override + protected void runOneIteration() throws Exception { + lock.lock(); + try { + updateCache(); + } finally { + lock.unlock(); + } + } + + @Override + protected Scheduler scheduler() { + return scheduler; + } + + @Override + public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) { + //locks to prevent the current state from changing while subscribing. 
+ lock.lock(); + try { + listeners.add(listener); + + //return the current state of the query repository + return queriesCache.values() + .stream() + .collect(Collectors.toSet()); + } finally { + lock.unlock(); + } + } + + @Override + public void unsubscribe(final QueryChangeLogListener listener) { + lock.lock(); + try { + listeners.remove(listener); + } finally { + lock.unlock(); + } + } + + private void checkState() { + if (!super.isRunning() && !listeners.isEmpty()) { + throw new IllegalStateException( + "The Query Repository is subscribed to, but the service has not been started."); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java new file mode 100644 index 0000000..2b61227 --- /dev/null +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.queries; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Listener to be notified when {@link QueryChange}s occur on a {@link QueryChangeLog}. + */ +@DefaultAnnotation(NonNull.class) +public interface QueryChangeLogListener { + /** + * Notifies the listener that a query change event has occurred in the change log. + * <p> + * <b>Note:</b> + * <p> + * The QueryRepository blocks when notifying this listener. Long lasting operations + * should not be performed within this function. Doing so will block all operations + * on the repository. + * + * @param queryChangeEvent - The event that occurred. (not null) + */ + public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent); +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java index fd51b2f..4d8b2db 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryRepository.java @@ -25,14 +25,20 @@ import java.util.UUID; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.exception.RyaStreamsException; +import com.google.common.util.concurrent.Service; + import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; /** * Repository for adding, deleting, and listing active queries in Rya Streams. 
+ * + * This service only needs to be started if it is being subscribed to. An + * {@link IllegalStateException} will be thrown if the service is subscribed to + * and used without being started. */ @DefaultAnnotation(NonNull.class) -public interface QueryRepository extends AutoCloseable { +public interface QueryRepository extends Service { /** * Adds a new query to Rya Streams. @@ -42,8 +48,9 @@ public interface QueryRepository extends AutoCloseable { * otherwise {@code false}. * @return The {@link StreamsQuery} used in Rya Streams. * @throws QueryRepositoryException Could not add the query. + * @throws IllegalStateException The Service has not been started, but has been subscribed to. */ - public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException; + public StreamsQuery add(final String query, boolean isActive) throws QueryRepositoryException, IllegalStateException; /** * Updates the isActive state of a {@link StreamsQuery}. Setting this value to {@code true} @@ -53,8 +60,9 @@ public interface QueryRepository extends AutoCloseable { * @param queryId - Identifies which query will be updated. (not null) * @param isActive - The new isActive state for the query. * @throws QueryRepositoryException If the query does not exist or something else caused the change to fail. + * @throws IllegalStateException The Service has not been started, but has been subscribed to. */ - public void updateIsActive(UUID queryId, boolean isActive) throws QueryRepositoryException; + public void updateIsActive(UUID queryId, boolean isActive) throws QueryRepositoryException, IllegalStateException; /** * Get an existing query from Rya Streams. @@ -62,24 +70,42 @@ public interface QueryRepository extends AutoCloseable { * @param queryId - Identifies which query will be fetched. * @return the {@link StreamsQuery} for the id if one exists; otherwise empty. * @throws QueryRepositoryException The query could not be fetched. 
+ * @throws IllegalStateException The Service has not been started, but has been subscribed to. */ - public Optional<StreamsQuery> get(UUID queryId) throws QueryRepositoryException; + public Optional<StreamsQuery> get(UUID queryId) throws QueryRepositoryException, IllegalStateException; /** * Removes an existing query from Rya Streams. * * @param queryID - The {@link UUID} of the query to remove. (not null) * @throws QueryRepositoryException Could not delete the query. + * @throws IllegalStateException The Service has not been started, but has been subscribed to. */ - public void delete(UUID queryID) throws QueryRepositoryException; + public void delete(UUID queryID) throws QueryRepositoryException, IllegalStateException; /** * Lists all existing queries in Rya Streams. * * @return - A List of the current {@link StreamsQuery}s * @throws QueryRepositoryException The {@link StreamsQuery}s could not be listed. + * @throws IllegalStateException The Service has not been started, but has been subscribed to. + */ + public Set<StreamsQuery> list() throws QueryRepositoryException, IllegalStateException; + + /** + * Subscribes a {@link QueryChangeLogListener} to the {@link QueryRepository}. + * + * @param listener - The {@link QueryChangeLogListener} to subscribe to this {@link QueryRepository}. (not null) + * @return The current state of the repository in the form of {@link StreamsQuery}s. + */ + public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener); + + /** + * Unsubscribe a {@link QueryChangeLogListener} from the {@link QueryRepository}. + * + * @param listener - The {@link QueryChangeLogListener} to unsubscribe from this {@link QueryRepository}. (not null) */ - public Set<StreamsQuery> list() throws QueryRepositoryException; + public void unsubscribe(final QueryChangeLogListener listener); /** * A function of {@link QueryRepository} was unable to perform a function. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java index 22e616d..76c3216 100644 --- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java +++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java @@ -20,6 +20,7 @@ package org.apache.rya.streams.api.queries; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -27,56 +28,62 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException; import org.junit.Test; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + /** * Unit tests the methods of {@link InMemoryQueryRepository}. */ public class InMemoryQueryRepositoryTest { + private static final Scheduler SCHEDULE = Scheduler.newFixedRateSchedule(0L, 100, TimeUnit.MILLISECONDS); @Test public void canReadAddedQueries() throws Exception { // Setup a totally in memory QueryRepository. - try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { - // Add some queries to it. 
- final Set<StreamsQuery> expected = new HashSet<>(); - expected.add( queries.add("query 1", true) ); - expected.add( queries.add("query 2", false) ); - expected.add( queries.add("query 3", true) ); - - // Show they are in the list of all queries. - final Set<StreamsQuery> stored = queries.list(); - assertEquals(expected, stored); - } + final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE ); + // Add some queries to it. + final Set<StreamsQuery> expected = new HashSet<>(); + expected.add( queries.add("query 1", true) ); + expected.add( queries.add("query 2", false) ); + expected.add( queries.add("query 3", true) ); + + // Show they are in the list of all queries. + final Set<StreamsQuery> stored = queries.list(); + assertEquals(expected, stored); } @Test public void deletedQueriesDisappear() throws Exception { // Setup a totally in memory QueryRepository. - try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { - // Add some queries to it. The second one we will delete. - final Set<StreamsQuery> expected = new HashSet<>(); - expected.add( queries.add("query 1", true) ); - final UUID deletedMeId = queries.add("query 2", false).getQueryId(); - expected.add( queries.add("query 3", true) ); - - // Delete the second query. - queries.delete( deletedMeId ); - - // Show only queries 1 and 3 are in the list. - final Set<StreamsQuery> stored = queries.list(); - assertEquals(expected, stored); - } + final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE ); + // Add some queries to it. The second one we will delete. + final Set<StreamsQuery> expected = new HashSet<>(); + expected.add( queries.add("query 1", true) ); + final UUID deletedMeId = queries.add("query 2", false).getQueryId(); + expected.add( queries.add("query 3", true) ); + + // Delete the second query. 
+ queries.delete( deletedMeId ); + + // Show only queries 1 and 3 are in the list. + final Set<StreamsQuery> stored = queries.list(); + assertEquals(expected, stored); } @Test public void initializedWithPopulatedChangeLog() throws Exception { // Setup a totally in memory QueryRepository. Hold onto the change log so that we can use it again later. final QueryChangeLog changeLog = new InMemoryQueryChangeLog(); - try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) { + final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE ); + try { + queries.startAndWait(); // Add some queries and deletes to it. final Set<StreamsQuery> expected = new HashSet<>(); expected.add( queries.add("query 1", true) ); @@ -85,11 +92,16 @@ public class InMemoryQueryRepositoryTest { queries.delete( deletedMeId ); // Create a new totally in memory QueryRepository. - try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) { + final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE ); + try { // Listing the queries should work using an initialized change log. final Set<StreamsQuery> stored = initializedQueries.list(); assertEquals(expected, stored); + } finally { + queries.stop(); } + } finally { + queries.stop(); } } @@ -100,50 +112,132 @@ public class InMemoryQueryRepositoryTest { when(changeLog.readFromStart()).thenThrow(new QueryChangeLogException("Mocked exception.")); // Create the QueryRepository and invoke one of the methods. - try(final QueryRepository queries = new InMemoryQueryRepository( changeLog )) { - queries.list(); - } + final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE ); + queries.list(); } @Test public void get_present() throws Exception { // Setup a totally in memory QueryRepository. - try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { - // Add a query to it. 
- final StreamsQuery query = queries.add("query 1", true); + final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE ); + // Add a query to it. + final StreamsQuery query = queries.add("query 1", true); - // Show the fetched query matches the expected ones. - final Optional<StreamsQuery> fetched = queries.get(query.getQueryId()); - assertEquals(query, fetched.get()); - } + // Show the fetched query matches the expected ones. + final Optional<StreamsQuery> fetched = queries.get(query.getQueryId()); + assertEquals(query, fetched.get()); } @Test public void get_notPresent() throws Exception { // Setup a totally in memory QueryRepository. - try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { - // Fetch a query that was never added to the repository. - final Optional<StreamsQuery> query = queries.get(UUID.randomUUID()); + final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE ); + // Fetch a query that was never added to the repository. + final Optional<StreamsQuery> query = queries.get(UUID.randomUUID()); - // Show it could not be found. - assertFalse(query.isPresent()); - } + // Show it could not be found. + assertFalse(query.isPresent()); } @Test public void update() throws Exception { // Setup a totally in memory QueryRepository. - try(final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() )) { + final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE ); + // Add a query to it. + final StreamsQuery query = queries.add("query 1", true); + + // Change the isActive state of that query. + queries.updateIsActive(query.getQueryId(), false); + + // Show the fetched query matches the expected one. 
+ final Optional<StreamsQuery> fetched = queries.get(query.getQueryId()); + final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false); + assertEquals(expected, fetched.get()); + } + + @Test + public void updateListenerNotify() throws Exception { + // Setup a totally in memory QueryRepository. + final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE ); + try { + queries.startAndWait(); + // Add a query to it. final StreamsQuery query = queries.add("query 1", true); - // Change the isActive state of that query. - queries.updateIsActive(query.getQueryId(), false); + final Set<StreamsQuery> existing = queries.subscribe(new QueryChangeLogListener() { + @Override + public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) { + final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L, + QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); + assertEquals(expected, queryChangeEvent); + } + }); + + assertEquals(Sets.newHashSet(query), existing); + + queries.add("query 2", true); + } finally { + queries.stop(); + } + } - // Show the fetched query matches the expected one. - final Optional<StreamsQuery> fetched = queries.get(query.getQueryId()); - final StreamsQuery expected = new StreamsQuery(query.getQueryId(), query.getSparql(), false); - assertEquals(expected, fetched.get()); + @Test + public void updateListenerNotify_multiClient() throws Exception { + // Setup a totally in memory QueryRepository. + final QueryChangeLog changeLog = new InMemoryQueryChangeLog(); + final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE ); + final QueryRepository queries2 = new InMemoryQueryRepository( changeLog, SCHEDULE ); + + try { + queries.startAndWait(); + queries2.startAndWait(); + + //show listener on repo that query was added to is being notified of the new query. 
+ final CountDownLatch repo1Latch = new CountDownLatch(1); + queries.subscribe(new QueryChangeLogListener() { + @Override + public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) { + final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L, + QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); + assertEquals(expected, queryChangeEvent); + repo1Latch.countDown(); + } + }); + + //show listener not on the repo that query was added to is being notified as well. + final CountDownLatch repo2Latch = new CountDownLatch(1); + queries2.subscribe(new QueryChangeLogListener() { + @Override + public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) { + final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L, + QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); + assertEquals(expected, queryChangeEvent); + repo2Latch.countDown(); + } + }); + + queries.add("query 2", true); + + assertTrue(repo1Latch.await(5, TimeUnit.SECONDS)); + assertTrue(repo2Latch.await(5, TimeUnit.SECONDS)); + } catch(final InterruptedException e ) { + System.out.println("PING"); + } finally { + queries.stop(); + queries2.stop(); } } + + @Test(expected = IllegalStateException.class) + public void subscribe_notStarted() throws Exception { + // Setup a totally in memory QueryRepository. 
+ final QueryRepository queries = new InMemoryQueryRepository(new InMemoryQueryChangeLog(), SCHEDULE); + queries.subscribe(new QueryChangeLogListener() { + @Override + public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {} + }); + + queries.add("query 2", true); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java index 275a975..9273c33 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/AddQueryCommand.java @@ -20,6 +20,8 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; +import java.util.concurrent.TimeUnit; + import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.exception.RyaStreamsException; import org.apache.rya.streams.api.interactor.AddQuery; @@ -35,6 +37,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; import com.google.common.base.Strings; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -115,8 +118,11 @@ public class AddQueryCommand implements RyaStreamsCommand { final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance); final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic); + //The AddQuery command doesn't use the 
scheduled service feature. + final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS); + final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler); // Execute the add query command. - try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + try { final AddQuery addQuery = new DefaultAddQuery(queryRepo); try { final StreamsQuery query = addQuery.addQuery(params.query, Boolean.parseBoolean(params.isActive)); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java index 2aeb90c..0d96df0 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/DeleteQueryCommand.java @@ -21,6 +21,7 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.rya.streams.api.exception.RyaStreamsException; import org.apache.rya.streams.api.interactor.DeleteQuery; @@ -36,6 +37,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; import com.google.common.base.Strings; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -113,8 +115,11 @@ public class DeleteQueryCommand implements RyaStreamsCommand { final String topic = 
KafkaTopics.queryChangeLogTopic(params.ryaInstance); final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic); + //The DeleteQuery command doesn't use the scheduled service feature. + final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS); + final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler); // Execute the delete query command. - try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + try { final DeleteQuery deleteQuery = new DefaultDeleteQuery(queryRepo); try { deleteQuery.delete(UUID.fromString(params.queryId)); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java index 670007b..cd78975 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java @@ -21,6 +21,7 @@ package org.apache.rya.streams.client.command; import static java.util.Objects.requireNonNull; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.exception.RyaStreamsException; @@ -35,6 +36,7 @@ import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; import com.beust.jcommander.JCommander; import com.beust.jcommander.ParameterException; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import 
edu.umd.cs.findbugs.annotations.NonNull; @@ -83,8 +85,11 @@ public class ListQueriesCommand implements RyaStreamsCommand { final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance); final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic); + //The ListQueries command doesn't use the scheduled service feature. + final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS); + final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler); // Execute the list queries command. - try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + try { final ListQueries listQueries = new DefaultListQueries(queryRepo); try { final Set<StreamsQuery> queries = listQueries.all(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java index 8f7f162..ddaf647 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.queries.InMemoryQueryRepository; @@ -39,6 +40,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; import com.google.common.base.Strings; +import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -117,8 +119,11 @@ public class RunQueryCommand implements RyaStreamsCommand { final String topic = KafkaTopics.queryChangeLogTopic(params.ryaInstance); final QueryChangeLog queryChangeLog = KafkaQueryChangeLogFactory.make(bootstrapServers, topic); + //The RunQuery command doesn't use the scheduled service feature. + final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS); + final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler); // Look up the query to be executed from the change log. - try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + try { try { final UUID queryId = UUID.fromString( params.queryId ); final Optional<StreamsQuery> query = queryRepo.get(queryId); @@ -145,7 +150,7 @@ public class RunQueryCommand implements RyaStreamsCommand { } catch(final Exception e) { throw new ExecutionException("Could not execute the Run Query command.", e); } - } catch(final ArgumentsException | ExecutionException e) { + } catch(final ExecutionException e) { // Rethrow the exceptions that are advertised by execute. 
throw e; } catch (final Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java index 7c548f1..3612dd0 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/StreamResultsCommand.java @@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rya.streams.api.entity.QueryResultStream; @@ -45,6 +46,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; import com.google.common.base.Strings; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -132,9 +134,12 @@ public class StreamResultsCommand implements RyaStreamsCommand { throw new ArgumentsException("Invalid Query ID " + params.queryId); } + //The StreamResults command doesn't use the scheduled service feature. + final Scheduler scheduler = Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS); + final QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog, scheduler); // Fetch the SPARQL of the query whose results will be streamed. 
final String sparql; - try(QueryRepository queryRepo = new InMemoryQueryRepository(queryChangeLog)) { + try { final Optional<StreamsQuery> sQuery = queryRepo.get(queryId); if(!sQuery.isPresent()) { throw new ExecutionException("Could not read the results for query with ID " + queryId + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java index 8b4f074..3bfbadc 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryCommandIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; @@ -38,11 +39,12 @@ import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.apache.rya.test.kafka.KafkaTestUtil; -import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + /** * integration Test for adding a new query through a command. 
*/ @@ -64,12 +66,7 @@ public class AddQueryCommandIT { final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class); final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class); final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); - queryRepo = new InMemoryQueryRepository(changeLog); - } - - @After - public void cleanup() throws Exception { - queryRepo.close(); + queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java index 6083543..7bec080 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/DeleteQueryCommandIT.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotEquals; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; @@ -39,11 +40,12 @@ import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.apache.rya.test.kafka.KafkaTestUtil; -import org.junit.After; import org.junit.Before; import 
org.junit.Rule; import org.junit.Test; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + /** * Integration Test for deleting a query from Rya Streams through a command. */ @@ -66,12 +68,7 @@ public class DeleteQueryCommandIT { final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class); final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class); final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); - queryRepo = new InMemoryQueryRepository(changeLog); - } - - @After - public void cleanup() throws Exception { - queryRepo.close(); + queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java index 1399142..f6ceb75 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/ListQueryCommandIT.java @@ -19,6 +19,7 @@ package org.apache.rya.streams.client.command; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; @@ -34,11 +35,12 @@ import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserialize import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.apache.rya.test.kafka.KafkaTestUtil; -import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + /** * integration Test for listing queries through a command. */ @@ -60,12 +62,7 @@ public class ListQueryCommandIT { final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class); final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class); final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); - queryRepo = new InMemoryQueryRepository(changeLog); - } - - @After - public void cleanup() throws Exception { - queryRepo.close(); + queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java index 3389d6b..7e3b8bc 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; import 
org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; @@ -56,6 +57,7 @@ import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.impl.MapBindingSet; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; /** * Integration tests the methods of {@link RunQueryCommand}. @@ -81,7 +83,7 @@ public class RunQueryCommandIT { final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class); final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class); final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic); - queryRepo = new InMemoryQueryRepository(changeLog); + queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS)); // Initialize the Statements Producer and the Results Consumer. 
stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class); @@ -92,7 +94,6 @@ public class RunQueryCommandIT { public void cleanup() throws Exception{ stmtProducer.close(); resultConsumer.close(); - queryRepo.close(); } @Test(expected = ExecutionException.class) http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/36af1153/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java index 9a773f0..5dbd27f 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; @@ -52,6 +53,7 @@ import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.impl.MapBindingSet; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; /** * Integration tests the methods of {@link KafkaRunQuery}. @@ -83,7 +85,7 @@ public class KafkaRunQueryIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); // This query is completely in memory, so it doesn't need to be closed. 
- final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() ); + final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS) ); // Add the query to the query repository. final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }", true);