RYA-456 Implement a Single Node implementation of QueryExecutor.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/a11ca4a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/a11ca4a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/a11ca4a2 Branch: refs/heads/master Commit: a11ca4a2b1fd73ba754b1c2495b061e67c06ab0e Parents: a31e256 Author: kchilton2 <[email protected]> Authored: Fri Jan 26 15:55:59 2018 -0500 Committer: Valiyil <[email protected]> Committed: Fri Mar 9 12:59:45 2018 -0500 ---------------------------------------------------------------------- .../client/command/RunQueryCommandIT.java | 3 +- .../rya/streams/kafka/KafkaStreamsFactory.java | 57 ++++ .../kafka/SingleThreadKafkaStreamsFactory.java | 90 ++++++ .../rya/streams/querymanager/QueryExecutor.java | 19 +- .../querymanager/kafka/LocalQueryExecutor.java | 187 ++++++++++++ .../kafka/LocalQueryExecutorIT.java | 148 +++++++++ .../kafka/LocalQueryExecutorTest.java | 299 +++++++++++++++++++ 7 files changed, 795 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java index 7e3b8bc..5d63f32 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java @@ -91,7 +91,8 @@ public class RunQueryCommandIT { } @After - public void cleanup() throws Exception{ + public void cleanup() throws Exception { + 
queryRepo.stopAndWait(); stmtProducer.close(); resultConsumer.close(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java new file mode 100644 index 0000000..bd8ff1e --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaStreamsFactory.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.rya.streams.api.entity.StreamsQuery; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Builds {@link KafkaStreams} objects that are able to process a specific {@link StreamsQuery}. 
+ */ +@DefaultAnnotation(NonNull.class) +public interface KafkaStreamsFactory { + + /** + * Builds a {@link KafkaStreams} object that is able to process a specific {@link StreamsQuery}. + * + * @param ryaInstance - The Rya Instance the streams job is for. (not null) + * @param query - Defines the query that will be executed. (not null) + * @return A {@link KafkaStreams} object that will process the provided query. + * @throws KafkaStreamsFactoryException Unable to create a {@link KafkaStreams} object from the provided values. + */ + public KafkaStreams make(String ryaInstance, StreamsQuery query) throws KafkaStreamsFactoryException; + + /** + * A {@link KafkaStreamsFactory} could not create a {@link KafkaStreams} object. + */ + public static class KafkaStreamsFactoryException extends Exception { + private static final long serialVersionUID = 1L; + + public KafkaStreamsFactoryException(final String message) { + super(message); + } + + public KafkaStreamsFactoryException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java new file mode 100644 index 0000000..7ab7e90 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
 The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.openrdf.query.MalformedQueryException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Creates {@link KafkaStreams} objects that are able to process {@link StreamsQuery}s + * using a single thread of execution starting from the earliest point within the + * input topic. The Application ID used by the client is based on the Query ID of the + * query that is being executed so that this job may resume where it left off if it + * is stopped. 
+ */ +@DefaultAnnotation(NonNull.class) +public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory { + + private final TopologyBuilderFactory topologyFactory = new TopologyFactory(); + + private final String bootstrapServersConfig; + + /** + * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}. + * + * @param bootstrapServersConfig - Configures which Kafka cluster the jobs will interact with. (not null) + */ + public SingleThreadKafkaStreamsFactory(final String bootstrapServersConfig) { + this.bootstrapServersConfig = requireNonNull(bootstrapServersConfig); + } + + @Override + public KafkaStreams make(final String ryaInstance, final StreamsQuery query) throws KafkaStreamsFactoryException { + requireNonNull(ryaInstance); + requireNonNull(query); + + // Setup the Kafka Stream program. + final Properties streamsProps = new Properties(); + + // Configure the Kafka servers that will be talked to. + streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + + // Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run. + streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "RyaStreams-Query-" + query.getQueryId()); + + // Always start at the beginning of the input topic. + streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + // Setup the topology that processes the Query. 
+ final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId()); + + try { + final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory()); + return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps)); + } catch (MalformedQueryException | TopologyBuilderException e) { + throw new KafkaStreamsFactoryException("Could not create a KafkaStreams processing topology for query " + query.getQueryId(), e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java index 4572f08..bcee796 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryExecutor.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -35,36 +35,41 @@ import edu.umd.cs.findbugs.annotations.NonNull; */ @DefaultAnnotation(NonNull.class) public interface QueryExecutor extends Service { + /** * Starts running a {@link StreamsQuery}. * * @param ryaInstanceName - The rya instance whose {@link Statement}s will be processed by the query. (not null) * @param query - The query to run. (not null) * @throws QueryExecutorException When the query fails to start. 
+ * @throws IllegalStateException The service has not been started yet. */ - public void startQuery(final String ryaInstanceName, final StreamsQuery query) throws QueryExecutorException; + public void startQuery(final String ryaInstanceName, final StreamsQuery query) throws QueryExecutorException, IllegalStateException; /** * Stops a {@link StreamsQuery}. * * @param queryID - The ID of the query to stop. (not null) * @throws QueryExecutorException When the query fails to stop. + * @throws IllegalStateException The service has not been started yet. */ - public void stopQuery(final UUID queryID) throws QueryExecutorException; + public void stopQuery(final UUID queryID) throws QueryExecutorException, IllegalStateException; /** * Stops all {@link StreamsQuery} belonging to a specific rya instance. * * @param ryaInstanceName - The name of the rya instance to stop all queries for. (not null) * @throws QueryExecutorException When the queries fails to stop. + * @throws IllegalStateException The service has not been started yet. */ - public void stopAll(final String ryaInstanceName) throws QueryExecutorException; + public void stopAll(final String ryaInstanceName) throws QueryExecutorException, IllegalStateException; /** - * @return - A set of {@link UUID}s representing the current active queries. + * @return A set of {@link UUID}s representing the current active queries. * @throws QueryExecutorException Can't discover which queries are currently running. + * @throws IllegalStateException The service has not been started yet. */ - public Set<UUID> getRunningQueryIds() throws QueryExecutorException; + public Set<UUID> getRunningQueryIds() throws QueryExecutorException, IllegalStateException; /** * Exception to be used by {@link QueryExecutor} when queries fail to start or stop. 
@@ -100,4 +105,4 @@ public interface QueryExecutor extends Service { super(cause); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java new file mode 100644 index 0000000..947a215 --- /dev/null +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutor.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager.kafka; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.kafka.KafkaStreamsFactory.KafkaStreamsFactoryException; +import org.apache.rya.streams.querymanager.QueryExecutor; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.AbstractIdleService; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import kafka.consumer.KafkaStream; + +/** + * A {@link QueryExecutor} that runs a {@link KafkaStreams} job within its own JVM every + * time {@link #startQuery(String, StreamsQuery)} is invoked. + * <p/> + * This executor may run out of JVM resources if it is used to execute too many queries. + */ +@DefaultAnnotation(NonNull.class) +public class LocalQueryExecutor extends AbstractIdleService implements QueryExecutor { + + /** + * Provides thread safety when interacting with this class. + */ + public static ReentrantLock lock = new ReentrantLock(); + + /** + * Lookup the Rya Instance of a specific Query Id. + */ + private final Map<UUID, String> ryaInstanceById = new HashMap<>(); + + /** + * Lookup the Query IDs that are running for a specific Rya Instance. + */ + private final Multimap<String, UUID> idByRyaInstance = HashMultimap.create(); + + /** + * Lookup the executing {@link KafkaStreams} job for a running Query Id. 
+ */ + private final Map<UUID, KafkaStreams> byQueryId = new HashMap<>(); + + /** + * Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. + */ + private final KafkaStreamsFactory streamsFactory; + + /** + * Constructs an instance of {@link LocalQueryExecutor}. + * + * @param streamsFactory - Builds the {@link KafkaStreams} objects that execute {@link KafkaStream}s. (not null) + */ + public LocalQueryExecutor(final KafkaStreamsFactory streamsFactory) { + this.streamsFactory = requireNonNull(streamsFactory); + } + + @Override + protected void startUp() throws Exception { + // Nothing to do. + } + + @Override + protected void shutDown() throws Exception { + // Stop all of the running queries. + for(final KafkaStreams job : byQueryId.values()) { + job.close(); + } + } + + @Override + public void startQuery(final String ryaInstance, final StreamsQuery query) throws QueryExecutorException { + requireNonNull(ryaInstance); + requireNonNull(query); + checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method."); + + lock.lock(); + try { + // Setup the Kafka Streams job that will execute. + final KafkaStreams streams = streamsFactory.make(ryaInstance, query); + streams.start(); + + // Mark which Rya Instance the Query ID is for. + ryaInstanceById.put(query.getQueryId(), ryaInstance); + + // Add the Query ID to the collection of running queries for the Rya instance. + idByRyaInstance.put(ryaInstance, query.getQueryId()); + + // Add the running Kafka Streams job for the Query ID. 
+ byQueryId.put(query.getQueryId(), streams); + + } catch (final KafkaStreamsFactoryException e) { + throw new QueryExecutorException("Could not start query " + query.getQueryId(), e); + } finally { + lock.unlock(); + } + } + + @Override + public void stopQuery(final UUID queryId) throws QueryExecutorException { + requireNonNull(queryId); + checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method."); + + lock.lock(); + try { + if(byQueryId.containsKey(queryId)) { + // Stop the job from running. + final KafkaStreams streams = byQueryId.get(queryId); + streams.close(); + + // Remove it from the Rya Instance Name lookup. + final String ryaInstance = ryaInstanceById.remove(queryId); + + // Remove it from the collection of running queries for the Rya Instance. + idByRyaInstance.remove(ryaInstance, queryId); + + // Remove it from the running Kafka Streams job lookup. + byQueryId.remove(queryId); + } + } finally { + lock.unlock(); + } + } + + @Override + public void stopAll(final String ryaInstanceName) throws QueryExecutorException { + requireNonNull(ryaInstanceName); + checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method."); + + lock.lock(); + try { + if(idByRyaInstance.containsKey(ryaInstanceName)) { + // A defensive copy of the queries so that we may remove them from the maps. + final Set<UUID> queryIds = new HashSet<>( idByRyaInstance.get(ryaInstanceName) ); + + // Stop each of them. 
+ for(final UUID queryId : queryIds) { + stopQuery(queryId); + } + } + } finally { + lock.unlock(); + } + } + + @Override + public Set<UUID> getRunningQueryIds() throws QueryExecutorException { + lock.lock(); + checkState(state() == State.RUNNING, "The service must be RUNNING to execute this method."); + + try { + return new HashSet<>( byQueryId.keySet() ); + } finally { + lock.unlock(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java new file mode 100644 index 0000000..3cbe894 --- /dev/null +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager.kafka; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.interactor.LoadStatements; +import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory; +import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; +import org.apache.rya.streams.querymanager.QueryExecutor; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; + +/** + * Integration tests the methods of {@link LocalQueryExecutor}. 
+ */ +public class LocalQueryExecutorIT { + + private final String ryaInstance = UUID.randomUUID().toString(); + + private Producer<String, VisibilityStatement> stmtProducer = null; + private Consumer<String, VisibilityBindingSet> resultConsumer = null; + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false); + + @Before + public void setup() { + // Make sure the topic that the change log uses exists. + final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance); + kafka.createTopic(changeLogTopic); + + // Initialize the Statements Producer and the Results Consumer. + stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class); + resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class); + } + + @After + public void cleanup() throws Exception { + stmtProducer.close(); + resultConsumer.close(); + } + + @Test + public void runQuery() throws Exception { + // Test values. + final String ryaInstance = "rya"; + final StreamsQuery sQuery = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?person <urn:worksAt> ?business . }", true); + + // Create the statements that will be loaded. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement(vf.createStatement( + vf.createURI("urn:Alice"), + vf.createURI("urn:worksAt"), + vf.createURI("urn:BurgerJoint")), "a")); + statements.add(new VisibilityStatement(vf.createStatement( + vf.createURI("urn:Bob"), + vf.createURI("urn:worksAt"), + vf.createURI("urn:TacoShop")), "a")); + statements.add(new VisibilityStatement(vf.createStatement( + vf.createURI("urn:Charlie"), + vf.createURI("urn:worksAt"), + vf.createURI("urn:TacoShop")), "a")); + + // Create the expected results. 
+ final List<VisibilityBindingSet> expected = new ArrayList<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add(new VisibilityBindingSet(bs, "a")); + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoShop")); + expected.add(new VisibilityBindingSet(bs, "a")); + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Charlie")); + bs.addBinding("business", vf.createURI("urn:TacoShop")); + expected.add(new VisibilityBindingSet(bs, "a")); + + // Start the executor that will be tested. + final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort(); + final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers); + final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + executor.startAndWait(); + try { + // Start the query. + executor.startQuery(ryaInstance, sQuery); + + // Wait for the program to start. + Thread.sleep(5000); + + // Write some statements to the program. + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, stmtProducer); + loadStatements.fromCollection(statements); + + // Read the output of the streams program. 
+ final String resultsTopic = KafkaTopics.queryResultsTopic(sQuery.getQueryId()); + resultConsumer.subscribe( Lists.newArrayList(resultsTopic) ); + final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 3, resultConsumer); + assertEquals(expected, results); + + } finally { + executor.stopAndWait(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a11ca4a2/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java new file mode 100644 index 0000000..0df5794 --- /dev/null +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.querymanager.QueryExecutor; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Unit tests the methods of {@link LocalQueryExecutor}. + */ +public class LocalQueryExecutorTest { + + @Test(expected = IllegalStateException.class) + public void startQuery_serviceNotStarted() throws Exception { + final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + executor.startQuery("rya", new StreamsQuery(UUID.randomUUID(), "query", true)); + } + + @Test + public void startQuery() throws Exception { + // Test values. + final String ryaInstance = "rya"; + final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + + // Mock the streams factory so that we can tell if the start function is invoked by the executor. + final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class); + final KafkaStreams queryJob = mock(KafkaStreams.class); + when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob); + + // Start the executor that will be tested. + final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + executor.startAndWait(); + try { + // Tell the executor to start the query. + executor.startQuery(ryaInstance, query); + + // Show a job was started for that query's ID. 
 + verify(queryJob).start(); + } finally { + executor.stopAndWait(); + } + } + + @Test(expected = IllegalStateException.class) + public void stopQuery_serviceNotStarted() throws Exception { + final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + executor.stopQuery(UUID.randomUUID()); + } + + @Test + public void stopQuery_queryNotRunning() throws Exception { + // Start an executor. + final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + executor.startAndWait(); + try { + // Try to stop a query that was never started. + executor.stopQuery(UUID.randomUUID()); + } finally { + executor.stopAndWait(); + } + } + + @Test + public void stopQuery() throws Exception { + // Test values. + final String ryaInstance = "rya"; + final StreamsQuery query = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + + // Mock the streams factory so that we can tell if the stop function is invoked by the executor. + final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class); + final KafkaStreams queryJob = mock(KafkaStreams.class); + when(jobFactory.make(eq(ryaInstance), eq(query))).thenReturn(queryJob); + + // Start the executor that will be tested. + final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + executor.startAndWait(); + try { + // Tell the executor to start the query. + executor.startQuery(ryaInstance, query); + + // Tell the executor to stop the query. + executor.stopQuery(query.getQueryId()); + + // Show a job was stopped for that query's ID. + verify(queryJob).close(); + } finally { + executor.stopAndWait(); + } + } + + @Test(expected = IllegalStateException.class) + public void stopAll_serviceNotStarted() throws Exception { + final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + executor.stopAll("rya"); + } + + @Test + public void stopAll_noneForThatRyaInstance() throws Exception { + // Test values. 
 + final String ryaInstance = "rya"; + final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + + // Mock the streams factory so that we can tell if the stop function is invoked by the executor. + final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class); + final KafkaStreams queryJob1 = mock(KafkaStreams.class); + final KafkaStreams queryJob2 = mock(KafkaStreams.class); + when(jobFactory.make(eq(ryaInstance), eq(query1))).thenReturn(queryJob1); + when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(queryJob2); + + // Start the executor that will be tested. + final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + executor.startAndWait(); + try { + // Tell the executor to start the queries. + executor.startQuery(ryaInstance, query1); + executor.startQuery(ryaInstance, query2); + + // Verify both are running. + verify(queryJob1).start(); + verify(queryJob2).start(); + + // Tell the executor to stop queries running under a different Rya instance. + executor.stopAll("someOtherRyaInstance"); + + // Show none of the queries were stopped. + verify(queryJob1, never()).close(); + verify(queryJob2, never()).close(); + + } finally { + executor.stopAndWait(); + } + } + + @Test + public void stopAll() throws Exception { + // Test values. + final String ryaInstance1 = "rya1"; + final StreamsQuery query1= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + final String ryaInstance2 = "rya2"; + final StreamsQuery query2= new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + + // Mock the streams factory so that we can tell if the stop function is invoked by the executor. 
+ final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class); + final KafkaStreams queryJob1 = mock(KafkaStreams.class); + when(jobFactory.make(eq(ryaInstance1), eq(query1))).thenReturn(queryJob1); + final KafkaStreams queryJob2 = mock(KafkaStreams.class); + when(jobFactory.make(eq(ryaInstance2), eq(query2))).thenReturn(queryJob2); + + // Start the executor that will be tested. + final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + executor.startAndWait(); + try { + // Tell the executor to start the queries. + executor.startQuery(ryaInstance1, query1); + executor.startQuery(ryaInstance2, query2); + + // Verify both are running. + verify(queryJob1).start(); + verify(queryJob2).start(); + + // Tell the executor to stop queries running under rya2. + executor.stopAll(ryaInstance2); + + // Show the first query is still running, but the second isn't. + verify(queryJob1, never()).close(); + verify(queryJob2).close(); + + } finally { + executor.stopAndWait(); + } + } + + @Test(expected = IllegalStateException.class) + public void getRunningQueryIds_serviceNotStarted() throws Exception { + final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + executor.getRunningQueryIds(); + } + + @Test + public void getRunningQueryIds_noneStarted() throws Exception { + // Start an executor. + final QueryExecutor executor = new LocalQueryExecutor(mock(KafkaStreamsFactory.class)); + executor.startAndWait(); + try { + // Get the list of running queries. + final Set<UUID> runningQueries = executor.getRunningQueryIds(); + + // Show no queries are reported as running. + assertTrue(runningQueries.isEmpty()); + } finally { + executor.stopAndWait(); + } + } + + @Test + public void getRunningQueryIds_noneStopped() throws Exception { + // Test values. + final String ryaInstance = "rya"; + final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. 
}", true); + final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + + // Mock the streams factory so that we can figure out what is started. + final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class); + when(jobFactory.make(eq(ryaInstance), eq(query1))).thenReturn(mock(KafkaStreams.class)); + when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(mock(KafkaStreams.class)); + when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class)); + + // Start the executor that will be tested. + final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + executor.startAndWait(); + try { + // Start the queries. + executor.startQuery(ryaInstance, query1); + executor.startQuery(ryaInstance, query2); + executor.startQuery(ryaInstance, query3); + + // All of those query IDs should be reported as running. + final Set<UUID> expected = Sets.newHashSet( + query1.getQueryId(), + query2.getQueryId(), + query3.getQueryId()); + assertEquals(expected, executor.getRunningQueryIds()); + + } finally { + executor.stopAndWait(); + } + } + + @Test + public void getRunningQueryIds_stoppedNoLongerListed() throws Exception { + // Test values. + final String ryaInstance = "rya"; + final StreamsQuery query1 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + final StreamsQuery query2 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + final StreamsQuery query3 = new StreamsQuery(UUID.randomUUID(), "SELECT * WHERE { ?a ?b ?c. }", true); + + // Mock the streams factory so that we can figure out what is started. 
+ final KafkaStreamsFactory jobFactory = mock(KafkaStreamsFactory.class); + when(jobFactory.make(eq(ryaInstance), eq(query1))).thenReturn(mock(KafkaStreams.class)); + when(jobFactory.make(eq(ryaInstance), eq(query2))).thenReturn(mock(KafkaStreams.class)); + when(jobFactory.make(eq(ryaInstance), eq(query3))).thenReturn(mock(KafkaStreams.class)); + + // Start the executor that will be tested. + final QueryExecutor executor = new LocalQueryExecutor(jobFactory); + executor.startAndWait(); + try { + // Start the queries. + executor.startQuery(ryaInstance, query1); + executor.startQuery(ryaInstance, query2); + executor.startQuery(ryaInstance, query3); + + // Stop the second query. + executor.stopQuery(query2.getQueryId()); + + // Only the first and third queries are running. + final Set<UUID> expected = Sets.newHashSet( + query1.getQueryId(), + query3.getQueryId()); + assertEquals(expected, executor.getRunningQueryIds()); + + } finally { + executor.stopAndWait(); + } + } +} \ No newline at end of file
