RYA-377 Added Construct query support to Rya Streams.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/da63fd12 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/da63fd12 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/da63fd12 Branch: refs/heads/master Commit: da63fd125e16779df3536b4172fa66e36561e4ff Parents: 538393f Author: kchilton2 <[email protected]> Authored: Tue Nov 21 18:49:13 2017 -0500 Committer: caleb <[email protected]> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- .../projection/MultiProjectionEvaluator.java | 4 +- .../projection/ProjectionEvaluator.java | 5 +- .../kafka/topology/TopologyBuilderFactory.java | 40 ++- .../streams/kafka/topology/TopologyFactory.java | 178 +++++++++++--- .../apache/rya/streams/kafka/KafkaTestUtil.java | 18 +- .../processors/StatementPatternProcessorIT.java | 18 +- .../kafka/processors/join/JoinProcessorIT.java | 243 ++++--------------- .../projection/MultiProjectionProcessorIT.java | 87 +------ .../projection/ProjectionProcessorIT.java | 42 +--- .../kafka/topology/TopologyFactoryTest.java | 32 ++- 10 files changed, 280 insertions(+), 387 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java index e2b7046..0e9093d 100644 --- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java @@ -57,9 +57,9 @@ public class MultiProjectionEvaluator { /** * Constructs an instance of {@link MultiProjection}. * - * @param projections - The {@link ProjectionEvaluators} that handle each projection within the multi. (not null) + * @param projections - The {@link ProjectionEvaluators} that handle each projection within the MultiProjection. (not null) * @param blankNodeSourceNames - If there are blank nodes in the projection, this is a set of their names - * so that they may be re-label to have the same node IDs. (not null) + * so that they may be re-labeled to have the same node IDs. (not null) * @param bNodeIdFactory - Creates the IDs for Blank Nodes. (not null) */ public MultiProjectionEvaluator( http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java index a0b59c1..4b37448 100644 --- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java @@ -179,7 +179,10 @@ public class ProjectionEvaluator { } } - result.addBinding(elem.getTargetName(), value); + // Only add the value if there is one. There may not be one if a binding is optional. + if(value != null) { + result.addBinding(elem.getTargetName(), value); + } } return new VisibilityBindingSet(result, bs.getVisibility()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java index 9e9dd92..666cbb0 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java @@ -19,8 +19,8 @@ package org.apache.rya.streams.kafka.topology; import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.BNodeIdFactory; import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.algebra.TupleExpr; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -32,17 +32,33 @@ import edu.umd.cs.findbugs.annotations.NonNull; public interface TopologyBuilderFactory { /** - * Builds a {@link TopologyBuilder} based on the provided sparql query where - * each {@link TupleExpr} in the parsed query is a processor in the - * topology. + * Builds a {@link TopologyBuilder} based on the provided SPARQL query that + * pulls from {@code statementsTopic} for input and writes the query's results + * to {@code resultsTopic}. * * @param sparqlQuery - The SPARQL query to build a topology for. (not null) - * @param statementTopic - The topic for the source to read from. (not null) - * @param statementTopic - The topic for the sink to write to. (not null) - * @return - The created {@link TopologyBuilder}. - * @throws MalformedQueryException - The provided query is not a valid - * SPARQL query. + * @param statementsTopic - The topic for the source to read from. (not null) + * @param resultsTopic - The topic for the sink to write to. (not null) + * @param bNodeIdFactory - A factory that generates Blank Node IDs if any are required. (not null) + * @return The created {@link TopologyBuilder}. + * @throws MalformedQueryException - The provided query is not a valid SPARQL query. + * @throws TopologyBuilderException - A problem occurred while constructing the topology. */ - public TopologyBuilder build(final String sparqlQuery, final String statementTopic, final String resultTopic) - throws Exception; -} + public TopologyBuilder build( + final String sparqlQuery, + final String statementsTopic, + final String resultsTopic, + final BNodeIdFactory bNodeIdFactory) throws MalformedQueryException, TopologyBuilderException; + + /** + * An Exception thrown when a problem occurs when constructing the processor + * topology in the {@link TopologyFactory}. + */ + public static class TopologyBuilderException extends Exception { + private static final long serialVersionUID = 1L; + + public TopologyBuilderException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java index 782a58b..08f3625 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.processor.Processor; @@ -40,6 +41,9 @@ import org.apache.kafka.streams.state.Stores; import org.apache.rya.api.function.join.IterativeJoin; import org.apache.rya.api.function.join.LeftOuterJoin; import org.apache.rya.api.function.join.NaturalJoin; +import org.apache.rya.api.function.projection.BNodeIdFactory; +import org.apache.rya.api.function.projection.MultiProjectionEvaluator; +import org.apache.rya.api.function.projection.ProjectionEvaluator; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.streams.kafka.processors.ProcessorResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; @@ -48,17 +52,22 @@ import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier; import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier; +import org.apache.rya.streams.kafka.processors.output.StatementOutputFormatterSupplier; +import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier; import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.BinaryTupleOperator; import org.openrdf.query.algebra.Extension; import org.openrdf.query.algebra.Join; import org.openrdf.query.algebra.LeftJoin; +import org.openrdf.query.algebra.MultiProjection; import org.openrdf.query.algebra.Projection; import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.Reduced; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.TupleExpr; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; @@ -85,34 +94,28 @@ public class TopologyFactory implements TopologyBuilderFactory { private List<ProcessorEntry> processorEntryList; - /** - * Builds a {@link TopologyBuilder} based on the provided sparql query. - * - * @param sparqlQuery - The SPARQL query to build a topology for. (not null) - * @param statementTopic - The topic for the source to read from. (not null) - * @param resultTopic - The topic for the sink to write to. (not null) - * @return - The created {@link TopologyBuilder}. - * @throws MalformedQueryException - The provided query is not a valid SPARQL query. - * @throws TopologyBuilderException - A problem occurred while constructing the topology. - */ @Override - public TopologyBuilder build(final String sparqlQuery, final String statementTopic, final String resultTopic) + public TopologyBuilder build( + final String sparqlQuery, + final String statementsTopic, + final String resultsTopic, + final BNodeIdFactory bNodeIdFactory) throws MalformedQueryException, TopologyBuilderException { requireNonNull(sparqlQuery); - requireNonNull(statementTopic); - requireNonNull(resultTopic); + requireNonNull(statementsTopic); + requireNonNull(resultsTopic); final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparqlQuery, null); final TopologyBuilder builder = new TopologyBuilder(); final TupleExpr expr = parsedQuery.getTupleExpr(); - final QueryVisitor visitor = new QueryVisitor(); + final QueryVisitor visitor = new QueryVisitor(bNodeIdFactory); expr.visit(visitor); processorEntryList = visitor.getProcessorEntryList(); final Map<TupleExpr, String> idMap = visitor.getIDs(); // add source node - builder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), statementTopic); + builder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); // processing the processor entry list in reverse order means we go from leaf // nodes -> parent nodes. @@ -146,11 +149,12 @@ public class TopologyFactory implements TopologyBuilderFactory { } } - // convert processing results to visibility binding sets - builder.addProcessor("OUTPUT_FORMATTER", new BindingSetOutputFormatterSupplier(), entry.getID()); + // Add a formatter that converts the ProcessorResults into the output format. + final SinkEntry<?,?> sinkEntry = visitor.getSinkEntry(); + builder.addProcessor("OUTPUT_FORMATTER", sinkEntry.getFormatterSupplier(), entry.getID()); - // add sink - builder.addSink(SINK, resultTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "OUTPUT_FORMATTER"); + // Add the sink. + builder.addSink(SINK, resultsTopic, sinkEntry.getKeySerializer(), sinkEntry.getValueSerializer(), "OUTPUT_FORMATTER"); return builder; } @@ -264,16 +268,82 @@ public class TopologyFactory implements TopologyBuilderFactory { } /** + * Information about how key/value pairs need to be written to the sink. + * + * @param <K> - The type of Key that the sink uses. + * @param <V> - The type of Value that the sink uses. + */ + private final static class SinkEntry<K, V> { + + private final ProcessorSupplier<Object, ProcessorResult> formatterSupplier; + private final Serializer<K> keySerializer; + private final Serializer<V> valueSerializer; + + /** + * Constructs an instance of {@link SinkEntry}. + * + * @param formatterSupplier - Formats {@link ProcessingResult}s for output to the sink. (not null) + * @param keySerializer - Serializes keys that are used to write to the sink. (not null) + * @param valueSerializer - Serializes values that are used to write to the sink. (not null) + */ + public SinkEntry( + final ProcessorSupplier<Object, ProcessorResult> formatterSupplier, + final Serializer<K> keySerializer, + final Serializer<V> valueSerializer) { + this.keySerializer = requireNonNull(keySerializer); + this.valueSerializer = requireNonNull(valueSerializer); + this.formatterSupplier = requireNonNull(formatterSupplier); + } + + /** + * @return Formats {@link ProcessingResult}s for output to the sink. + */ + public ProcessorSupplier<Object, ProcessorResult> getFormatterSupplier() { + return formatterSupplier; + } + + /** + * @return Serializes keys that are used to write to the sink. + */ + public Serializer<K> getKeySerializer() { + return keySerializer; + } + + /** + * @return Serializes values that are used to write to the sink. + */ + public Serializer<V> getValueSerializer() { + return valueSerializer; + } + } + + /** * Visits each node in a {@link TupleExpr} and creates a * {@link ProcessorSupplier} and meta information needed for creating a * {@link TopologyBuilder}. */ final static class QueryVisitor extends QueryModelVisitorBase<TopologyBuilderException> { - // Each node needs a ProcessorEntry to be a processor node in the - // TopologyBuilder. + // Each node needs a ProcessorEntry to be a processor node in the TopologyBuilder. private final List<ProcessorEntry> entries = new ArrayList<>(); private final Map<TupleExpr, String> idMap = new HashMap<>(); + // Default to a Binding Set outputting sink entry. + private SinkEntry<?, ?> sinkEntry = new SinkEntry<>( + new BindingSetOutputFormatterSupplier(), + new StringSerializer(), + new VisibilityBindingSetSerializer()); + + private final BNodeIdFactory bNodeIdFactory; + + /** + * Constructs an instance of {@link QueryVisitor}. + * + * @param bNodeIdFactory - Builds Blank Node IDs for the query's results. (not null) + */ + public QueryVisitor(final BNodeIdFactory bNodeIdFactory) { + this.bNodeIdFactory = requireNonNull(bNodeIdFactory); + } + /** * @return The {@link ProcessorEntry}s used to create a Topology. */ @@ -288,6 +358,23 @@ public class TopologyFactory implements TopologyBuilderFactory { return idMap; } + /** + * @return Information about how values are to be output by the topology to the results sink. + */ + public SinkEntry<?, ?> getSinkEntry() { + return sinkEntry; + } + + @Override + public void meet(final Reduced node) throws TopologyBuilderException { + // This indicates we're outputting VisibilityStatements. + sinkEntry = new SinkEntry<>( + new StatementOutputFormatterSupplier(), + new StringSerializer(), + new VisibilityStatementSerializer()); + super.meet(node); + } + @Override public void meet(final StatementPattern node) throws TopologyBuilderException { // topology parent for Statement Patterns will always be a source @@ -303,14 +390,39 @@ public class TopologyFactory implements TopologyBuilderFactory { public void meet(final Projection node) throws TopologyBuilderException { final String id = PROJECTION_PREFIX + UUID.randomUUID(); final Optional<Side> side = getSide(node); - TupleExpr arg = node.getArg(); + // If the arg is an Extension, there are rebindings that need to be // ignored since they do not have a processor node. - if (arg instanceof Extension) { - arg = ((Extension) arg).getArg(); + TupleExpr downstreamNode = node.getArg(); + if (downstreamNode instanceof Extension) { + downstreamNode = ((Extension) downstreamNode).getArg(); + } + + final ProjectionProcessorSupplier supplier = new ProjectionProcessorSupplier( + ProjectionEvaluator.make(node), + result -> getResult(side, result)); + + entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(downstreamNode))); + idMap.put(node, id); + super.meet(node); + } + + @Override + public void meet(final MultiProjection node) throws TopologyBuilderException { + final String id = PROJECTION_PREFIX + UUID.randomUUID(); + final Optional<Side> side = getSide(node); + + final MultiProjectionProcessorSupplier supplier = new MultiProjectionProcessorSupplier( + MultiProjectionEvaluator.make(node, bNodeIdFactory), + result -> getResult(side, result)); + + // If the arg is an Extension, then this node's grandchild is the next processing node. + TupleExpr downstreamNode = node.getArg(); + if (downstreamNode instanceof Extension) { + downstreamNode = ((Extension) downstreamNode).getArg(); } - final ProjectionProcessorSupplier supplier = new ProjectionProcessorSupplier(node.getProjectionElemList(), result -> getResult(side, result)); - entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(arg))); + + entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(downstreamNode))); idMap.put(node, id); super.meet(node); } @@ -399,16 +511,4 @@ public class TopologyFactory implements TopologyBuilderFactory { } } } - - /** - * An Exception thrown when a problem occurs when constructing the processor - * topology in the {@link TopologyFactory}. - */ - public class TopologyBuilderException extends Exception { - private static final long serialVersionUID = 1L; - - public TopologyBuilderException(final String message, final Throwable cause) { - super(message, cause); - } - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java index 0a1a8a4..8898284 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java @@ -42,10 +42,8 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; @@ -144,29 +142,34 @@ public final class KafkaTestUtil { * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of * the results topic, and ensures the expected results match the read results. * - * @param kafka - The embedded kafka instance that is being tested with. (not null) + * @param <T> The type of value that will be consumed from the results topic. + * @param kafka - The embedded Kafka instance that is being tested with. (not null) * @param statementsTopic - The topic statements will be written to. (not null) * @param resultsTopic - The topic results will be read from. (not null) * @param builder - The streams topology that will be executed. (not null) * @param startupMs - How long to wait for the topology to start before writing the statements. * @param statements - The statements that will be loaded into the topic. (not null) * @param expected - The expected results. (not null) + * @param expectedDeserializerClass - The class of the deserializer that will be used when reading + * values from the results topic. (not null) * @throws Exception If any exception was thrown while running the test. */ - public static void runStreamProcessingTest( + public static <T> void runStreamProcessingTest( final KafkaTestInstanceRule kafka, final String statementsTopic, final String resultsTopic, final TopologyBuilder builder, final int startupMs, final List<VisibilityStatement> statements, - final Set<VisibilityBindingSet> expected) throws Exception { + final Set<T> expected, + final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception { requireNonNull(kafka); requireNonNull(statementsTopic); requireNonNull(resultsTopic); requireNonNull(builder); requireNonNull(statements); requireNonNull(expected); + requireNonNull(expectedDeserializerClass); // Explicitly create the topics that are being used. kafka.createTopic(statementsTopic); @@ -191,13 +194,12 @@ public final class KafkaTestUtil { } // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found. - try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer( - kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) { + try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) { // Register the topic. consumer.subscribe(Arrays.asList(resultsTopic)); // Poll for the result. - final Set<VisibilityBindingSet> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) ); + final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) ); // Show the correct binding sets results from the job. assertEquals(expected, results); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java index e55ec2e..3e0e64d 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java @@ -25,11 +25,13 @@ import java.util.Set; import java.util.UUID; import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Rule; @@ -57,7 +59,7 @@ public class StatementPatternProcessorIT { // Setup a topology. final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Create a statement that generate an SP result. final ValueFactory vf = new ValueFactoryImpl(); @@ -73,7 +75,7 @@ public class StatementPatternProcessorIT { expected.add( new VisibilityBindingSet(bs, "a") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -87,7 +89,7 @@ public class StatementPatternProcessorIT { // Setup a topology. final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Create some statements where some generates SP results and others do not. final ValueFactory vf = new ValueFactoryImpl(); @@ -111,7 +113,7 @@ public class StatementPatternProcessorIT { expected.add( new VisibilityBindingSet(bs, "a|b") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -128,7 +130,7 @@ public class StatementPatternProcessorIT { + "?person ?action <urn:Bob>" + "}"; final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Create some statements where some generates SP results and others do not. final ValueFactory vf = new ValueFactoryImpl(); @@ -145,7 +147,7 @@ public class StatementPatternProcessorIT { expected.add( new VisibilityBindingSet(bs, "a") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -162,7 +164,7 @@ public class StatementPatternProcessorIT { + "?person ?action <urn:Bob>" + "}"; final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Create some statements where some generates SP results and others do not. final ValueFactory vf = new ValueFactoryImpl(); @@ -188,6 +190,6 @@ public class StatementPatternProcessorIT { expected.add(new VisibilityBindingSet(bs, "a")); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java index dbad15c..b137a9a 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java @@ -24,35 +24,23 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.state.Stores; -import org.apache.rya.api.function.join.LeftOuterJoin; import org.apache.rya.api.function.join.NaturalJoin; +import org.apache.rya.api.function.projection.RandomUUIDFactory; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RdfTestUtil; import org.apache.rya.streams.kafka.processors.ProcessorResult; -import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; -import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; -import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor; -import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; -import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.impl.MapBindingSet; import com.google.common.collect.Lists; @@ -84,12 +72,13 @@ public class JoinProcessorIT { final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); // Setup a topology. - final String query = "SELECT * WHERE { " - + "?person <urn:talksTo> ?employee ." - + "?employee <urn:worksAt> ?business" - + " }"; + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "?employee <urn:worksAt> ?business" + + " }"; final TopologyFactory factory = new TopologyFactory(); - final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Create some statements that generate a bunch of right SP results. final ValueFactory vf = new ValueFactoryImpl(); @@ -122,7 +111,7 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -133,46 +122,14 @@ public class JoinProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the StatementPatterns that will be evaluated. - final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }"); - final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }"); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp, - result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); - - // Add a processor that handles the second statement pattern. - builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp, - result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); - - // Add a processor that handles a natrual join over the SPs. - builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier( - "NATURAL_JOIN", - new NaturalJoin(), - Lists.newArrayList("employee"), - Lists.newArrayList("employee", "person", "business"), - result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP"); - - // Add a state store for the join processor. - final StateStoreSupplier joinStoreSupplier = - Stores.create( "NATURAL_JOIN" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); - builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "NATURAL_JOIN"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "?employee <urn:worksAt> ?business" + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Create some statements that generate a bunch of right SP results. final ValueFactory vf = new ValueFactoryImpl(); @@ -205,7 +162,7 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -216,46 +173,14 @@ public class JoinProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the StatementPatterns that will be evaluated. - final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }"); - final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }"); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp, - result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); - - // Add a processor that handles the second statement pattern. - builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp, - result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); - - // Add a processor that handles a natrual join over the SPs. - builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier( - "NATURAL_JOIN", - new NaturalJoin(), - Lists.newArrayList("employee"), - Lists.newArrayList("employee", "person", "business"), - result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP"); - - // Add a state store for the join processor. - final StateStoreSupplier joinStoreSupplier = - Stores.create( "NATURAL_JOIN" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); - builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "NATURAL_JOIN"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "?employee <urn:worksAt> ?business" + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Create some statements that generate a bunch of right SP results. final ValueFactory vf = new ValueFactoryImpl(); @@ -294,7 +219,7 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "a&c") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -305,67 +230,15 @@ public class JoinProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the StatementPatterns that will be evaluated. - final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }"); - final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }"); - final StatementPattern sp3 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:hourlyWage> ?wage }"); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, - result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); - - // Add a processor that handles the second statement pattern. - builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, - result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); - - // Add a processor that handles a natural join over SPs 1 and 2. - builder.addProcessor("JOIN1", new JoinProcessorSupplier( - "JOIN1", - new NaturalJoin(), - Lists.newArrayList("employee"), - Lists.newArrayList("employee", "person", "business"), - result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "SP1", "SP2"); - - // Add a processor that handles the third statement pattern. - builder.addProcessor("SP3", new StatementPatternProcessorSupplier(sp3, - result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); - - // Add a processor that handles a natural join over JOIN1 and SP3. - builder.addProcessor("JOIN2", new JoinProcessorSupplier( - "JOIN2", - new NaturalJoin(), - Lists.newArrayList("employee"), - Lists.newArrayList("employee", "business", "wage"), - result -> ProcessorResult.make( new UnaryResult(result) )), "JOIN1", "SP3"); - - // Setup the join state suppliers. - final StateStoreSupplier join1StoreSupplier = - Stores.create( "JOIN1" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); - builder.addStateStore(join1StoreSupplier, "JOIN1"); - - final StateStoreSupplier join2StoreSupplier = - Stores.create( "JOIN2" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); - builder.addStateStore(join2StoreSupplier, "JOIN2"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "JOIN2"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "?employee <urn:worksAt> ?business ." + + "?employee <urn:hourlyWage> ?wage ." + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Create some statements that generate a bunch of right SP results. final ValueFactory vf = new ValueFactoryImpl(); @@ -387,7 +260,7 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "a") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -398,46 +271,14 @@ public class JoinProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the StatementPatterns that will be evaluated. - final StatementPattern requiredSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }"); - final StatementPattern optionalSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }"); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("REQUIRED_SP", new StatementPatternProcessorSupplier(requiredSp, - result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); - - // Add a processor that handles the second statement pattern. - builder.addProcessor("OPTIONAL_SP", new StatementPatternProcessorSupplier(optionalSp, - result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); - - // Add a processor that handles a natrual join over the SPs. - builder.addProcessor("LEFT_JOIN", new JoinProcessorSupplier( - "LEFT_JOIN", - new LeftOuterJoin(), - Lists.newArrayList("employee"), - Lists.newArrayList("employee", "person", "business"), - result -> ProcessorResult.make( new UnaryResult(result) )), "REQUIRED_SP", "OPTIONAL_SP"); - - // Add a state store for the join processor. - final StateStoreSupplier joinStoreSupplier = - Stores.create( "LEFT_JOIN" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); - builder.addStateStore(joinStoreSupplier, "LEFT_JOIN"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "LEFT_JOIN"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "OPTIONAL{ ?employee <urn:worksAt> ?business } " + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Create some statements that generate a result that includes the optional value as well as one that does not. final ValueFactory vf = new ValueFactoryImpl(); @@ -470,6 +311,6 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "c") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java index ee0e55b..d71577b 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java @@ -24,29 +24,13 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.state.Stores; -import org.apache.rya.api.function.join.NaturalJoin; -import org.apache.rya.api.function.projection.MultiProjectionEvaluator; -import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RdfTestUtil; -import org.apache.rya.streams.kafka.processors.ProcessorResult; -import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; -import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; -import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; -import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; -import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier; -import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter; import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Rule; import org.junit.Test; @@ -54,11 +38,6 @@ import org.openrdf.model.BNode; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.model.vocabulary.RDF; -import org.openrdf.query.algebra.MultiProjection; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.impl.MapBindingSet; - -import com.google.common.collect.Lists; /** * Integration tests the methods of {@link MultiProjectionProcessor}. @@ -76,10 +55,8 @@ public class MultiProjectionProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the RDF model objects that will be used to build the query. - final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?thing <urn:corner> ?location . }"); - final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?thing <urn:compass> ?direction . }"); - final MultiProjection multiProjection = RdfTestUtil.getMultiProjection( + // Create a topology for the Query that will be tested. + final String sparql = "CONSTRUCT {" + "_:b a <urn:movementObservation> ; " + "<urn:location> ?location ; " + @@ -88,38 +65,10 @@ public class MultiProjectionProcessorIT { "WHERE {" + "?thing <urn:corner> ?location ." + "?thing <urn:compass> ?direction." + - "}"); - - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, - result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); - builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, - result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); - - builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier( - "NATURAL_JOIN", - new NaturalJoin(), - Lists.newArrayList("thing"), - Lists.newArrayList("thing", "location", "direction"), - result -> ProcessorResult.make( new UnaryResult(result) )), "SP1", "SP2"); - - final StateStoreSupplier joinStoreSupplier = - Stores.create( "NATURAL_JOIN" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); - builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN"); + "}"; - final String blankNodeId = UUID.randomUUID().toString(); - builder.addProcessor("MULTIPROJECTION", new MultiProjectionProcessorSupplier( - MultiProjectionEvaluator.make(multiProjection, () -> blankNodeId), - result -> ProcessorResult.make(new UnaryResult(result))), "NATURAL_JOIN"); - - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "MULTIPROJECTION"); - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String bNodeId = UUID.randomUUID().toString(); + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, () -> bNodeId); // Create the statements that will be input into the query. final ValueFactory vf = new ValueFactoryImpl(); @@ -130,26 +79,14 @@ public class MultiProjectionProcessorIT { vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:corner"), vf.createURI("urn:corner1")), "a") ); // Make the expected results. - final Set<VisibilityBindingSet> expected = new HashSet<>(); - final BNode blankNode = vf.createBNode(blankNodeId); - - MapBindingSet expectedBs = new MapBindingSet(); - expectedBs.addBinding("subject", blankNode); - expectedBs.addBinding("predicate", RDF.TYPE); - expectedBs.addBinding("object", vf.createURI("urn:movementObservation")); - - expectedBs = new MapBindingSet(); - expectedBs.addBinding("subject", blankNode); - expectedBs.addBinding("predicate", vf.createURI("urn:direction")); - expectedBs.addBinding("object", vf.createURI("urn:NW")); - + final Set<VisibilityStatement> expected = new HashSet<>(); + final BNode blankNode = vf.createBNode(bNodeId); - expectedBs = new MapBindingSet(); - expectedBs.addBinding("subject", blankNode); - expectedBs.addBinding("predicate", vf.createURI("urn:location")); - expectedBs.addBinding("object", vf.createURI("urn:corner1")); + expected.add(new VisibilityStatement(vf.createStatement(blankNode, RDF.TYPE, vf.createURI("urn:movementObservation")), "a")); + expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:direction"), vf.createURI("urn:NW")), "a")); + expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a")); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java index 99e2451..bc5f115 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java @@ -24,30 +24,20 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.function.projection.ProjectionEvaluator; +import org.apache.rya.api.function.projection.RandomUUIDFactory; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RdfTestUtil; -import org.apache.rya.streams.kafka.processors.ProcessorResult; -import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; -import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; -import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter; -import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; -import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.algebra.Projection; -import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.impl.MapBindingSet; import com.google.common.collect.Sets; @@ -68,34 +58,14 @@ public class ProjectionProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the RDF model objects that will be used to build the query. + // Create a topology for the Query that will be tested. final String sparql = "SELECT (?person AS ?p) ?otherPerson " + "WHERE { " + "?person <urn:talksTo> ?otherPerson . " + "}"; - final Projection projection = RdfTestUtil.getProjection(sparql); - final StatementPattern sp = RdfTestUtil.getSp(sparql); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); - - // Add a processor that handles the projection. - builder.addProcessor("P1", new ProjectionProcessorSupplier( - ProjectionEvaluator.make(projection), - result -> ProcessorResult.make(new UnaryResult(result))), "SP1"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "P1"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); // Load some data into the input topic. final ValueFactory vf = new ValueFactoryImpl(); @@ -110,6 +80,6 @@ public class ProjectionProcessorIT { expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob")); expected.add(new VisibilityBindingSet(expectedBs, "a")); - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected)); + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/da63fd12/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java index eda4c89..31462ec 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.rya.streams.kafka.topology; import static org.junit.Assert.assertEquals; @@ -5,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.util.List; +import org.apache.rya.api.function.projection.RandomUUIDFactory; import org.apache.rya.streams.kafka.topology.TopologyFactory.ProcessorEntry; import org.junit.Before; import org.junit.Test; @@ -15,6 +34,9 @@ import org.openrdf.query.algebra.Projection; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.Var; +/** + * Unit tests the methods of {@link TopologyFactory}. + */ public class TopologyFactoryTest { private static TopologyFactory FACTORY; @@ -40,7 +62,7 @@ public class TopologyFactoryTest { + "?person <urn:talksTo> ?otherPerson . " + "}"; - FACTORY.build(query, "source", "sink"); + FACTORY.build(query, "source", "sink", new RandomUUIDFactory()); final List<ProcessorEntry> entries = FACTORY.getProcessorEntry(); assertTrue(entries.get(0).getNode() instanceof Projection); @@ -57,7 +79,7 @@ public class TopologyFactoryTest { + "?otherPerson <urn:talksTo> ?dog . " + "}"; - FACTORY.build(query, "source", "sink"); + FACTORY.build(query, "source", "sink", new RandomUUIDFactory()); final List<ProcessorEntry> entries = FACTORY.getProcessorEntry(); assertTrue(entries.get(0).getNode() instanceof Projection); @@ -76,7 +98,7 @@ public class TopologyFactoryTest { + "?dog <urn:chews> ?toy . " + "}"; - FACTORY.build(query, "source", "sink"); + FACTORY.build(query, "source", "sink", new RandomUUIDFactory()); final List<ProcessorEntry> entries = FACTORY.getProcessorEntry(); assertTrue(entries.get(0).getNode() instanceof Projection); @@ -96,11 +118,11 @@ public class TopologyFactoryTest { + "?person <urn:talksTo> ?otherPerson . " + "}"; - FACTORY.build(query, "source", "sink"); + FACTORY.build(query, "source", "sink", new RandomUUIDFactory()); final List<ProcessorEntry> entries = FACTORY.getProcessorEntry(); assertTrue(entries.get(0).getNode() instanceof Projection); final StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson")); assertEquals(expected, entries.get(1).getNode()); } -} +} \ No newline at end of file
