http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/client/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/pom.xml b/extras/rya.streams/client/pom.xml
index 38c3c86..3529537 100644
--- a/extras/rya.streams/client/pom.xml
+++ b/extras/rya.streams/client/pom.xml
@@ -105,6 +105,12 @@ under the License.
             <artifactId>rya.test.kafka</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryAndLoadStatementsStreamsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryAndLoadStatementsStreamsIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryAndLoadStatementsStreamsIT.java
new file mode 100644
index 0000000..bf58817
--- /dev/null
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/AddQueryAndLoadStatementsStreamsIT.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.client.command;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.apache.rya.test.kafka.KafkaTestUtil;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+/**
+ * Integration Test for adding a new query through a command, loading data from
+ * a file, and then starting up the streams program.
+ */
+public class AddQueryAndLoadStatementsStreamsIT {
+    private static final Path LUBM_FILE = Paths.get("src/test/resources/lubm-1uni-withschema.nt");
+    private static final String LUBM_PREFIX = "http://swat.cse.lehigh.edu/onto/univ-bench.owl#";
+    private static final int LUBM_EXPECTED_RESULTS_COUNT = 1874;
+
+    private final String ryaInstance = UUID.randomUUID().toString();
+    private QueryRepository queryRepo;
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Before
+    public void setup() {
+        // Make sure the topic that the change log uses exists.
+        final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
+        kafka.createTopic(changeLogTopic);
+
+        // Setup the QueryRepository used by the test.
+        final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
+        final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
+        final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
+        queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testLubm() throws Exception {
+        // Arguments that add a query to Rya Streams.
+        final String query =
+                "PREFIX lubm: <" + LUBM_PREFIX + "> \n" +
+                "SELECT * WHERE \n" +
+                "{ \n" +
+                "    ?graduateStudent a lubm:GraduateStudent . \n" +
+                "    ?underGradUniversity a lubm:University . \n" +
+                "    ?graduateStudent lubm:undergraduateDegreeFrom ?underGradUniversity . \n" +
+                "}";
+
+        final String query2 =
+                "PREFIX lubm: <" + LUBM_PREFIX + "> \n" +
+                "SELECT * WHERE \n" +
+                "{ \n" +
+                "    ?graduateStudent a lubm:GraduateStudent . \n" +
+                "    ?underGradUniversity a lubm:University . \n" +
+                "    ?graduateStudent lubm:undergraduateDegreeFrom ?underGradUniversity . \n" +
+                "}";
+
+        final String[] addArgs = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
+                "--query", query,
+                "--isActive", "true",
+                "--isInsert", "false"
+        };
+
+        final String[] addArgs2 = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
+                "--query", query2,
+                "--isActive", "true",
+                "--isInsert", "false"
+        };
+
+        // Execute the command.
+        final AddQueryCommand command = new AddQueryCommand();
+        command.execute(addArgs);
+        // Add the same query twice to confirm that joins aren't being performed
+        // across both queries.
+        command.execute(addArgs2);
+
+        // Show that the queries were added to the Query Repository.
+        final Set<StreamsQuery> queries = queryRepo.list();
+        assertEquals(2, queries.size());
+        final StreamsQuery streamsQuery = queries.iterator().next();
+        final UUID queryId = streamsQuery.getQueryId();
+        assertEquals(query, queries.iterator().next().getSparql());
+
+        // Load a file of statements into Kafka.
+        final String visibilities = "";
+        final String[] loadArgs = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
+                "--statementsFile", LUBM_FILE.toString(),
+                "--visibilities", visibilities
+        };
+
+        // Load the file of statements into the Statements topic.
+        new LoadStatementsCommand().execute(loadArgs);
+
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, queryId);
+
+        final TopologyFactory factory = new TopologyFactory();
+        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
+
+        // Start the streams program.
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final AtomicReference<String> errorMessage = new AtomicReference<>();
+        final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(final Thread thread, final Throwable throwable) {
+                final String stackTrace = ExceptionUtils.getStackTrace(throwable);
+                errorMessage.getAndSet("Kafka Streams threw an uncaught exception in thread (" + thread.getName() + "): " + stackTrace);
+            }
+        });
+        streams.cleanUp();
+        try {
+            streams.start();
+
+            // Wait for the streams application to start. Streams only see data after their consumers are connected.
+            Thread.sleep(6000);
+
+            // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
+            try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) {
+                // Register the topic.
+                consumer.subscribe(Arrays.asList(resultsTopic));
+
+                // Poll for the result.
+                final Set<VisibilityBindingSet> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 2 * LUBM_EXPECTED_RESULTS_COUNT, LUBM_EXPECTED_RESULTS_COUNT, consumer) );
+
+                System.out.println("LUBM Query Results Count: " + results.size());
+                // Show the correct binding set results came from the job.
+                assertEquals(LUBM_EXPECTED_RESULTS_COUNT, results.size());
+            }
+        } finally {
+            streams.close();
+        }
+
+        if (StringUtils.isNotBlank(errorMessage.get())) {
+            fail(errorMessage.get());
+        }
+    }
+}
\ No newline at end of file
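Both test classes gate their assertions on KafkaTestUtil.pollForResults(500, 2 * LUBM_EXPECTED_RESULTS_COUNT, LUBM_EXPECTED_RESULTS_COUNT, consumer). The sketch below illustrates the polling contract that call suggests, assuming the parameters are (pollMs, maxPolls, targetSize, consumer); it is a plausible reading of the utility, not Rya's actual implementation.

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical sketch of the presumed KafkaTestUtil.pollForResults(...) contract.
public final class PollForResultsSketch {
    public static <K, V> List<V> pollForResults(final int pollMs, final int maxPolls,
            final int targetSize, final Consumer<K, V> consumer) {
        final List<V> results = new ArrayList<>();
        // Poll until enough results have arrived or the poll budget is spent.
        for (int i = 0; i < maxPolls && results.size() < targetSize; i++) {
            final ConsumerRecords<K, V> records = consumer.poll(pollMs);
            for (final ConsumerRecord<K, V> record : records) {
                results.add(record.value());
            }
        }
        return results;
    }
}

Under that reading, passing 2 * LUBM_EXPECTED_RESULTS_COUNT as the poll budget simply gives the streams job generous headroom to produce all 1874 binding sets before the size assertion is allowed to fail.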
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 21a8e4c..7c661d2 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -20,8 +20,12 @@ package org.apache.rya.streams.client.command;
 
 import static org.junit.Assert.assertEquals;
 
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -63,6 +67,9 @@ import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
  * Integration tests the methods of {@link RunQueryCommand}.
  */
 public class RunQueryCommandIT {
+    private static final Path LUBM_FILE = Paths.get("src/test/resources/lubm-1uni-withschema.nt");
+    private static final String LUBM_PREFIX = "http://swat.cse.lehigh.edu/onto/univ-bench.owl#";
+    private static final int LUBM_EXPECTED_RESULTS_COUNT = 1874;
 
     private final String ryaInstance = UUID.randomUUID().toString();
 
@@ -81,7 +88,7 @@ public class RunQueryCommandIT {
 
         // Setup the QueryRepository used by the test.
         final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
-        final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
+        final Consumer<?, QueryChange> queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
         final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
         queryRepo = new InMemoryQueryRepository(changeLog, Scheduler.newFixedRateSchedule(0L, 5, TimeUnit.SECONDS));
@@ -134,7 +141,7 @@ public class RunQueryCommandIT {
                 final RunQueryCommand command = new RunQueryCommand();
                 try {
                     command.execute(args);
-                } catch (ArgumentsException | ExecutionException e) {
+                } catch (final ArgumentsException | ExecutionException e) {
                     // Do nothing. Test will still fail because the expected results will be missing.
                 }
             }
@@ -196,4 +203,74 @@ public class RunQueryCommandIT {
         // Show the read results matched the expected ones.
         assertEquals(expected, results);
     }
+
+    @Test
+    public void runQueryFromFile() throws Exception {
+        // TODO: Maybe test with org.apache.rya.api.utils.LubmQuery.LUBM_QUERY_1,2,3,etc...
+        // NOTE: The order of the query statements previously led to join
+        // issues. When "lubm:undergraduateDegreeFrom" was the first statement
+        // in the WHERE clause (as opposed to the last), the
+        // KeyValueJoinStateStore blew up.
+        // (This issue appears to have been resolved, but it is tested
+        // here regardless.)
+        final String query =
+                "PREFIX lubm: <" + LUBM_PREFIX + "> \n" +
+                "SELECT * WHERE \n" +
+                "{ \n" +
+                "    ?graduateStudent lubm:undergraduateDegreeFrom ?underGradUniversity . \n" +
+                "    ?graduateStudent a lubm:GraduateStudent . \n" +
+                "    ?underGradUniversity a lubm:University . \n" +
+                "}";
+
+        // Register a query with the Query Repository.
+        final StreamsQuery sQuery = queryRepo.add(query, true, true);
+
+        // Arguments that run the query we just registered with Rya Streams.
+        final String[] args = new String[] {
+                "--ryaInstance", "" + ryaInstance,
+                "--kafkaHostname", kafka.getKafkaHostname(),
+                "--kafkaPort", kafka.getKafkaPort(),
+                "--queryID", sQuery.getQueryId().toString(),
+                "--zookeepers", kafka.getZookeeperServers()
+        };
+
+        // Create a new Thread that runs the command.
+        final Thread commandThread = new Thread() {
+            @Override
+            public void run() {
+                final RunQueryCommand command = new RunQueryCommand();
+                try {
+                    command.execute(args);
+                } catch (final ArgumentsException | ExecutionException e) {
+                    // Do nothing. Test will still fail because the expected results will be missing.
+                }
+            }
+        };
+
+        // Execute the test. This will result in a set of results that were read from the results topic.
+        final Set<VisibilityBindingSet> results = new HashSet<>();
+        try {
+            // Wait for the program to start.
+            commandThread.start();
+            Thread.sleep(5000);
+
+            // Write some statements to the program.
+            final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+            final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, stmtProducer);
+            loadStatements.fromFile(LUBM_FILE, "");
+
+            // Read the output of the streams program.
+            final String resultsTopic = KafkaTopics.queryResultsTopic(ryaInstance, sQuery.getQueryId());
+            resultConsumer.subscribe( Lists.newArrayList(resultsTopic) );
+            results.addAll(KafkaTestUtil.pollForResults(500, 2 * LUBM_EXPECTED_RESULTS_COUNT, LUBM_EXPECTED_RESULTS_COUNT, resultConsumer));
+        } finally {
+            // Tear down the test.
+            commandThread.interrupt();
+            commandThread.join(3000);
+        }
+
+        System.out.println("LUBM Query Results Count: " + results.size());
+        // Show the read results matched the expected ones.
+        assertEquals(LUBM_EXPECTED_RESULTS_COUNT, results.size());
+    }
 }
\ No newline at end of file
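Taken together, AddQueryAndLoadStatementsStreamsIT and the new runQueryFromFile test exercise the full client workflow: register a query, load statements, and consume results. The following condensed sketch strings together the commands from the diffs above; the hostname, port, instance name, statements file, and query id are placeholder values, and the surrounding Kafka setup and error handling are assumed.

import org.apache.rya.streams.client.command.AddQueryCommand;
import org.apache.rya.streams.client.command.LoadStatementsCommand;
import org.apache.rya.streams.client.command.RunQueryCommand;

// Condensed, hypothetical driver for the workflow the tests above verify.
public final class StreamsClientFlowSketch {
    public static void main(final String[] args) throws Exception {
        // 1. Register a SPARQL query with the Rya instance's query change log.
        new AddQueryCommand().execute(new String[] {
                "--ryaInstance", "myInstance",
                "--kafkaHostname", "localhost",
                "--kafkaPort", "9092",
                "--query", "SELECT * WHERE { ?s ?p ?o . }",
                "--isActive", "true",
                "--isInsert", "false" });

        // 2. Load a file of RDF statements into the instance's statements topic.
        new LoadStatementsCommand().execute(new String[] {
                "--ryaInstance", "myInstance",
                "--kafkaHostname", "localhost",
                "--kafkaPort", "9092",
                "--statementsFile", "statements.nt",
                "--visibilities", "" });

        // 3. Run the registered query. Results stream to the query's results
        //    topic until the process is stopped.
        new RunQueryCommand().execute(new String[] {
                "--ryaInstance", "myInstance",
                "--kafkaHostname", "localhost",
                "--kafkaPort", "9092",
                "--queryID", "placeholder-query-id",
                "--zookeepers", "localhost:2181" });
    }
}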