http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
deleted file mode 100644
index 22a883b..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.filter;
-
-import static org.junit.Assert.assertEquals;
-
-import java.time.ZonedDateTime;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.Resource;
-import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.StatementImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.algebra.evaluation.function.Function;
-import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Integration tests the temporal methods of {@link FilterProcessor}.
- */
-public class TemporalFilterIT {
-    private static final ValueFactory vf = new ValueFactoryImpl();
-    private static final String TEMPORAL = "http://rya.apache.org/ns/temporal";
-    private static final ZonedDateTime TIME = ZonedDateTime.parse("2015-12-30T12:00:00Z");
-    private static final ZonedDateTime TIME_10 = ZonedDateTime.parse("2015-12-30T12:00:10Z");
-    private static final ZonedDateTime TIME_20 = ZonedDateTime.parse("2015-12-30T12:00:20Z");
-
-    @Rule
-    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
-
-    @Test
-    public void temporalFunctionsRegistered() {
-        int count = 0;
-        final Collection<Function> funcs = FunctionRegistry.getInstance().getAll();
-        for (final Function fun : funcs) {
-            if (fun.getURI().startsWith(TEMPORAL)) {
-                count++;
-            }
-        }
-
-        // There are 4 temporal functions registered, ensure that there are 4.
-        assertEquals(4, count);
-    }
-
-    @Test
-    public void showEqualsWorks() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Get the RDF model objects that will be used to build the query.
-        final String sparql =
-                "PREFIX time: <http://www.w3.org/2006/time/> \n"
-                        + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
-                        + "SELECT * \n"
-                        + "WHERE { \n"
-                        + "  <urn:time> time:atTime ?date .\n"
-                        + " FILTER(tempf:equals(?date, \"" + TIME.toString() + "\")) "
-                        + "}";
-        // Setup a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = getStatements();
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        final MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("date", vf.createLiteral(TIME.toString()));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void showBeforeWorks() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Get the RDF model objects that will be used to build the query.
-        final String sparql =
-                "PREFIX time: <http://www.w3.org/2006/time/> \n"
-                        + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
-                        + "SELECT * \n"
-                        + "WHERE { \n"
-                        + "  <urn:time> time:atTime ?date .\n"
-                        + " FILTER(tempf:before(?date, \"" + TIME_10.toString() + "\")) "
-                        + "}";
-        // Setup a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = getStatements();
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        final MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("date", vf.createLiteral(TIME.toString()));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void showAfterWorks() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Get the RDF model objects that will be used to build the query.
-        final String sparql =
-                "PREFIX time: <http://www.w3.org/2006/time/> \n"
-                        + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
-                        + "SELECT * \n"
-                        + "WHERE { \n"
-                        + "  <urn:time> time:atTime ?date .\n"
-                        + " FILTER(tempf:after(?date, \"" + TIME_10.toString() + "\")) "
-                        + "}";
-        // Setup a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = getStatements();
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        final MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("date", vf.createLiteral(TIME_20.toString()));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void showWithinWorks() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Get the RDF model objects that will be used to build the query.
-        final String sparql =
-                "PREFIX time: <http://www.w3.org/2006/time/> \n"
-                        + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n"
-                        + "SELECT * \n"
-                        + "WHERE { \n"
-                        + "  <urn:time> time:atTime ?date .\n"
-                        + " FILTER(tempf:within(?date, \"" + TIME.toString() + "/" + TIME_20.toString() + "\")) "
-                        + "}";
-        // Setup a topology.
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = getStatements();
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        final MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("date", vf.createLiteral(TIME_10.toString()));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    private List<VisibilityStatement> getStatements() throws Exception {
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add(new VisibilityStatement(statement(TIME), "a"));
-        statements.add(new VisibilityStatement(statement(TIME_10), "a"));
-        statements.add(new VisibilityStatement(statement(TIME_20), "a"));
-        return statements;
-    }
-
-    private static Statement statement(final ZonedDateTime time) {
-        final Resource subject = vf.createURI("urn:time");
-        final URI predicate = vf.createURI("http://www.w3.org/2006/time/atTime");
-        final Value object = vf.createLiteral(time.toString());
-        return new StatementImpl(subject, predicate, object);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
deleted file mode 100644
index bdb9be6..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.join;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.join.NaturalJoin;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.ProcessorResult;
-import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
-import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.google.common.collect.Lists;
-
-/**
- * Integration tests the methods of {@link JoinProcessor}.
- */
-public class JoinProcessorIT {
-
-    @Rule
-    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
-
-    @Test(expected = IllegalArgumentException.class)
-    public void badAllVars() throws IllegalArgumentException {
-        new JoinProcessorSupplier(
-                "NATURAL_JOIN",
-                new NaturalJoin(),
-                Lists.newArrayList("employee"),
-                Lists.newArrayList("person", "employee", "business"),
-                result -> ProcessorResult.make( new UnaryResult(result) ));
-    }
-
-    @Test
-    public void newLeftResult() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final String query =
-                "SELECT * WHERE { " +
-                    "?person <urn:talksTo> ?employee ." +
-                    "?employee <urn:worksAt> ?business" +
-                " }";
-        final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create some statements that generate a bunch of right SP results.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
-
-        // Add a statement that will generate a left result that joins with some of those right results.
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
-        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
-
-        bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
-        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void newRightResult() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final String query =
-                "SELECT * WHERE { " +
-                    "?person <urn:talksTo> ?employee ." +
-                    "?employee <urn:worksAt> ?business" +
-                " }";
-        final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create some statements that generate a bunch of right SP results.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
-
-        // Add a statement that will generate a left result that joins with some of those right results.
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
-        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
-
-        bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
-        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void newResultsBothSides() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final String query =
-                "SELECT * WHERE { " +
-                    "?person <urn:talksTo> ?employee ." +
-                    "?employee <urn:worksAt> ?business" +
-                " }";
-        final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create some statements that generate a bunch of right SP results.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") );
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
-        expected.add( new VisibilityBindingSet(bs, "a&b&c") );
-
-        bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
-        expected.add( new VisibilityBindingSet(bs, "c&(b|c)") );
-
-        bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Bob"));
-        bs.addBinding("employee", vf.createURI("urn:Charlie"));
-        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
-        expected.add( new VisibilityBindingSet(bs, "a&c") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void manyJoins() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final String query =
-                "SELECT * WHERE { " +
-                    "?person <urn:talksTo> ?employee ." +
-                    "?employee <urn:worksAt> ?business ." +
-                    "?employee <urn:hourlyWage> ?wage ." +
-                " }";
-        final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create some statements that generate a bunch of right SP results.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hourlyWage"), vf.createLiteral(7.25)), "a") );
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        final MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("business", vf.createURI("urn:BurgerJoint"));
-        bs.addBinding("wage", vf.createLiteral(7.25));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-
-    @Test
-    public void leftJoin() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Setup a topology.
-        final String query =
-                "SELECT * WHERE { " +
-                    "?person <urn:talksTo> ?employee ." +
-                    "OPTIONAL{ ?employee <urn:worksAt> ?business } " +
-                " }";
-        final TopologyFactory factory = new TopologyFactory();
-        final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Create some statements that generate a result that includes the optional value as well as one that does not.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "b") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "d") );
-
-        // Make the expected results.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        expected.add( new VisibilityBindingSet(bs, "a") );
-
-        bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Alice"));
-        bs.addBinding("employee", vf.createURI("urn:Bob"));
-        bs.addBinding("business", vf.createURI("urn:TacoPlace"));
-        expected.add( new VisibilityBindingSet(bs, "a&b") );
-
-        bs = new MapBindingSet();
-        bs.addBinding("person", vf.createURI("urn:Bob"));
-        bs.addBinding("employee", vf.createURI("urn:Charlie"));
-        expected.add( new VisibilityBindingSet(bs, "c") );
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
deleted file mode 100644
index a8de401..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.projection;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.BNode;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.RDF;
-
-/**
- * Integration tests the methods of {@link MultiProjectionProcessor}.
- */
-public class MultiProjectionProcessorIT {
-
-    @Rule
-    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
-
-    @Test
-    public void showProcessorWorks() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Create a topology for the Query that will be tested.
-        final String sparql =
-                "CONSTRUCT {" +
-                    "_:b a <urn:movementObservation> ; " +
-                    "<urn:location> ?location ; " +
-                    "<urn:direction> ?direction ; " +
-                "}" +
-                "WHERE {" +
-                    "?thing <urn:corner> ?location ." +
-                    "?thing <urn:compass> ?direction." +
-                "}";
-
-        final String bNodeId = UUID.randomUUID().toString();
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, () -> bNodeId);
-
-        // Create the statements that will be input into the query.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:compass"), vf.createURI("urn:NW")), "a") );
-        statements.add( new VisibilityStatement(
-                vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:corner"), vf.createURI("urn:corner1")), "a") );
-
-        // Make the expected results.
-        final Set<VisibilityStatement> expected = new HashSet<>();
-        final BNode blankNode = vf.createBNode(bNodeId);
-
-        expected.add(new VisibilityStatement(vf.createStatement(blankNode, RDF.TYPE, vf.createURI("urn:movementObservation")), "a"));
-        expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:direction"), vf.createURI("urn:NW")), "a"));
-        expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a"));
-
-        // Run the test.
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityStatementDeserializer.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
deleted file mode 100644
index 2af3a49..0000000
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.streams.kafka.processors.projection;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.rya.api.function.projection.RandomUUIDFactory;
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.api.model.VisibilityStatement;
-import org.apache.rya.streams.kafka.KafkaTopics;
-import org.apache.rya.streams.kafka.RyaStreamsTestUtil;
-import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
-import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
-import org.apache.rya.streams.kafka.topology.TopologyFactory;
-import org.apache.rya.test.kafka.KafkaTestInstanceRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.google.common.collect.Sets;
-
-/**
- * Integration tests the methods of {@link StatementPatternProcessor}.
- */
-public class ProjectionProcessorIT {
-
-    @Rule
-    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
-
-    @Test
-    public void showProcessorWorks() throws Exception {
-        // Enumerate some topics that will be re-used
-        final String ryaInstance = UUID.randomUUID().toString();
-        final UUID queryId = UUID.randomUUID();
-        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
-        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
-
-        // Create a topology for the Query that will be tested.
-        final String sparql =
-                "SELECT (?person AS ?p) ?otherPerson " +
-                "WHERE { " +
-                    "?person <urn:talksTo> ?otherPerson . " +
-                "}";
-
-        final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory());
-
-        // Load some data into the input topic.
-        final ValueFactory vf = new ValueFactoryImpl();
-        final List<VisibilityStatement> statements = new ArrayList<>();
-        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-
-        // Show the correct binding set results from the job.
-        final Set<VisibilityBindingSet> expected = new HashSet<>();
-
-        final MapBindingSet expectedBs = new MapBindingSet();
-        expectedBs.addBinding("p", vf.createURI("urn:Alice"));
-        expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-        expected.add(new VisibilityBindingSet(expectedBs, "a"));
-
-        RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/pom.xml b/extras/rya.streams/pom.xml
index bd559d2..dd876a0 100644
--- a/extras/rya.streams/pom.xml
+++ b/extras/rya.streams/pom.xml
@@ -35,7 +35,10 @@
 
     <modules>
         <module>kafka</module>
+        <module>kafka-test</module>
         <module>api</module>
         <module>client</module>
+        <module>geo</module>
+        <module>integration</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58cc7c5..31b17f8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -329,6 +329,11 @@ under the License.
                 <artifactId>rya.streams.kafka</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.rya</groupId>
+                <artifactId>rya.streams.kafka-test</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>org.apache.thrift</groupId>
@@ -357,6 +362,11 @@ under the License.
             </dependency>
             <dependency>
                 <groupId>org.apache.rya</groupId>
+                <artifactId>rya.test.rdf</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rya</groupId>
                 <artifactId>rya.test.kafka</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index d458f8f..d13ea0b 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -35,5 +35,6 @@
 
     <modules>
         <module>kafka</module>
+        <module>rdf</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/test/rdf/pom.xml
----------------------------------------------------------------------
diff --git a/test/rdf/pom.xml b/test/rdf/pom.xml
new file mode 100644
index 0000000..f8dae1d
--- /dev/null
+++ b/test/rdf/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.test.parent</artifactId>
+        <version>3.2.12-incubating-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.test.rdf</artifactId>
+    
+    <name>Apache Rya Test RDF</name>
+    <description>
+        This module contains the Rya Test RDF components that help write RDF based tests.
+    </description>
+
+    <dependencies>
+        <!-- Third Party Dependencies -->
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryalgebra-model</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryparser-sparql</artifactId>
+        </dependency>
+    
+        <dependency>
+            <groupId>com.github.stephenc.findbugs</groupId>
+            <artifactId>findbugs-annotations</artifactId>
+        </dependency>
+    
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/test/rdf/src/main/java/org/apache/rya/streams/kafka/RdfTestUtil.java
----------------------------------------------------------------------
diff --git a/test/rdf/src/main/java/org/apache/rya/streams/kafka/RdfTestUtil.java b/test/rdf/src/main/java/org/apache/rya/streams/kafka/RdfTestUtil.java
new file mode 100644
index 0000000..b4388c3
--- /dev/null
+++ b/test/rdf/src/main/java/org/apache/rya/streams/kafka/RdfTestUtil.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.MultiProjection;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A set of utility functions that are useful when writing tests against RDF functions.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class RdfTestUtil {
+
+    private RdfTestUtil() { }
+
+    /**
+     * Fetch the {@link StatementPattern} from a SPARQL string.
+     *
+     * @param sparql - A SPARQL query that contains only a single Statement Pattern. (not null)
+     * @return The {@link StatementPattern} that was in the query, if it could be found. Otherwise {@code null}
+     * @throws Exception The statement pattern could not be found in the parsed SPARQL query.
+     */
+    public static @Nullable StatementPattern getSp(final String sparql) throws Exception {
+        requireNonNull(sparql);
+
+        final AtomicReference<StatementPattern> statementPattern = new AtomicReference<>();
+        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
+        parsed.getTupleExpr().visitChildren(new QueryModelVisitorBase<Exception>() {
+            @Override
+            public void meet(final StatementPattern node) throws Exception {
+                statementPattern.set(node);
+            }
+        });
+        return statementPattern.get();
+    }
+
+    /**
+     * Get the first {@link Projection} node from a SPARQL query.
+     *
+     * @param sparql - The query that contains a single Projection node.
+     * @return The first {@link Projection} that is encountered.
+     * @throws Exception The query could not be parsed.
+     */
+    public static @Nullable Projection getProjection(final String sparql) throws Exception {
+        requireNonNull(sparql);
+
+        final AtomicReference<Projection> projection = new AtomicReference<>();
+        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
+        parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() {
+            @Override
+            public void meet(final Projection node) throws Exception {
+                projection.set(node);
+            }
+        });
+
+        return projection.get();
+    }
+
+    /**
+     * Get the first {@link MultiProjection} node from a SPARQL query.
+     *
+     * @param sparql - The query that contains a single MultiProjection node.
+     * @return The first {@link MultiProjection} that is encountered.
+     * @throws Exception The query could not be parsed.
+     */
+    public static @Nullable MultiProjection getMultiProjection(final String sparql) throws Exception {
+        requireNonNull(sparql);
+
+        final AtomicReference<MultiProjection> multiProjection = new AtomicReference<>();
+        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
+        parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() {
+            @Override
+            public void meet(final MultiProjection node) throws Exception {
+                multiProjection.set(node);
+            }
+        });
+
+        return multiProjection.get();
+    }
+
+    /**
+     * Get the first {@link Filter} node from a SPARQL query.
+     *
+     * @param sparql - The query that contains a single Filter node.
+     * @return The first {@link Filter} that is encountered.
+     * @throws Exception The query could not be parsed.
+     */
+    public static @Nullable Filter getFilter(final String sparql) throws Exception {
+        requireNonNull(sparql);
+
+        final AtomicReference<Filter> filter = new AtomicReference<>();
+        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
+        parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() {
+            @Override
+            public void meet(final Filter node) throws Exception {
+                filter.set(node);
+            }
+        });
+
+        return filter.get();
+    }
+}
\ No newline at end of file
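For context, here is a rough sketch of how a test might lean on the new RdfTestUtil helpers to pull algebra nodes out of a SPARQL string. The test class and query below are illustrative assumptions only and are not part of this commit.

package org.apache.rya.streams.kafka;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.junit.Test;
import org.openrdf.query.algebra.Filter;
import org.openrdf.query.algebra.StatementPattern;

/**
 * Hypothetical usage example for {@link RdfTestUtil}; not included in the patch above.
 */
public class RdfTestUtilExampleTest {

    @Test
    public void fetchNodesFromSparql() throws Exception {
        // A query with exactly one Statement Pattern and one Filter.
        final String sparql =
                "SELECT * WHERE { " +
                    "?person <urn:talksTo> ?employee . " +
                    "FILTER(?employee != <urn:Alice>) " +
                "}";

        // The utility walks the parsed algebra and hands back the single Statement Pattern.
        final StatementPattern sp = RdfTestUtil.getSp(sparql);
        assertNotNull(sp);
        assertEquals("urn:talksTo", sp.getPredicateVar().getValue().stringValue());

        // The first Filter node the visitor encounters is returned.
        final Filter filter = RdfTestUtil.getFilter(sparql);
        assertNotNull(filter);
    }
}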
