RYA-451 Fixing threading issues with the QueryManager class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/1cd8db32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/1cd8db32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/1cd8db32 Branch: refs/heads/master Commit: 1cd8db32e04f116934fef22fe9b465f7cd807755 Parents: a342fe2 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Fri Feb 2 22:47:59 2018 -0500 Committer: Valiyil <puja.vali...@parsons.com> Committed: Fri Mar 9 12:59:55 2018 -0500 ---------------------------------------------------------------------- .../rya/streams/api/entity/StreamsQuery.java | 4 +- .../api/queries/InMemoryQueryRepository.java | 30 +- .../rya/streams/api/queries/QueryChange.java | 10 + .../queries/InMemoryQueryRepositoryTest.java | 16 +- .../client/command/ListQueriesCommand.java | 12 +- .../streams/client/command/RunQueryCommand.java | 2 +- .../apache/rya/streams/kafka/KafkaTopics.java | 4 +- .../kafka/SingleThreadKafkaStreamsFactory.java | 2 +- .../kafka/interactor/CreateKafkaTopic.java | 60 ++ .../query-manager/src/main/README.txt | 7 +- .../src/main/config/configuration.xml | 10 +- .../query-manager/src/main/config/log4j.xml | 17 + .../querymanager/QueryChangeLogSource.java | 5 +- .../rya/streams/querymanager/QueryManager.java | 929 ++++++++++++++++--- .../querymanager/QueryManagerDaemon.java | 8 +- .../kafka/KafkaQueryChangeLogSource.java | 46 +- .../querymanager/kafka/LocalQueryExecutor.java | 31 +- .../src/main/xsd/QueryManagerConfig.xsd | 14 + .../querymanager/LogEventWorkGeneratorTest.java | 136 +++ .../querymanager/LogEventWorkerTest.java | 245 +++++ .../QueryEventWorkGeneratorTest.java | 265 ++++++ .../querymanager/QueryEventWorkerTest.java | 172 ++++ .../streams/querymanager/QueryManagerTest.java | 41 +- .../rya/streams/querymanager/ThreadUtil.java | 48 + .../kafka/LocalQueryExecutorIT.java | 4 +- .../kafka/LocalQueryExecutorTest.java | 25 +- 
.../xml/QueryManagerConfigMarshallerTest.java | 5 + 27 files changed, 1907 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java index 7194834..11423bd 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java @@ -85,7 +85,7 @@ public class StreamsQuery { } return false; } - + @Override public String toString() { final StringBuilder sb = new StringBuilder(); @@ -95,7 +95,7 @@ public class StreamsQuery { sb.append(getSparql() + "\n"); sb.append("Is "); if (!isActive) { - sb.append(" Not "); + sb.append("Not "); } sb.append("Running.\n"); return sb.toString(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java index 5fb0297..95c1922 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java @@ -51,9 +51,9 @@ import info.aduna.iteration.CloseableIteration; */ @DefaultAnnotation(NonNull.class) public class 
InMemoryQueryRepository extends AbstractScheduledService implements QueryRepository { - private static final Logger LOG = LoggerFactory.getLogger(InMemoryQueryRepository.class); + private static final Logger log = LoggerFactory.getLogger(InMemoryQueryRepository.class); - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantLock(true); /** * The change log that is the ground truth for describing what the queries look like. @@ -198,7 +198,6 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements @Override protected void shutDown() throws Exception { - super.shutDown(); lock.lock(); try { changeLog.close(); @@ -211,11 +210,12 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements * Updates the {@link #queriesCache} to reflect the latest position within the {@link #changeLog}. */ private void updateCache() { - requireNonNull(changeLog); + log.trace("updateCache() - Enter"); CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> it = null; try { // Iterate over everything since the last position that was handled within the change log. 
+ log.debug("Starting cache position:" + cachePosition); if(cachePosition.isPresent()) { it = changeLog.readFromPosition(cachePosition.get() + 1); } else { @@ -228,6 +228,8 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements final QueryChange change = entry.getEntry(); final UUID queryId = change.getQueryId(); + log.debug("Updating the cache to reflect:\n" + change); + switch(change.getChangeType()) { case CREATE: final StreamsQuery query = new StreamsQuery( @@ -253,15 +255,17 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements break; } + log.debug("Notifying listeners with the updated state."); final Optional<StreamsQuery> newQueryState = Optional.ofNullable(queriesCache.get(queryId)); listeners.forEach(listener -> listener.notify(entry, newQueryState)); cachePosition = Optional.of( entry.getPosition() ); + log.debug("New chache position: " + cachePosition); } } catch (final QueryChangeLogException e) { // Rethrow the exception because the object the supplier tried to create could not be created. - throw new RuntimeException("Could not initialize the " + InMemoryQueryRepository.class.getName(), e); + throw new RuntimeException("Could not update the cache of " + InMemoryQueryRepository.class.getName(), e); } finally { // Try to close the iteration if it was opened. 
@@ -270,18 +274,22 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements it.close(); } } catch (final QueryChangeLogException e) { - LOG.error("Could not close the " + CloseableIteration.class.getName(), e); + log.error("Could not close the " + CloseableIteration.class.getName(), e); } + + log.trace("updateCache() - Exit"); } } @Override protected void runOneIteration() throws Exception { + log.trace("runOneIteration() - Enter"); lock.lock(); try { updateCache(); } finally { lock.unlock(); + log.trace("runOneIteration() - Exit"); } } @@ -292,17 +300,25 @@ public class InMemoryQueryRepository extends AbstractScheduledService implements @Override public Set<StreamsQuery> subscribe(final QueryChangeLogListener listener) { + log.trace("subscribe(listener) - Enter"); + //locks to prevent the current state from changing while subscribing. lock.lock(); + log.trace("subscribe(listener) - Acquired lock"); try { listeners.add(listener); + log.trace("subscribe(listener) - Listener Registered"); //return the current state of the query repository - return queriesCache.values() + final Set<StreamsQuery> queries = queriesCache.values() .stream() .collect(Collectors.toSet()); + log.trace("subscribe(listener) - Returning " + queries.size() + " existing queries"); + return queries; } finally { + log.trace("subscribe(listener) - Releasing lock"); lock.unlock(); + log.trace("subscribe(listener) - Exit"); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java index d283957..d34a394 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java +++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java @@ -110,6 +110,16 @@ public final class QueryChange implements Serializable { return false; } + @Override + public String toString() { + return "QueryChange: {" + + " Query ID: " + queryId + ",\n" + + " Change Type: " + changeType + ",\n" + + " Is Active: " + isActive + ",\n" + + " SPARQL: " + sparql + "\n" + + "}"; + } + /** * Create a {@link QueryChange} that represents a new SPARQL query that will be managed by Rya Streams. * http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java index 3b3d48a..d7e116b 100644 --- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java +++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java @@ -93,13 +93,9 @@ public class InMemoryQueryRepositoryTest { // Create a new totally in memory QueryRepository. final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE ); - try { - // Listing the queries should work using an initialized change log. - final Set<StreamsQuery> stored = initializedQueries.list(); - assertEquals(expected, stored); - } finally { - queries.stop(); - } + // Listing the queries should work using an initialized change log. 
+ final Set<StreamsQuery> stored = initializedQueries.list(); + assertEquals(expected, stored); } finally { queries.stop(); } @@ -166,7 +162,7 @@ public class InMemoryQueryRepositoryTest { final StreamsQuery query = queries.add("query 1", true); final Set<StreamsQuery> existing = queries.subscribe((queryChangeEvent, newQueryState) -> { - final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(1L, + final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(1L, QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); final Optional<StreamsQuery> expectedQueryState = Optional.of( new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); @@ -197,7 +193,7 @@ public class InMemoryQueryRepositoryTest { //show listener on repo that query was added to is being notified of the new query. final CountDownLatch repo1Latch = new CountDownLatch(1); queries.subscribe((queryChangeEvent, newQueryState) -> { - final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L, + final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L, QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); final Optional<StreamsQuery> expectedQueryState = Optional.of( new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); @@ -210,7 +206,7 @@ public class InMemoryQueryRepositoryTest { //show listener not on the repo that query was added to is being notified as well. 
final CountDownLatch repo2Latch = new CountDownLatch(1); queries2.subscribe((queryChangeEvent, newQueryState) -> { - final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<QueryChange>(0L, + final ChangeLogEntry<QueryChange> expected = new ChangeLogEntry<>(0L, QueryChange.create(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); final Optional<StreamsQuery> expectedQueryState = Optional.of( new StreamsQuery(queryChangeEvent.getEntry().getQueryId(), "query 2", true)); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java index cd78975..a5507a6 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java @@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.StringUtils; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.exception.RyaStreamsException; import org.apache.rya.streams.api.interactor.ListQueries; @@ -112,12 +113,11 @@ public class ListQueriesCommand implements RyaStreamsCommand { sb.append("Queries in Rya Streams:\n"); sb.append("---------------------------------------------------------\n"); queries.forEach(query -> { - sb.append("ID: "); - sb.append(query.getQueryId()); - sb.append("\t\t"); - sb.append("Query: "); - sb.append(query.getSparql()); - sb.append("\n"); + sb.append("ID: ").append(query.getQueryId()) + .append(" ") + .append("Is 
Active: ").append(query.isActive()) + .append(StringUtils.rightPad("" + query.isActive(), 9)) + .append("Query: ").append(query.getSparql()).append("\n"); }); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java index ddaf647..7b311f6 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/RunQueryCommand.java @@ -136,7 +136,7 @@ public class RunQueryCommand implements RyaStreamsCommand { final Set<String> topics = new HashSet<>(); topics.add( KafkaTopics.statementsTopic(params.ryaInstance) ); topics.add( KafkaTopics.queryResultsTopic(queryId) ); - KafkaTopics.createTopic(params.zookeeperServers, topics, 1, 1); + KafkaTopics.createTopics(params.zookeeperServers, topics, 1, 1); // Run the query that uses those topics. 
final KafkaRunQuery runQuery = new KafkaRunQuery( http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java index 095465c..989799a 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java @@ -55,7 +55,7 @@ public class KafkaTopics { } /** - * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}. + * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChangeLog}. * <p/> * This is the inverse function of {@link #queryChangeLogTopic(String)}. * @@ -106,7 +106,7 @@ public class KafkaTopics { * @param partitions - The number of partitions that each of the topics will have. * @param replicationFactor - The replication factor of the topics that are created. 
*/ - public static void createTopic( + public static void createTopics( final String zookeeperServers, final Set<String> topicNames, final int partitions, http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java index 7ab7e90..8093951 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java @@ -83,7 +83,7 @@ public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory { try { final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory()); return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps)); - } catch (MalformedQueryException | TopologyBuilderException e) { + } catch (final MalformedQueryException | TopologyBuilderException e) { throw new KafkaStreamsFactoryException("Could not create a KafkaStreams processing topology for query " + query.getQueryId(), e); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java new file mode 100644 index 0000000..771e1c8 --- /dev/null +++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/CreateKafkaTopic.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.interactor; + +import static java.util.Objects.requireNonNull; + +import java.util.Set; + +import org.apache.rya.streams.kafka.KafkaTopics; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Creates topics in Kafka. + */ +@DefaultAnnotation(NonNull.class) +public class CreateKafkaTopic { + + private final String zookeeperServers; + + /** + * Constructs an instance of {@link CreateKafkaTopic}. + * + * @param zookeeperServers - The Zookeeper servers that are used to manage the Kafka instance. (not null) + */ + public CreateKafkaTopic(final String zookeeperServers) { + this.zookeeperServers = requireNonNull(zookeeperServers); + } + + /** + * Creates a set of Kafka topics for each topic that does not already exist. + * + * @param topicNames - The topics that will be created. (not null) + * @param partitions - The number of partitions that each of the topics will have. 
+ * @param replicationFactor - The replication factor of the topics that are created. + */ + public void createTopics( + final Set<String> topicNames, + final int partitions, + final int replicationFactor) { + KafkaTopics.createTopics(zookeeperServers, topicNames, partitions, replicationFactor); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/README.txt ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/README.txt b/extras/rya.streams/query-manager/src/main/README.txt index 3b2dbfe..93d5ac5 100644 --- a/extras/rya.streams/query-manager/src/main/README.txt +++ b/extras/rya.streams/query-manager/src/main/README.txt @@ -41,8 +41,11 @@ Java 8 yum install -y ${project.artifactId}-${project.version}.noarch.rpm 3. Update the configuration file: - Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker. - Replace the Kafka port if using something other than the default of 9092. + A. Replace "[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]" with + the zookeepers used to manage the Kafka cluster. It is a comma separated + list. + B. Replace "[Kafka Broker Hostname]" with the IP address of the Kafka broker. + C. Replace the Kafka port if using something other than the default of 9092. 4. 
Start the service: systemctl start rya-streams-query-manager.service http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/config/configuration.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/config/configuration.xml b/extras/rya.streams/query-manager/src/main/config/configuration.xml index 96da501..7077125 100644 --- a/extras/rya.streams/query-manager/src/main/config/configuration.xml +++ b/extras/rya.streams/query-manager/src/main/config/configuration.xml @@ -18,7 +18,7 @@ specific language governing permissions and limitations under the License. --> <queryManagerConfig> - <!-- The Query Change Log Sources. The source defines a system where Rya + <!-- The Query Change Log Sources. The source defines a system where Rya - Streams Query Change Logs are managed. The query manager will manage - queries for all Rya instances whose change logs are stored within the - source. @@ -29,6 +29,14 @@ under the License. <port>9092</port> </kafka> </queryChangeLogSource> + + <!-- The Query Executor. The executor defines a system for executing the + Rya Streams queries. --> + <queryExecutor> + <localKafkaStreams> + <zookeepers>[Zookeepers used to manage Kafka. E.g.: zoo1,zoo2,zoo3]</zookeepers> + </localKafkaStreams> + </queryExecutor> <!-- This section defines performance related tuning values. Sensible - default have been provided to simplify configuration. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/config/log4j.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/config/log4j.xml b/extras/rya.streams/query-manager/src/main/config/log4j.xml index 2021638..96ea56c 100644 --- a/extras/rya.streams/query-manager/src/main/config/log4j.xml +++ b/extras/rya.streams/query-manager/src/main/config/log4j.xml @@ -28,6 +28,23 @@ under the License. </layout> </appender> + <!-- Kafka configuration configs are loud. --> + <logger name="org.apache.kafka.streams.StreamsConfig"> + <level value="OFF"/> + </logger> + <logger name="org.apache.kafka.clients.consumer.ConsumerConfig"> + <level value="OFF"/> + </logger> + <logger name="org.apache.kafka.clients.producer.ProducerConfig"> + <level value="OFF"/> + </logger> + + <!-- Change this level to DEBUG to see more information about what the + QueryManager is doing. --> + <logger name="org.apache.rya.streams.querymanager.QueryManager"> + <level value="INFO"/> + </logger> + <root> <level value="INFO" /> <appender-ref ref="console" /> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java index 73e5d12..eb5ca08 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryChangeLogSource.java @@ -51,8 +51,9 @@ public interface QueryChangeLogSource extends Service { public void 
unsubscribe(final SourceListener listener); /** - * A listener that is notified when a {@link QueryChangeLog} has - * been added or removed from a {@link QueryChangeLogSource}. + * A listener that is notified when a {@link QueryChangeLog} has been added or + * removed from a {@link QueryChangeLogSource}. The listener receives the only + * copy of the change log and is responsible for shutting it down. */ @DefaultAnnotation(NonNull.class) public interface SourceListener { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java index 30b4538..e6bd800 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java @@ -1,18 +1,20 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.rya.streams.querymanager; @@ -20,16 +22,23 @@ import static java.util.Objects.requireNonNull; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.queries.ChangeLogEntry; import org.apache.rya.streams.api.queries.InMemoryQueryRepository; import org.apache.rya.streams.api.queries.QueryChange; -import org.apache.rya.streams.api.queries.QueryChange.ChangeType; import 
org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.api.queries.QueryChangeLogListener; import org.apache.rya.streams.api.queries.QueryRepository; @@ -38,8 +47,10 @@ import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -51,205 +62,823 @@ import edu.umd.cs.findbugs.annotations.NonNull; * instances/rya streams instances. */ @DefaultAnnotation(NonNull.class) -public class QueryManager extends AbstractIdleService { - private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class); +public class QueryManager extends AbstractService { + private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + + /** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instance. + */ + private final QueryChangeLogSource changeLogSource; + /** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ private final QueryExecutor queryExecutor; - private final Scheduler scheduler; /** - * Map of Rya Instance name to {@link QueryRepository}. + * How long blocking operations will be attempted before potentially trying again. */ - private final Map<String, QueryRepository> queryRepos = new HashMap<>(); + private final long blockingValue; - private final ReentrantLock lock = new ReentrantLock(); + /** + * The units for {@link #blockingValue}. 
+ */ + private final TimeUnit blockingUnits; - private final QueryChangeLogSource source; + /** + * Used to inform threads that the application is shutting down, so they must stop work. + */ + private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + /** + * This thread pool manages the two threads used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ + private final ExecutorService executor = Executors.newFixedThreadPool(2); /** * Creates a new {@link QueryManager}. * * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) - * @param scheduler - The {@link Scheduler} used to discover query changes - * within the {@link QueryChangeLog}s (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) */ - public QueryManager(final QueryExecutor queryExecutor, final QueryChangeLogSource source, final Scheduler scheduler) { - this.source = requireNonNull(source); + public QueryManager( + final QueryExecutor queryExecutor, + final QueryChangeLogSource source, + final long blockingValue, + final TimeUnit blockingUnits) { + this.changeLogSource = requireNonNull(source); this.queryExecutor = requireNonNull(queryExecutor); - this.scheduler = requireNonNull(scheduler); + Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. Was: " + blockingValue); + this.blockingValue = blockingValue; + this.blockingUnits = requireNonNull(blockingUnits); + } + + @Override + protected void doStart() { + log.info("Starting a QueryManager."); + + // A work queue of discovered Query Change Logs that need to be handled. + // This queue exists so that the source notifying thread may be released + // immediately instead of calling into blocking functions. 
+ final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024); + + // A work queue of discovered Query Changes from the monitored Query Change Logs + // that need to be handled. This queue exists so that the Query Repository notifying + // thread may be released immediately instead of calling into blocking functions. + final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024); + + try { + // Start up a LogEventWorker using the executor service. + executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal)); + + // Start up a QueryEvent Worker using the executor service. + executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal)); + + // Start up the query execution framework. + queryExecutor.startAndWait(); + + // Startup the source that discovers new Query Change Logs. + changeLogSource.startAndWait(); + + // Subscribe the source a listener that writes to the LogEventWorker's work queue. + changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal)); + } catch(final RejectedExecutionException | UncheckedExecutionException e) { + log.error("Could not start up a QueryManager.", e); + notifyFailed(e); + } + + // Notify the service was successfully started. + notifyStarted(); + + log.info("QueryManager has finished starting."); + } + + @Override + protected void doStop() { + log.info("Stopping a QueryManager."); + + // Set the shutdown flag so that all components that rely on that signal will stop processing. + shutdownSignal.set(true); + + // Stop the workers and wait for them to die. 
+ executor.shutdownNow(); + try { + if(!executor.awaitTermination(10, TimeUnit.SECONDS)) { + log.warn("Waited 10 seconds for the worker threads to die, but they are still running."); + } + } catch (final InterruptedException e) { + log.warn("Waited 10 seconds for the worker threads to die, but they are still running."); + } + + // Stop the source of new Change Logs. + try { + changeLogSource.stopAndWait(); + } catch(final UncheckedExecutionException e) { + log.warn("Could not stop the Change Log Source.", e); + } + + // Stop the query execution framework. + try { + queryExecutor.stopAndWait(); + } catch(final UncheckedExecutionException e) { + log.warn("Could not stop the Query Executor", e); + } + + // Notify the service was successfully stopped. + notifyStopped(); + + log.info("QueryManager has finished stopping."); } /** - * Starts running a query. + * Offer a unit of work to a blocking queue until it is either accepted, or the + * shutdown signal is set. * - * @param ryaInstanceName - The Rya instance the query belongs to. (not null) - * @param query - The query to run.(not null) + * @param workQueue - The blocking work queue to write to. (not null) + * @param event - The event that will be offered to the work queue. (not null) + * @param offerValue - How long to wait when offering new work. + * @param offerUnits - The unit for the {@code offerValue}. (not null) + * @param shutdownSignal - Used to signal application shutdown has started, so + * this method may terminate without ever placing the event on the queue. (not null) + * @return {@code true} if the event was added to the queue, otherwise false.
*/ - private void runQuery(final String ryaInstanceName, final StreamsQuery query) { - requireNonNull(ryaInstanceName); - requireNonNull(query); - LOG.info("Starting Query: " + query.toString() + " on the rya instance: " + ryaInstanceName); + private static <T> boolean offerUntilAcceptedOrShutdown( + final BlockingQueue<T> workQueue, + final T event, + final long offerValue, + final TimeUnit offerUnits, + final AtomicBoolean shutdownSignal) { + requireNonNull(workQueue); + requireNonNull(event); + requireNonNull(shutdownSignal); - try { - queryExecutor.startQuery(ryaInstanceName, query); - } catch (final QueryExecutorException e) { - LOG.error("Failed to start query.", e); + boolean submitted = false; + while(!submitted && !shutdownSignal.get()) { + try { + submitted = workQueue.offer(event, offerValue, offerUnits); + if(!submitted) { + log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again..."); + } + } catch (final InterruptedException e) { + log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again..."); + } } + return submitted; } /** - * Stops the specified query from running. - * - * @param queryId - The ID of the query to stop running. (not null) + * An observation that a {@link QueryChangeLog} was created within or + * removed from a {@link QueryChangeLogSource}. */ - private void stopQuery(final UUID queryId) { - requireNonNull(queryId); + @DefaultAnnotation(NonNull.class) + static class LogEvent { + + /** + * The types of events that may be observed. + */ + static enum LogEventType { + /** + * A {@link QueryChangeLog} was created within a {@link QueryChangeLogSource}. + */ + CREATE, + + /** + * A {@link QueryChangeLog} was deleted from a {@link QueryChangeLogSource}. 
+ */ + DELETE; + } - LOG.info("Stopping query: " + queryId.toString()); + private final String ryaInstance; + private final LogEventType eventType; + private final Optional<QueryChangeLog> log; - try { - queryExecutor.stopQuery(queryId); - } catch (final QueryExecutorException e) { - LOG.error("Failed to stop query.", e); + /** + * Constructs an instance of {@link LogEvent}. + * + * @param ryaInstance - The Rya Instance the log is/was for. (not null) + * @param eventType - The type of event that was observed. (not null) + * @param log - The log if this is a create event. (not null) + */ + private LogEvent(final String ryaInstance, final LogEventType eventType, final Optional<QueryChangeLog> log) { + this.ryaInstance = requireNonNull(ryaInstance); + this.eventType = requireNonNull(eventType); + this.log = requireNonNull(log); + } + + /** + * @return The Rya Instance whose log was either created or deleted. + */ + public String getRyaInstanceName() { + return ryaInstance; + } + + /** + * @return The type of event that was observed. + */ + public LogEventType getEventType() { + return eventType; + } + + /** + * @return The {@link QueryChangeLog} if this is a CREATE event. + */ + public Optional<QueryChangeLog> getQueryChangeLog() { + return log; + } + + @Override + public String toString() { + return "LogEvent {\n" + + " Rya Instance: " + ryaInstance + ",\n" + + " Event Type: " + eventType + "\n" + + "}"; + } + + /** + * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was created within a + * {@link QueryChangeLogSource}. + * + * @param ryaInstance - The Rya Instance the created log is for. (not null) + * @param log - The created {@link QueryChangeLog}. (not null) + * @return A {@link LogEvent} built using the provided values.
+ */ + public static LogEvent create(final String ryaInstance, final QueryChangeLog log) { + return new LogEvent(ryaInstance, LogEventType.CREATE, Optional.of(log)); + } + + /** + * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was deleted from + * a {@link QueryChangeLogSource}. + * + * @param ryaInstance - The Rya Instance whose log was deleted. (not null) + * @return A {@link LogEvent} built using the provided values. + */ + public static LogEvent delete(final String ryaInstance) { + return new LogEvent(ryaInstance, LogEventType.DELETE, Optional.empty()); } } - @Override - protected void startUp() throws Exception { - lock.lock(); - try { - LOG.info("Starting Query Manager."); - queryExecutor.startAndWait(); - source.startAndWait(); + /** + * An observation that a {@link StreamsQuery} needs to be executing or not + * via the provided {@link QueryExecutor}. + */ + @DefaultAnnotation(NonNull.class) + static class QueryEvent { + + /** + * The type of events that may be observed. + */ + public static enum QueryEventType { + /** + * Indicates a {@link StreamsQuery} needs to be executing. + */ + EXECUTING, + + /** + * Indicates a {@link StreamsQuery} needs to be stopped. + */ + STOPPED, + + /** + * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped. + */ + STOP_ALL; + } + + private final String ryaInstance; + private final QueryEventType type; + private final Optional<UUID> queryId; + private final Optional<StreamsQuery> query; + + /** + * Constructs an instance of {@link QueryEvent}. + * + * @param ryaInstance - The Rya instance that generated the event. (not null) + * @param type - Indicates whether the query needs to be executing or not. (not null) + * @param queryId - If stopped, the ID of the query that must not be running. (not null) + * @param query - If executing, the StreamsQuery that defines what should be executing. 
(not null) + */ + private QueryEvent( + final String ryaInstance, + final QueryEventType type, + final Optional<UUID> queryId, + final Optional<StreamsQuery> query) { + this.ryaInstance = requireNonNull(ryaInstance); + this.type = requireNonNull(type); + this.queryId = requireNonNull(queryId); + this.query = requireNonNull(query); + } + + /** + * @return The Rya instance that generated the event. + */ + public String getRyaInstance() { + return ryaInstance; + } + + /** + * @return Indicates whether the query needs to be executing or not. + */ + public QueryEventType getType() { + return type; + } + + /** + * @return If stopped, the ID of the query that must not be running. Otherwise absent. + */ + public Optional<UUID> getQueryId() { + return queryId; + } + + /** + * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent. + */ + public Optional<StreamsQuery> getStreamsQuery() { + return query; + } - // subscribe to the sources to be notified of changes. 
- source.subscribe(new QueryManagerSourceListener()); - } finally { - lock.unlock(); + @Override + public int hashCode() { + return Objects.hash(ryaInstance, type, queryId, query); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof QueryEvent) { + final QueryEvent other = (QueryEvent) o; + return Objects.equals(ryaInstance, other.ryaInstance) && + Objects.equals(type, other.type) && + Objects.equals(queryId, other.queryId) && + Objects.equals(query, other.query); + } + return false; + } + + @Override + public String toString() { + final StringBuilder string = new StringBuilder(); + string.append("Query Event {\n") + .append(" Rya Instance: ").append(ryaInstance).append(",\n") + .append(" Type: ").append(type).append(",\n"); + switch(type) { + case EXECUTING: + append(string, query.get()); + break; + case STOPPED: + string.append(" Query ID: ").append(queryId.get()).append("\n"); + break; + case STOP_ALL: + break; + default: + // Default to showing everything that is in the object. + string.append(" Query ID: ").append(queryId.get()).append("\n"); + append(string, query.get()); + break; + } + string.append("}"); + return string.toString(); + } + + private void append(final StringBuilder string, final StreamsQuery query) { + requireNonNull(string); + requireNonNull(query); + string.append(" Streams Query {\n") + .append(" Query ID: ").append(query.getQueryId()).append(",\n") + .append(" Is Active: ").append(query.isActive()).append(",\n") + .append(" SPARQL: ").append(query.getSparql()).append("\n") + .append(" }"); + } + + /** + * Create a {@link QueryEvent} that indicates a query needs to be executing. + * + * @param ryaInstance - The Rya instance that generated the event. (not null) + * @param query - The StreamsQuery that defines what should be executing. (not null) + * @return A {@link QueryEvent} built using the provided values. 
+ */ + public static QueryEvent executing(final String ryaInstance, final StreamsQuery query) { + return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, Optional.empty(), Optional.of(query)); + } + + /** + * Create a {@link QueryEvent} that indicates a query needs to be stopped. + * + * @param ryaInstance - The Rya instance that generated the event. (not null) + * @param queryId - The ID of the query that must not be running. (not null) + * @return A {@link QueryEvent} built using the provided values. + */ + public static QueryEvent stopped(final String ryaInstance, final UUID queryId) { + return new QueryEvent(ryaInstance, QueryEventType.STOPPED, Optional.of(queryId), Optional.empty()); + } + + /** + * Create a {@link QueryEvent} that indicates all queries for a Rya instance need to be stopped. + * + * @param ryaInstance - The Rya instance that generated the event. (not null) + * @return A {@link QueryEvent} built using the provided values. + */ + public static QueryEvent stopALL(final String ryaInstance) { + return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, Optional.empty(), Optional.empty()); } } - @Override - protected void shutDown() throws Exception { - lock.lock(); - try { - LOG.info("Stopping Query Manager."); - source.stopAndWait(); - queryExecutor.stopAndWait(); - } finally { - lock.unlock(); + /** + * Listens to a {@link QueryChangeLogSource} and adds observations to the provided + * work queue. It does so until the provided shutdown signal is set. + */ + @DefaultAnnotation(NonNull.class) + static class LogEventWorkGenerator implements SourceListener { + + private final BlockingQueue<LogEvent> workQueue; + private final AtomicBoolean shutdownSignal; + private final long offerValue; + private final TimeUnit offerUnits; + + /** + * Constructs an instance of {@link LogEventWorkGenerator}. + * + * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it.
(not null) + * @param offerValue - How long to wait when offering new work. + * @param offerUnits - The unit for the {@code offerValue}. (not null) + * @param shutdownSignal - Indicates to this listener that it needs to stop adding events + * to the work queue because the application is shutting down. (not null) + */ + public LogEventWorkGenerator( + final BlockingQueue<LogEvent> workQueue, + final long offerValue, + final TimeUnit offerUnits, + final AtomicBoolean shutdownSignal) { + this.workQueue = requireNonNull(workQueue); + this.shutdownSignal = requireNonNull(shutdownSignal); + this.offerValue = offerValue; + this.offerUnits = requireNonNull(offerUnits); + } + + @Override + public void notifyCreate(final String ryaInstanceName, final QueryChangeLog changeLog) { + log.info("A new Query Change Log has been discovered for Rya Instance " + ryaInstanceName + ". All " + + "queries that are set to active within it will be started."); + + // Create an event that summarizes this notification. + final LogEvent event = LogEvent.create(ryaInstanceName, changeLog); + + // Offer it to the worker until there is room for it in the work queue, or we are shutting down. + offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal); + } + + @Override + public void notifyDelete(final String ryaInstanceName) { + log.info("The Query Change Log for Rya Instance " + ryaInstanceName + " has been deleted. All of the " + + "queries related to that instance will be stopped."); + + // Create an event that summarizes this notification. + final LogEvent event = LogEvent.delete(ryaInstanceName); + + // Offer it to the worker until there is room for it in the work queue, or we are shutting down. + offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal); } } /** - * An implementation of {@link QueryChangeLogListener} for the - * {@link QueryManager}. 
- * <p> - * When notified of a {@link ChangeType} performs one of the following: - * <li>{@link ChangeType#CREATE}: Creates a new query using the - * {@link QueryExecutor} provided to the {@link QueryManager}</li> - * <li>{@link ChangeType#DELETE}: Deletes a running query by stopping the - * {@link QueryExecutor} service of the queryID in the event</li> - * <li>{@link ChangeType#UPDATE}: If the query is running and the update is - * to stop the query, stops the query. Otherwise, if the query is not - * running, it is removed.</li> + * Processes a work queue of {@link LogEvent}s. + * <p/> + * Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator} + * that generates {@link QueryEvent}s based on the content and updates to the discovered + * {@link QueryChangeLog}. + * <p/> + * Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent} + * is written to the work queue. */ - private class QueryExecutionForwardingListener implements QueryChangeLogListener { - private final String ryaInstanceName; + @DefaultAnnotation(NonNull.class) + static class LogEventWorker implements Runnable { /** - * Creates a new {@link QueryExecutionForwardingListener}. + * A map of Rya Instance name to the Query Repository for that instance. + */ + private final Map<String, QueryRepository> repos = new HashMap<>(); + + private final BlockingQueue<LogEvent> logWorkQueue; + private final BlockingQueue<QueryEvent> queryWorkQueue; + private final long blockingValue; + private final TimeUnit blockingUnits; + private final AtomicBoolean shutdownSignal; + + /** + * Constructs an instance of {@link LogEventWorker}. * - * @param ryaInstanceName - The rya instance the query change is - * performed on. (not null) + * @param logWorkQueue - A queue of {@link LogEvent}s that will be worked by this object. (not null) + * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object.
(not null) + * @param blockingValue - How long to wait when polling/offering new work. + * @param blockingUnits - The unit for the {@code blockingValue}. (not null) + * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread + * may exit the {@link #run()} method. (not null) */ - public QueryExecutionForwardingListener(final String ryaInstanceName) { - this.ryaInstanceName = requireNonNull(ryaInstanceName); + public LogEventWorker( + final BlockingQueue<LogEvent> logWorkQueue, + final BlockingQueue<QueryEvent> queryWorkQueue, + final long blockingValue, + final TimeUnit blockingUnits, + final AtomicBoolean shutdownSignal) { + this.logWorkQueue = requireNonNull(logWorkQueue); + this.queryWorkQueue = requireNonNull(queryWorkQueue); + this.blockingValue = blockingValue; + this.blockingUnits = requireNonNull(blockingUnits); + this.shutdownSignal = requireNonNull(shutdownSignal); } @Override - public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) { - LOG.debug("New query change event."); - final QueryChange entry = queryChangeEvent.getEntry(); + public void run() { + // Run until the shutdown signal is set. + while(!shutdownSignal.get()) { + try { + // Pull a unit of work from the queue. + log.debug("LogEventWorker - Polling the work queue for a new LogEvent."); + final LogEvent logEvent = logWorkQueue.poll(blockingValue, blockingUnits); + if(logEvent == null) { + // Poll again if nothing was found. 
+ continue; + } - lock.lock(); - try { + log.info("LogEventWorker - handling: \n" + logEvent); + final String ryaInstance = logEvent.getRyaInstanceName(); - switch (entry.getChangeType()) { - case CREATE: - if(!newQueryState.isPresent()) { - LOG.error("The query with ID: " + entry.getQueryId() + " must be present with the change to be created."); - LOG.debug("newQueryState is not allowed to be absent with a CREATE QueryChange, there might be a bug in the QueryRepository."); - } else { - runQuery(ryaInstanceName, newQueryState.get()); - } - break; - case DELETE: - stopQuery(entry.getQueryId()); - break; - case UPDATE: - if (!newQueryState.isPresent()) { - LOG.error("The query with ID: " + entry.getQueryId() + " must be provided with the update, cannot perform update."); - LOG.debug("newQueryState is not allowed to be absent with a UPDATE QueryChange, there might be a bug in the QueryRepository."); - } else { - final StreamsQuery updatedQuery = newQueryState.get(); - if (updatedQuery.isActive()) { - runQuery(ryaInstanceName, updatedQuery); - LOG.info("Starting query: " + updatedQuery.toString()); - } else { - stopQuery(updatedQuery.getQueryId()); - LOG.info("Stopping query: " + updatedQuery.toString()); + switch(logEvent.getEventType()) { + case CREATE: + // If we see a create message for a Rya Instance we are already maintaining, + // then don't do anything. + if(repos.containsKey(ryaInstance)) { + log.warn("LogEventWorker - A repository is already being managed for the Rya Instance " + + ryaInstance + ". This message will be ignored."); + continue; } - } - break; + + // Create and start a QueryRepository for the discovered log. Hold onto the repository + // so that it may be shutdown later. 
+ final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits); + final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler); + repo.startAndWait(); + repos.put(ryaInstance, repo); + + // Subscribe a worker that adds the Query Events to the queryWorkQueue queue. + // A count down latch is used to ensure the returned set of queries are handled + // prior to any notifications from the repository. + final CountDownLatch subscriptionWorkFinished = new CountDownLatch(1); + final QueryEventWorkGenerator queryWorkGenerator = + new QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue, + blockingValue, blockingUnits, shutdownSignal); + + log.debug("LogEventWorker - Setting up a QueryWorkGenerator..."); + final Set<StreamsQuery> queries = repo.subscribe(queryWorkGenerator); + log.debug("LogEventWorker - Finished setting up a QueryWorkGenerator."); + + // Handle the view of the queries within the repository as it existed when + // the subscription was registered. + queries.stream() + .forEach(query -> { + // Create a QueryEvent that represents the active state of the existing query. + final QueryEvent queryEvent = query.isActive() ? + QueryEvent.executing(ryaInstance, query) : QueryEvent.stopped(ryaInstance, query.getQueryId()); + log.debug("LogEventWorker - offering: " + queryEvent); + + // Offer it to the worker until there is room for it in the work queue, or we are shutting down. + offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, blockingUnits, shutdownSignal); + }); + + // Indicate the subscription work is finished so that the registered listener may start + // adding work to the queue. + log.info("LogEventWorker - Counting down the subscription work latch."); + subscriptionWorkFinished.countDown(); + break; + + case DELETE: + if(repos.containsKey(ryaInstance)) { + // Shut down the query repository for the Rya instance. 
This ensures the listener will + // not receive any more work that needs to be done. + final QueryRepository deletedRepo = repos.remove(ryaInstance); + deletedRepo.stopAndWait(); + + // Add work that stops all of the queries related to the instance. + final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance); + offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, blockingUnits, shutdownSignal); + } + break; + } + } catch (final InterruptedException e) { + log.debug("LogEventWorker did not see any new events over the past 5 seconds. Polling again..."); } - } finally { - lock.unlock(); } + + log.info("LogEventWorker shutting down..."); + + // Shutdown all of the QueryRepositories that were started. + repos.values().forEach(repo -> repo.stopAndWait()); + + log.info("LogEventWorker shut down."); } } /** - * Listener used by the {@link QueryManager} to be notified when - * {@link QueryChangeLog}s are created or deleted. + * Listens to a {@link QueryRepository} and adds observations to the provided work queue. + * It does so until the provided shutdown signal is set. */ - private class QueryManagerSourceListener implements SourceListener { + @DefaultAnnotation(NonNull.class) + static class QueryEventWorkGenerator implements QueryChangeLogListener { + + private final String ryaInstance; + private final CountDownLatch subscriptionWorkFinished; + private final BlockingQueue<QueryEvent> queryWorkQueue; + private final long blockingValue; + private final TimeUnit blockingUnits; + private final AtomicBoolean shutdownSignal; + + /** + * Constructs an instance of {@link QueryEventWorkGenerator}. + * + * @param ryaInstance - The rya instance whose log this object is watching. (not null) + * @param subscriptionWorkFinished - Indicates when work that needs to be completed before this + * listener handles notifications is completed. (not null) + * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object.
(not null) + * @param blockingValue - How long to wait when polling/offering new work. + * @param blockingUnits - The unit for the {@code blockingValue}. (not null) + * @param shutdownSignal - Indicates to this listener that it needs to stop adding events + * to the work queue because the application is shutting down. (not null) + */ + public QueryEventWorkGenerator( + final String ryaInstance, + final CountDownLatch subscriptionWorkFinished, + final BlockingQueue<QueryEvent> queryWorkQueue, + final long blockingValue, + final TimeUnit blockingUnits, + final AtomicBoolean shutdownSignal) { + this.ryaInstance = requireNonNull(ryaInstance); + this.subscriptionWorkFinished = requireNonNull(subscriptionWorkFinished); + this.queryWorkQueue = requireNonNull(queryWorkQueue); + this.blockingValue = blockingValue; + this.blockingUnits = requireNonNull(blockingUnits); + this.shutdownSignal = requireNonNull(shutdownSignal); + } + @Override - public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) { - lock.lock(); + public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) { + requireNonNull(queryChangeEvent); + requireNonNull(newQueryState); + + // Wait for the subscription work to be finished. 
try { - LOG.info("Discovered new Query Change Log for Rya Instance " + ryaInstanceName + "."); - final QueryRepository repo = new InMemoryQueryRepository(log, scheduler); - repo.startAndWait(); - final Set<StreamsQuery> queries = repo.subscribe(new QueryExecutionForwardingListener(ryaInstanceName)); - queries.forEach(query -> { - if (query.isActive()) { - try { - queryExecutor.startQuery(ryaInstanceName, query); - } catch (IllegalStateException | QueryExecutorException e) { - LOG.error("Unable to start query for rya instance " + ryaInstanceName, e); + log.debug("Waiting for Subscription Work Finished latch to release..."); + while(!shutdownSignal.get() && !subscriptionWorkFinished.await(blockingValue, blockingUnits)) { + log.debug("Still waiting..."); + } + log.debug("Subscription Work Finished latch to released."); + } catch (final InterruptedException e) { + log.warn("Interrupted while waiting for the Subscription Work Finished latch to be " + + "released. Shutting down?", e); + } + + // If we left the loop because of a shutdown, return immediately. + if(shutdownSignal.get()) { + log.debug("Not processing notification. Shutting down."); + return; + } + + // Generate work from the notification. + final QueryChange change = queryChangeEvent.getEntry(); + switch(change.getChangeType()) { + case CREATE: + if(newQueryState.isPresent()) { + log.info("Rya Instance " + ryaInstance + " created Rya Streams query " + newQueryState + "."); + final StreamsQuery newQuery = newQueryState.get(); + if(newQuery.isActive()) { + final QueryEvent executeNewQuery = QueryEvent.executing(ryaInstance, newQuery); + offerUntilAcceptedOrShutdown(queryWorkQueue, executeNewQuery, blockingValue, blockingUnits, shutdownSignal); } + } else { + log.error("Received a CREATE QueryChange for Rya Instance: " + ryaInstance + + ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " + + "StreamsQuery representing the created query. 
The query will not be processed."); } - }); - queryRepos.put(ryaInstanceName, repo); - } finally { - lock.unlock(); + break; + + case DELETE: + final UUID deletedQueryId = change.getQueryId(); + log.info("Rya Instance " + ryaInstance + " deleted Rya Streams query with ID " + deletedQueryId); + final QueryEvent stopDeletedQuery = QueryEvent.stopped(ryaInstance, deletedQueryId); + offerUntilAcceptedOrShutdown(queryWorkQueue, stopDeletedQuery, blockingValue, blockingUnits, shutdownSignal); + break; + + case UPDATE: + if(newQueryState.isPresent()) { + final StreamsQuery updatedQuery = newQueryState.get(); + if(updatedQuery.isActive()) { + log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " + + updatedQuery.getQueryId() + " to be active."); + final QueryEvent executeUpdatedQuery = QueryEvent.executing(ryaInstance, updatedQuery); + offerUntilAcceptedOrShutdown(queryWorkQueue, executeUpdatedQuery, blockingValue, blockingUnits, shutdownSignal); + } else { + log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " + + updatedQuery.getQueryId() + " to be inactive."); + final QueryEvent stopUpdatedQuery = QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId()); + offerUntilAcceptedOrShutdown(queryWorkQueue, stopUpdatedQuery, blockingValue, blockingUnits, shutdownSignal); + } + } else { + log.error("Received an UPDATE QueryChange for Rya Instance: " + ryaInstance + + ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " + + "StreamsQuery representing the created query. The query will not be processed."); + } + break; } } + } + + /** + * Processes a work queue of {@link QueryEvent}s. + * <p/> + * Each type of event maps to the corresponding method on {@link QueryExecutor} that is called into.
+ */ + @DefaultAnnotation(NonNull.class) + static class QueryEventWorker implements Runnable { + + private final BlockingQueue<QueryEvent> workQueue; + private final QueryExecutor queryExecutor; + private final long pollingValue; + private final TimeUnit pollingUnits; + private final AtomicBoolean shutdownSignal; + + /** + * Constructs an instance of {@link QueryEventWorker}. + * + * @param workQueue - A queue of {@link QueryEvent}s that will be worked by this object. (not null) + * @param queryExecutor - Responsible for executing the {@link StreamsQuery}s. (not null) + * @param pollingValue - How long to wait when polling for new work. + * @param pollingUnits - The units for the {@code pollingValue}. (not null) + * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread + * may exit the {@link #run()} method. (not null) + */ + public QueryEventWorker( + final BlockingQueue<QueryEvent> workQueue, + final QueryExecutor queryExecutor, + final long pollingValue, + final TimeUnit pollingUnits, + final AtomicBoolean shutdownSignal) { + this.workQueue = requireNonNull(workQueue); + this.queryExecutor = requireNonNull(queryExecutor); + this.pollingValue = pollingValue; + this.pollingUnits = requireNonNull(pollingUnits); + this.shutdownSignal = requireNonNull(shutdownSignal); + } @Override - public void notifyDelete(final String ryaInstanceName) { - lock.lock(); - try { - LOG.info("Notified of deleting QueryChangeLog, stopping all queries belonging to the change log for " - + ryaInstanceName + "."); - queryExecutor.stopAll(ryaInstanceName); - } catch (final QueryExecutorException e) { - LOG.error("Failed to stop all queries belonging to: " + ryaInstanceName, e); - } finally { - lock.unlock(); + public void run() { + log.info("QueryEventWorker starting."); + + // Run until the shutdown signal is set. + while(!shutdownSignal.get()) { + // Pull a unit of work from the queue. 
+ try { + log.debug("Polling the work queue for a new QueryEvent."); + final QueryEvent event = workQueue.poll(pollingValue, pollingUnits); + if(event == null) { + // Poll again if nothing was found. + continue; + } + + log.info("QueryEventWorker handling:\n" + event); + + // Ensure the state within the executor matches the query event's state. + switch(event.getType()) { + case EXECUTING: + try { + queryExecutor.startQuery(event.getRyaInstance(), event.getStreamsQuery().get()); + } catch (final IllegalStateException | QueryExecutorException e) { + log.error("Could not start a query represented by the following work: " + event, e); + } + break; + + case STOPPED: + try { + queryExecutor.stopQuery(event.getQueryId().get()); + } catch (final IllegalStateException | QueryExecutorException e) { + log.error("Could not stop a query represented by the following work: " + event, e); + } + break; + + case STOP_ALL: + try { + queryExecutor.stopAll(event.getRyaInstance()); + } catch (final IllegalStateException | QueryExecutorException e) { + log.error("Could not stop all queries represented by the following work: " + event, e); + } + break; + } + } catch (final InterruptedException e) { + log.debug("QueryEventWorker interrupted. 
Probably shutting down."); + } } + log.info("QueryEventWorker shut down."); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java index 515d699..04a0382 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java @@ -33,6 +33,7 @@ import org.apache.commons.daemon.DaemonContext; import org.apache.commons.daemon.DaemonInitException; import org.apache.rya.streams.kafka.KafkaStreamsFactory; import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory; +import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic; import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource; import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor; import org.apache.rya.streams.querymanager.xml.Kafka; @@ -91,7 +92,7 @@ public class QueryManagerDaemon implements Daemon { // Unmarshall the configuration file into an object. 
final QueryManagerConfig config; - try(InputStream stream = Files.newInputStream(configFile)) { + try(final InputStream stream = Files.newInputStream(configFile)) { config = QueryManagerConfigUnmarshaller.unmarshall(stream); } catch(final JAXBException | SAXException e) { throw new DaemonInitException("Unable to marshall the configuration XML file: " + configFile, e); @@ -110,11 +111,12 @@ public class QueryManagerDaemon implements Daemon { final QueryChangeLogSource source = new KafkaQueryChangeLogSource(kafka.getHostname(), kafka.getPort(), scheduler); // Initialize a QueryExecutor. + final String zookeeperServers = config.getQueryExecutor().getLocalKafkaStreams().getZookeepers(); final KafkaStreamsFactory streamsFactory = new SingleThreadKafkaStreamsFactory(kafka.getHostname() + ":" + kafka.getPort()); - final QueryExecutor queryExecutor = new LocalQueryExecutor(streamsFactory); + final QueryExecutor queryExecutor = new LocalQueryExecutor(new CreateKafkaTopic(zookeeperServers), streamsFactory); // Initialize the QueryManager using the configured resources. 
- manager = new QueryManager(queryExecutor, source, scheduler); + manager = new QueryManager(queryExecutor, source, period, units); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1cd8db32/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java index 32305f5..e746baf 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java @@ -33,11 +33,12 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; import org.apache.rya.streams.querymanager.QueryChangeLogSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractScheduledService; @@ -53,6 +54,8 @@ import edu.umd.cs.findbugs.annotations.NonNull; @DefaultAnnotation(NonNull.class) public class KafkaQueryChangeLogSource extends AbstractScheduledService implements QueryChangeLogSource { + private static final Logger log = LoggerFactory.getLogger(KafkaQueryChangeLogSource.class); + /** * Ensures thread safe 
interactions with this object. */ @@ -74,10 +77,10 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen private final Set<SourceListener> listeners = new HashSet<>(); /** - * Maps Rya instance name to a Query Change Log for that instance. This map is used to keep - * track of how the change logs change over time within the Kafka Server. + * Maps Rya instance names to the Query Change Log topic name in Kafka. This map is used to + * keep track of how the change logs change over time within the Kafka Server. */ - private final HashMap<String, QueryChangeLog> knownChangeLogs = new HashMap<>(); + private final HashMap<String, String> knownChangeLogs = new HashMap<>(); /** * A consumer that is used to poll the Kafka Server for topics. @@ -101,6 +104,8 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen @Override protected void startUp() throws Exception { + log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " starting up..."); + // Setup the consumer that is used to list topics for the source. final Properties consumerProperties = new Properties(); consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer); @@ -108,17 +113,23 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); listTopicsConsumer = new KafkaConsumer<>(consumerProperties); + + log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " started."); } @Override protected void shutDown() throws Exception { - // Shut down the consumer that's used to list topics. 
- listTopicsConsumer.close(); + log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shutting down..."); - // Shut down all of the change logs that were created within this class. - for(final QueryChangeLog changeLog : knownChangeLogs.values()) { - changeLog.close(); + lock.lock(); + try { + // Shut down the consumer that's used to list topics. + listTopicsConsumer.close(); + } finally { + lock.unlock(); } + + log.info("Kafka Query Change Log Source watching " + kafkaBootstrapServer + " shut down."); } @Override @@ -130,8 +141,10 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen listeners.add(listener); // Notify it with everything that already exists. - for(final Entry<String, QueryChangeLog> entry : knownChangeLogs.entrySet()) { - listener.notifyCreate(entry.getKey(), entry.getValue()); + for(final Entry<String, String> entry : knownChangeLogs.entrySet()) { + final String changeLogTopic = entry.getValue(); + final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic); + listener.notifyCreate(entry.getKey(), changeLog); } } finally { lock.unlock(); @@ -174,26 +187,23 @@ public class KafkaQueryChangeLogSource extends AbstractScheduledService implemen // Handle the deletes. for(final String deletedRyaInstance : deletedRyaInstances) { // Remove the change log from the set of known logs. - final QueryChangeLog removed = knownChangeLogs.remove(deletedRyaInstance); + knownChangeLogs.remove(deletedRyaInstance); - // Notify the listeners of the update. + // Notify the listeners of the update so that they may close the previously provided change log. for(final SourceListener listener : listeners) { listener.notifyDelete(deletedRyaInstance); } - - // Ensure the change log is closed. - removed.close(); } // Handle the adds. for(final String createdRyaInstance : createdRyaInstances) { // Create and store the ChangeLog. 
final String changeLogTopic = KafkaTopics.queryChangeLogTopic(createdRyaInstance); - final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic); - knownChangeLogs.put(createdRyaInstance, changeLog); + knownChangeLogs.put(createdRyaInstance, changeLogTopic); // Notify the listeners of the update. for(final SourceListener listener : listeners) { + final KafkaQueryChangeLog changeLog = KafkaQueryChangeLogFactory.make(kafkaBootstrapServer, changeLogTopic); listener.notifyCreate(createdRyaInstance, changeLog); } }