Rya 451 Query manager: added QueryManager with tests; updated InMemoryQueryRepository and its tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e355f73a Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e355f73a Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e355f73a Branch: refs/heads/master Commit: e355f73ae6c56eb439e1d56722cd20c5287cbe04 Parents: a11ca4a Author: Andrew Smith <[email protected]> Authored: Tue Jan 30 14:01:54 2018 -0500 Committer: Valiyil <[email protected]> Committed: Fri Mar 9 12:59:47 2018 -0500 ---------------------------------------------------------------------- .../rya/streams/api/entity/StreamsQuery.java | 15 ++ .../api/queries/InMemoryQueryRepository.java | 7 +- .../api/queries/QueryChangeLogListener.java | 8 +- .../queries/InMemoryQueryRepositoryTest.java | 55 ++-- extras/rya.streams/query-manager/pom.xml | 2 +- .../rya/streams/querymanager/QueryManager.java | 255 +++++++++++++++++++ .../streams/querymanager/QueryManagerTest.java | 195 ++++++++++++++ 7 files changed, 504 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java index bd750a6..7194834 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java @@ -85,4 +85,19 @@ public class StreamsQuery { } return false; } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("ID: "); + sb.append(getQueryId().toString() + "\n"); 
+ sb.append("Query: "); + sb.append(getSparql() + "\n"); + sb.append("Is "); + if (!isActive) { + sb.append(" Not "); + } + sb.append("Running.\n"); + return sb.toString(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java index dca040f..5fb0297 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java @@ -188,8 +188,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements // Our internal cache is already up to date, so just return its values. 
return queriesCache.values() - .stream() - .collect(Collectors.toSet()); + .stream() + .collect(Collectors.toSet()); } finally { lock.unlock(); @@ -253,7 +253,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements break; } - listeners.forEach(listener -> listener.notify(entry)); + final Optional<StreamsQuery> newQueryState = Optional.ofNullable(queriesCache.get(queryId)); + listeners.forEach(listener -> listener.notify(entry, newQueryState)); cachePosition = Optional.of( entry.getPosition() ); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java index 2b61227..2f0bcc3 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLogListener.java @@ -18,6 +18,10 @@ */ package org.apache.rya.streams.api.queries; +import java.util.Optional; + +import org.apache.rya.streams.api.entity.StreamsQuery; + import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -36,6 +40,8 @@ public interface QueryChangeLogListener { * on the repository. * * @param queryChangeEvent - The event that occurred. (not null) + * @param newQueryState - The new state of the query after the query change event, this will be + * absent if the change type is DELETE. 
(not null) */ - public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent); + public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java index 76c3216..3b3d48a 100644 --- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java +++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java @@ -165,13 +165,14 @@ public class InMemoryQueryRepositoryTest { // Add a query to it. 
final StreamsQuery query = queries.add("query 1", true); - final Set<StreamsQuery> existing = queries.subscribe(new QueryChangeLogListener() { - @Override - public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) { - final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L, - QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); - assertEquals(expected, queryChangeEvent); - } + final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> { + final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L, + QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); + final Optional<StreamsQuery> expectedQueryState = Optional.of( + new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); + + assertEquals(expected, queryChangeEvent); + assertEquals(expectedQueryState, newQueryState); }); assertEquals(Sets.newHashSet(query), existing); @@ -195,26 +196,28 @@ public class InMemoryQueryRepositoryTest { //show listener on repo that query was added to is being notified of the new query. 
final CountDownLatch repo1Latch = new CountDownLatch(1); - queries.subscribe(new QueryChangeLogListener() { - @Override - public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) { - final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L, - QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); - assertEquals(expected, queryChangeEvent); - repo1Latch.countDown(); - } + queries.subscribe((queryChangeEvent, newQueryState) -> { + final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L, + QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); + final Optional<StreamsQuery> expectedQueryState = Optional.of( + new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); + + assertEquals(expected, queryChangeEvent); + assertEquals(expectedQueryState, newQueryState); + repo1Latch.countDown(); }); //show listener not on the repo that query was added to is being notified as well. 
final CountDownLatch repo2Latch = new CountDownLatch(1); - queries2.subscribe(new QueryChangeLogListener() { - @Override - public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) { - final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L, - QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); - assertEquals(expected, queryChangeEvent); - repo2Latch.countDown(); - } + queries2.subscribe((queryChangeEvent, newQueryState) -> { + final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L, + QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); + final Optional<StreamsQuery> expectedQueryState = Optional.of( + new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); + + assertEquals(expected, queryChangeEvent); + assertEquals(expectedQueryState, newQueryState); + repo2Latch.countDown(); }); queries.add("query 2", true); @@ -222,7 +225,6 @@ public class InMemoryQueryRepositoryTest { assertTrue(repo1Latch.await(5, TimeUnit.SECONDS)); assertTrue(repo2Latch.await(5, TimeUnit.SECONDS)); } catch(final InterruptedException e ) { - System.out.println("PING"); } finally { queries.stop(); queries2.stop(); @@ -233,10 +235,7 @@ public class InMemoryQueryRepositoryTest { public void subscribe_notStarted() throws Exception { // Setup a totally in memory QueryRepository. 
final QueryRepository queries = new InMemoryQueryRepository(new InMemoryQueryChangeLog(), SCHEDULE); - queries.subscribe(new QueryChangeLogListener() { - @Override - public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent) {} - }); + queries.subscribe((queryChangeEvent, newQueryState) -> {}); queries.add("query 2", true); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/query-manager/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/pom.xml b/extras/rya.streams/query-manager/pom.xml index d321ab5..2141a3a 100644 --- a/extras/rya.streams/query-manager/pom.xml +++ b/extras/rya.streams/query-manager/pom.xml @@ -39,7 +39,7 @@ under the License. <groupId>org.apache.rya</groupId> <artifactId>rya.streams.kafka</artifactId> </dependency> - + <!-- Apache Daemon dependencies --> <dependency> <groupId>commons-daemon</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java new file mode 100644 index 0000000..30b4538 --- /dev/null +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChange.ChangeType; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * <p> + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractIdleService { + private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class); + + private final QueryExecutor queryExecutor; + private final Scheduler scheduler; + + /** + * Map of Rya Instance name to the {@link QueryRepository} that was started + * for that instance's {@link QueryChangeLog}. + */ + private final Map<String, QueryRepository> queryRepos = new HashMap<>(); + + private final ReentrantLock lock = new ReentrantLock(); + + private final QueryChangeLogSource source; + + /** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param scheduler - The {@link Scheduler} used to discover query changes + * within the {@link QueryChangeLog}s. (not null) + */ + public QueryManager(final QueryExecutor queryExecutor, final QueryChangeLogSource source, final Scheduler scheduler) { + this.source = requireNonNull(source); + this.queryExecutor = requireNonNull(queryExecutor); + this.scheduler = requireNonNull(scheduler); + } + + /** + * Starts running a query using the {@link QueryExecutor}. If the executor + * throws a {@link QueryExecutorException}, it is logged and not rethrown. + * + * @param ryaInstanceName - The Rya instance the query belongs to. (not null) + * @param query - The query to run. (not null) + */ + private void runQuery(final String ryaInstanceName, final StreamsQuery query) { + requireNonNull(ryaInstanceName); + requireNonNull(query); + LOG.info("Starting Query: " + query.toString() + " on the rya instance: " + ryaInstanceName); + + try { + queryExecutor.startQuery(ryaInstanceName, query); + } catch (final QueryExecutorException e) { + LOG.error("Failed to start query.", e); + } + } + + /** + * Stops the specified query from running using the {@link QueryExecutor}. + * If the executor throws a {@link QueryExecutorException}, it is logged and + * not rethrown. + * + * @param queryId - The ID of the query to stop running. 
(not null) + */ + private void stopQuery(final UUID queryId) { + requireNonNull(queryId); + + LOG.info("Stopping query: " + queryId.toString()); + + try { + queryExecutor.stopQuery(queryId); + } catch (final QueryExecutorException e) { + LOG.error("Failed to stop query.", e); + } + } + + @Override + protected void startUp() throws Exception { + lock.lock(); + try { + LOG.info("Starting Query Manager."); + queryExecutor.startAndWait(); + source.startAndWait(); + + // Subscribe to the source to be notified when QueryChangeLogs are created or deleted. + source.subscribe(new QueryManagerSourceListener()); + } finally { + lock.unlock(); + } + } + + @Override + protected void shutDown() throws Exception { + lock.lock(); + try { + LOG.info("Stopping Query Manager."); + source.stopAndWait(); + queryExecutor.stopAndWait(); + } finally { + lock.unlock(); + } + } + + /** + * An implementation of {@link QueryChangeLogListener} that forwards the + * query changes of a single Rya instance to the {@link QueryExecutor} of + * the {@link QueryManager}. + * <p> + * When notified of a {@link ChangeType}, performs one of the following: + * <ul> + * <li>{@link ChangeType#CREATE}: Starts the new query using the + * {@link QueryExecutor} provided to the {@link QueryManager}.</li> + * <li>{@link ChangeType#DELETE}: Stops the query identified by the query ID + * in the event.</li> + * <li>{@link ChangeType#UPDATE}: If the updated query is active, starts the + * query. Otherwise, stops the query.</li> + * </ul> + * For CREATE and UPDATE, an absent new query state is logged as an error + * and the change is not applied. + * <p> + * NOTE(review): CREATE starts the query without checking whether it is + * active, unlike UPDATE. Confirm this is intended. + */ + private class QueryExecutionForwardingListener implements QueryChangeLogListener { + private final String ryaInstanceName; + + /** + * Creates a new {@link QueryExecutionForwardingListener}. + * + * @param ryaInstanceName - The rya instance the query change is + * performed on. 
(not null) + */ + public QueryExecutionForwardingListener(final String ryaInstanceName) { + this.ryaInstanceName = requireNonNull(ryaInstanceName); + } + + @Override + public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) { + LOG.debug("New query change event."); + final QueryChange entry = queryChangeEvent.getEntry(); + + lock.lock(); + try { + + switch (entry.getChangeType()) { + case CREATE: + if(!newQueryState.isPresent()) { + LOG.error("The query with ID: " + entry.getQueryId() + " must be present with the change to be created."); + LOG.debug("newQueryState is not allowed to be absent with a CREATE QueryChange, there might be a bug in the QueryRepository."); + } else { + runQuery(ryaInstanceName, newQueryState.get()); + } + break; + case DELETE: + stopQuery(entry.getQueryId()); + break; + case UPDATE: + if (!newQueryState.isPresent()) { + LOG.error("The query with ID: " + entry.getQueryId() + " must be provided with the update, cannot perform update."); + LOG.debug("newQueryState is not allowed to be absent with a UPDATE QueryChange, there might be a bug in the QueryRepository."); + } else { + final StreamsQuery updatedQuery = newQueryState.get(); + if (updatedQuery.isActive()) { + runQuery(ryaInstanceName, updatedQuery); + LOG.info("Starting query: " + updatedQuery.toString()); + } else { + stopQuery(updatedQuery.getQueryId()); + LOG.info("Stopping query: " + updatedQuery.toString()); + } + } + break; + } + } finally { + lock.unlock(); + } + } + } + + /** + * Listener used by the {@link QueryManager} to be notified when + * {@link QueryChangeLog}s are created or deleted. + * <p> + * When a log is created, an {@link InMemoryQueryRepository} is started for + * it and every active query it already holds is started. When a log is + * deleted, all queries of that Rya instance are stopped. + * <p> + * NOTE(review): on deletion, the repository stored in {@code queryRepos} is + * neither stopped nor removed. 
+ */ + private class QueryManagerSourceListener implements SourceListener { + @Override + public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) { + lock.lock(); + try { + LOG.info("Discovered new Query Change Log for Rya Instance " + ryaInstanceName + "."); + final QueryRepository repo = new InMemoryQueryRepository(log, scheduler); + repo.startAndWait(); + final Set<StreamsQuery> queries = repo.subscribe(new QueryExecutionForwardingListener(ryaInstanceName)); + queries.forEach(query -> { + if (query.isActive()) { + try { + queryExecutor.startQuery(ryaInstanceName, query); + } catch (IllegalStateException | QueryExecutorException e) { + LOG.error("Unable to start query for rya instance " + ryaInstanceName, e); + } + } + }); + queryRepos.put(ryaInstanceName, repo); + } finally { + lock.unlock(); + } + } + + @Override + public void notifyDelete(final String ryaInstanceName) { + lock.lock(); + try { + LOG.info("Notified of deleting QueryChangeLog, stopping all queries belonging to the change log for " + + ryaInstanceName + "."); + queryExecutor.stopAll(ryaInstanceName); + } catch (final QueryExecutorException e) { + LOG.error("Failed to stop all queries belonging to: " + ryaInstanceName, e); + } finally { + lock.unlock(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e355f73a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java new file mode 100644 index 0000000..a1203a0 --- /dev/null +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.rya.streams.querymanager; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.junit.Test; + +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + +/** + * Tests for the {@link QueryManager}. + */ +public class QueryManagerTest { + private static final Scheduler TEST_SCHEDULER = Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS); + + /** + * Tests that when the query manager is notified that a new query was + * created, the query is started. 
+ */ + @Test + public void testCreateQuery() throws Exception { + //The new QueryChangeLog + final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog(); + final String ryaInstance = "ryaTestInstance"; + final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true); + + // when the query executor is told to start the test query on the test + // rya instance, count down on the queryStarted latch + final QueryExecutor qe = mock(QueryExecutor.class); + when(qe.isRunning()).thenReturn(true); + + final CountDownLatch queryStarted = new CountDownLatch(1); + doAnswer(invocation -> { + queryStarted.countDown(); + return null; + }).when(qe).startQuery(eq(ryaInstance), eq(query)); + final QueryChangeLogSource source = mock(QueryChangeLogSource.class); + + //When the QueryChangeLogSource is subscribed to in the QueryManager, mock notify of a new QueryChangeLog + doAnswer(invocation -> { + //The listener created by the Query Manager + final SourceListener listener = (SourceListener) invocation.getArguments()[0]; + listener.notifyCreate(ryaInstance, newChangeLog); + newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive())); + return null; + }).when(source).subscribe(any(SourceListener.class)); + + final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER); + try { + qm.startAndWait(); + queryStarted.await(5, TimeUnit.SECONDS); + verify(qe).startQuery(ryaInstance, query); + } finally { + qm.stopAndWait(); + } + } + + /** + * Tests that when the query manager is notified that a started query was + * deleted, the query is stopped. 
+ */ + @Test + public void testDeleteQuery() throws Exception { + //The new QueryChangeLog + final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog(); + final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true); + final String ryaInstance = "ryaTestInstance"; + + // when the query executor is told to stop the test query, count down + // on the queryDeleted latch + final QueryExecutor qe = mock(QueryExecutor.class); + when(qe.isRunning()).thenReturn(true); + + final CountDownLatch queryStarted = new CountDownLatch(1); + final CountDownLatch queryDeleted = new CountDownLatch(1); + doAnswer(invocation -> { + queryDeleted.countDown(); + return null; + }).when(qe).stopQuery(query.getQueryId()); + final QueryChangeLogSource source = mock(QueryChangeLogSource.class); + + // when the query executor is told to start the test query on the test + // rya instance, count down on the queryStarted latch + doAnswer(invocation -> { + queryStarted.countDown(); + return null; + }).when(qe).startQuery(eq(ryaInstance), eq(query)); + + //When the QueryChangeLogSource is subscribed to in the QueryManager, mock notify of a new QueryChangeLog + // add the query, so it can be removed + doAnswer(invocation -> { + //The listener created by the Query Manager + final SourceListener listener = (SourceListener) invocation.getArguments()[0]; + listener.notifyCreate(ryaInstance, newChangeLog); + Thread.sleep(1000); + newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive())); + queryStarted.await(5, TimeUnit.SECONDS); + newChangeLog.write(QueryChange.delete(query.getQueryId())); + return null; + }).when(source).subscribe(any(SourceListener.class)); + + final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER); + try { + qm.startAndWait(); + queryDeleted.await(5, TimeUnit.SECONDS); + verify(qe).stopQuery(query.getQueryId()); + } finally { + qm.stopAndWait(); + } + } + + /** + * Tests that when the query 
manager is notified to update an existing query, the + * query is stopped. + */ + @Test + public void testUpdateQuery() throws Exception { + // The new QueryChangeLog + final QueryChangeLog newChangeLog = new InMemoryQueryChangeLog(); + final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "some query", true); + final String ryaInstance = "ryaTestInstance"; + + // when the query executor is told to stop the test query, count down + // on the queryDeleted latch + final QueryExecutor qe = mock(QueryExecutor.class); + when(qe.isRunning()).thenReturn(true); + + final CountDownLatch queryStarted = new CountDownLatch(1); + final CountDownLatch queryDeleted = new CountDownLatch(1); + doAnswer(invocation -> { + queryDeleted.countDown(); + return null; + }).when(qe).stopQuery(query.getQueryId()); + final QueryChangeLogSource source = mock(QueryChangeLogSource.class); + + // when the query executor is told to start the test query on the test + // rya instance, count down on the queryStarted latch + doAnswer(invocation -> { + queryStarted.countDown(); + return null; + }).when(qe).startQuery(eq(ryaInstance), eq(query)); + + // When the QueryChangeLogSource is subscribed to in the QueryManager, + // mock notify of a new QueryChangeLog + // add the query, so it can be updated + doAnswer(invocation -> { + // The listener created by the Query Manager + final SourceListener listener = (SourceListener) invocation.getArguments()[0]; + listener.notifyCreate(ryaInstance, newChangeLog); + Thread.sleep(1000); + newChangeLog.write(QueryChange.create(query.getQueryId(), query.getSparql(), query.isActive())); + queryStarted.await(5, TimeUnit.SECONDS); + newChangeLog.write(QueryChange.update(query.getQueryId(), false)); + return null; + }).when(source).subscribe(any(SourceListener.class)); + + final QueryManager qm = new QueryManager(qe, source, TEST_SCHEDULER); + try { + qm.startAndWait(); + queryDeleted.await(10, TimeUnit.SECONDS); + 
verify(qe).stopQuery(query.getQueryId()); + } finally { + qm.stopAndWait(); + } + } +}
