http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
new file mode 100644
index 0000000..99e2451
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.projection;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.ProjectionEvaluator;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RdfTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import 
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import 
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
+import 
org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter;
+import 
org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link ProjectionProcessorSupplier.ProjectionProcessor}.
+ */
+public class ProjectionProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void showProcessorWorks() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the RDF model objects that will be used to build the query.
+        final String sparql =
+                "SELECT (?person AS ?p) ?otherPerson " +
+                "WHERE { " +
+                    "?person <urn:talksTo> ?otherPerson . " +
+                "}";
+        final Projection projection = RdfTestUtil.getProjection(sparql);
+        final StatementPattern sp = RdfTestUtil.getSp(sparql);
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the query's only statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, 
result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that handles the projection.
+        builder.addProcessor("P1", new ProjectionProcessorSupplier(
+                ProjectionEvaluator.make(projection),
+                result -> ProcessorResult.make(new UnaryResult(result))), 
"SP1");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, 
"P1");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Load some data into the input topic.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        final MapBindingSet expectedBs = new MapBindingSet();
+        expectedBs.addBinding("p", vf.createURI("urn:Alice"));
+        expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add(new VisibilityBindingSet(expectedBs, "a"));
+
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, 
resultsTopic, builder, 2000, statements, Sets.newHashSet(expected));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorTest.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorTest.java
new file mode 100644
index 0000000..7ff2c96
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.projection;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.rya.api.function.projection.ProjectionEvaluator;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.RdfTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import 
org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier.ProjectionProcessor;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Unit tests the methods of {@link ProjectionProcessor}.
+ */
+public class ProjectionProcessorTest {
+
+    @Test
+    public void showProjectionFunctionIsCalled() throws Exception {
+        // A query whose projection renames ?person to ?p and ?employee to ?e, and leaves ?business unchanged.
+        final Projection projection = RdfTestUtil.getProjection(
+                "SELECT (?person AS ?p) (?employee AS ?e) ?business " +
+                "WHERE { " +
+                    "?person <urn:talksTo> ?employee . " +
+                    "?employee <urn:worksAt> ?business . " +
+                "}");
+
+        // Create a Binding Set that contains the result of the WHERE clause.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final MapBindingSet inputBs = new MapBindingSet();
+        inputBs.addBinding("person", vf.createURI("urn:Alice"));
+        inputBs.addBinding("employee", vf.createURI("urn:Bob"));
+        inputBs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        final VisibilityBindingSet inputVisBs = new 
VisibilityBindingSet(inputBs, "a");
+
+        // The expected binding set changes the "person" binding name to "p" 
and "employee" to "e".
+        final MapBindingSet expectedBs = new MapBindingSet();
+        expectedBs.addBinding("p", vf.createURI("urn:Alice"));
+        expectedBs.addBinding("e", vf.createURI("urn:Bob"));
+        expectedBs.addBinding("business", vf.createURI("urn:TacoJoint"));
+        final VisibilityBindingSet expectedVisBs = new 
VisibilityBindingSet(expectedBs, "a");
+
+        // Show it resulted in the correct output BindingSet.
+        final ProcessorContext context = mock(ProcessorContext.class);
+        final ProjectionProcessor processor = new ProjectionProcessor(
+                ProjectionEvaluator.make(projection),
+                result -> ProcessorResult.make(new UnaryResult(result)));
+        processor.init(context);
+
+        processor.process("key", ProcessorResult.make(new 
UnaryResult(inputVisBs)));
+
+        // Verify the expected binding set was emitted.
+        final ProcessorResult expected = ProcessorResult.make(new 
UnaryResult(expectedVisBs));
+        verify(context, times(1)).forward(eq("key"), eq(expected));
+    }
+}
\ No newline at end of file

Reply via email to