RYA-377 Implement the Filter processors for Rya Streams.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/a5e36180 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/a5e36180 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/a5e36180 Branch: refs/heads/master Commit: a5e361806faeae26ff5a4acfcf2dd4bb5f74a75f Parents: da63fd1 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Tue Nov 21 12:40:54 2017 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- .../api/function/Filter/FilterEvaluator.java | 117 ++++++++++++++++++ .../function/filter/FilterEvaluatorTest.java | 108 +++++++++++++++++ .../filter/FilterProcessorSupplier.java | 120 +++++++++++++++++++ .../streams/kafka/topology/TopologyFactory.java | 18 +++ .../apache/rya/streams/kafka/RdfTestUtil.java | 23 ++++ .../processors/filter/FilterProcessorIT.java | 86 +++++++++++++ .../processors/filter/FilterProcessorTest.java | 75 ++++++++++++ 7 files changed, 547 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a5e36180/common/rya.api.function/src/main/java/org/apache/rya/api/function/Filter/FilterEvaluator.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/Filter/FilterEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/Filter/FilterEvaluator.java new file mode 100644 index 0000000..d1e1776 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/Filter/FilterEvaluator.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.Filter; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.evaluation.TripleSource; +import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; +import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import info.aduna.iteration.CloseableIteration; + +/** + * Processes a {@link Filter} node from a SPARQL query. + */ +@DefaultAnnotation(NonNull.class) +public class FilterEvaluator { + private static final Logger log = LoggerFactory.getLogger(FilterEvaluator.class); + + /** + * Is used to evaluate the conditions of a {@link Filter}. 
+ */ + private static final EvaluationStrategyImpl EVALUATOR = new EvaluationStrategyImpl( + new TripleSource() { + private final ValueFactory valueFactory = new ValueFactoryImpl(); + + @Override + public ValueFactory getValueFactory() { + return valueFactory; + } + + @Override + public CloseableIteration<? extends Statement, QueryEvaluationException> getStatements( + final Resource arg0, + final URI arg1, + final Value arg2, + final Resource... arg3) throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + }); + + private final ValueExpr condition; + + /** + * Constructs an instance of {@link FilterEvaluator}. + * + * @param condition - The condition that defines what passes the filter function. (not null) + */ + public FilterEvaluator(final ValueExpr condition) { + this.condition = requireNonNull(condition); + } + + /** + * Make a {@link FilterEvaluator} that processes the logic of a {@link Filter}. + * + * @param filter - Defines the Filter that will be processed. (not null) + * @return The {@link FilterEvaluator} for the provided {@link Filter}. + */ + public static FilterEvaluator make(final Filter filter) { + requireNonNull(filter); + final ValueExpr condition = filter.getCondition(); + return new FilterEvaluator(condition); + } + + /** + * Checks to see if a {@link VisibilityBindingSet} should be included in the results or not. + * + * @param bs - The value that will be evaluated against the filter. (not null) + * @return {@code true} if the binding set matches the filter and it should be included in the node's results, + * otherwise {@code false} and it should be excluded. + */ + public boolean filter(final VisibilityBindingSet bs) { + requireNonNull(bs); + + try { + final Value result = EVALUATOR.evaluate(condition, bs); + return QueryEvaluationUtil.getEffectiveBooleanValue(result); + } catch (final QueryEvaluationException e) { + //False returned because for whatever reason, the ValueExpr could not be evaluated. 
+ //In the event that the ValueExpr is a FunctionCall, this Exception will be generated if + //the Function URI is a valid URI that was found in the FunctionRegistry, but the arguments + //for that Function could not be parsed. + log.error("Could not evaluate a Filter.", e); + return false; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a5e36180/common/rya.api.function/src/test/java/org/apache/rya/api/function/filter/FilterEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/test/java/org/apache/rya/api/function/filter/FilterEvaluatorTest.java b/common/rya.api.function/src/test/java/org/apache/rya/api/function/filter/FilterEvaluatorTest.java new file mode 100644 index 0000000..8bbf005 --- /dev/null +++ b/common/rya.api.function/src/test/java/org/apache/rya/api/function/filter/FilterEvaluatorTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.function.filter; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.rya.api.function.Filter.FilterEvaluator; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Unit tests the methods of {@link FilterEvaluator}. + */ +public class FilterEvaluatorTest { + + @Test + public void matches() throws Exception { + // Read the filter object from a SPARQL query. + final Filter filter = getFilter( + "SELECT * " + + "WHERE { " + + "FILTER(?age < 10)" + + "?person <urn:age> ?age " + + "}"); + + // Create the input binding set. + final ValueFactory vf = new ValueFactoryImpl(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("age", vf.createLiteral(9)); + final VisibilityBindingSet visBs = new VisibilityBindingSet(bs); + + // Test the evaluator. + assertTrue( FilterEvaluator.make(filter).filter(visBs) ); + } + + @Test + public void doesNotMatch() throws Exception { + // Read the filter object from a SPARQL query. + final Filter filter = getFilter( + "SELECT * " + + "WHERE { " + + "FILTER(?age < 10)" + + "?person <urn:age> ?age " + + "}"); + + // Create the input binding set. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("age", vf.createLiteral(11)); + final VisibilityBindingSet visBs = new VisibilityBindingSet(bs); + + // Test the evaluator. + assertFalse( FilterEvaluator.make(filter).filter(visBs) ); + } + + /** + * Get the first {@link Filter} node from a SPARQL query. + * + * @param sparql - The query that contains a Filter node. + * @return The first {@link Filter} that is encountered. + * @throws Exception The query could not be parsed. + */ + public static @Nullable Filter getFilter(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<Filter> filter = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final Filter node) throws Exception { + filter.set(node); + } + }); + + return filter.get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a5e36180/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorSupplier.java new file mode 100644 index 0000000..5997237 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorSupplier.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.filter; + +import static java.util.Objects.requireNonNull; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.rya.api.function.Filter.FilterEvaluator; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType; +import org.apache.rya.streams.kafka.processors.ProcessorResultFactory; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Supplies {@link FilterProcessor} instances. + */ +@DefaultAnnotation(NonNull.class) +public class FilterProcessorSupplier extends RyaStreamsProcessorSupplier { + private static final Logger log = LoggerFactory.getLogger(FilterProcessorSupplier.class); + + private final FilterEvaluator filter; + + /** + * Constructs an instance of {@link FilterProcessorSupplier}. 
+ * + * @param filter - Defines the filter the supplied processors will evaluate. (not null) + * @param resultFactory - The factory that the supplied processors will use to create results. (not null) + */ + public FilterProcessorSupplier( + final FilterEvaluator filter, + final ProcessorResultFactory resultFactory) { + super(resultFactory); + this.filter = requireNonNull(filter); + } + + @Override + public Processor<Object, ProcessorResult> get() { + return new FilterProcessor(filter, super.getResultFactory()); + } + + /** + * Evaluates {@link ProcessorResult}s against a {@link FilterEvaluator} and forwards the original result + * to a downstream processor if it passes the filter's condition. + */ + @DefaultAnnotation(NonNull.class) + public static class FilterProcessor extends RyaStreamsProcessor { + + private final FilterEvaluator filter; + private ProcessorContext context; + + /** + * Constructs an instance of {@link FilterProcessor}. + * + * @param filter - Defines the filter the supplied processor will evaluate. (not null) + * @param resultFactory - The factory that the processor will use to create results. (not null) + */ + public FilterProcessor( + final FilterEvaluator filter, + final ProcessorResultFactory resultFactory) { + super(resultFactory); + this.filter = requireNonNull(filter); + } + + @Override + public void init(final ProcessorContext context) { + this.context = context; + } + + @Override + public void process(final Object key, final ProcessorResult value) { + // Filters can only be unary. + if (value.getType() != ResultType.UNARY) { + throw new RuntimeException("The ProcessorResult to be processed must be Unary."); + } + + // If the value's binding set passes the filter, then forward it to the downstream processor. 
+ final VisibilityBindingSet bindingSet = value.getUnary().getResult(); + log.debug("\nINPUT:\n{}", bindingSet); + if(filter.filter(bindingSet)) { + log.debug("\nOUTPUT:\n{}", bindingSet); + final ProcessorResult result = super.getResultFactory().make(bindingSet); + context.forward(key, result); + } + } + + @Override + public void punctuate(final long timestamp) { + // Do nothing. + } + + @Override + public void close() { + // Do nothing. + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a5e36180/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java index 08f3625..426b041 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.Stores; +import org.apache.rya.api.function.Filter.FilterEvaluator; import org.apache.rya.api.function.join.IterativeJoin; import org.apache.rya.api.function.join.LeftOuterJoin; import org.apache.rya.api.function.join.NaturalJoin; @@ -50,6 +51,7 @@ import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; +import 
org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier; import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier; import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier; import org.apache.rya.streams.kafka.processors.output.StatementOutputFormatterSupplier; @@ -62,6 +64,7 @@ import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.BinaryTupleOperator; import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.Filter; import org.openrdf.query.algebra.Join; import org.openrdf.query.algebra.LeftJoin; import org.openrdf.query.algebra.MultiProjection; @@ -90,6 +93,7 @@ public class TopologyFactory implements TopologyBuilderFactory { private static final String STATEMENT_PATTERN_PREFIX = "SP_"; private static final String JOIN_PREFIX = "JOIN_"; private static final String PROJECTION_PREFIX = "PROJECTION_"; + private static final String FILTER_PREFIX = "FILTER_"; private static final String SINK = "SINK"; private List<ProcessorEntry> processorEntryList; @@ -428,6 +432,20 @@ public class TopologyFactory implements TopologyBuilderFactory { } @Override + public void meet(final Filter node) throws TopologyBuilderException { + final String id = FILTER_PREFIX + UUID.randomUUID(); + final Optional<Side> side = getSide(node); + + final FilterProcessorSupplier supplier = new FilterProcessorSupplier( + FilterEvaluator.make(node), + result -> getResult(side, result)); + + entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(node.getArg()))); + idMap.put(node, id); + super.meet(node); + } + + @Override public void meet(final Join node) throws TopologyBuilderException { final String id = JOIN_PREFIX + UUID.randomUUID(); meetJoin(id, new NaturalJoin(), node); 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a5e36180/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java index 190bad3..b4388c3 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java @@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull; import java.util.concurrent.atomic.AtomicReference; +import org.openrdf.query.algebra.Filter; import org.openrdf.query.algebra.MultiProjection; import org.openrdf.query.algebra.Projection; import org.openrdf.query.algebra.StatementPattern; @@ -105,4 +106,26 @@ public final class RdfTestUtil { return multiProjection.get(); } + + /** + * Get the first {@link Filter} node from a SPARQL query. + * + * @param sparql - The query that contains a Filter node. + * @return The first {@link Filter} that is encountered. + * @throws Exception The query could not be parsed.
+ */ + public static @Nullable Filter getFilter(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<Filter> filter = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final Filter node) throws Exception { + filter.set(node); + } + }); + + return filter.get(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a5e36180/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java new file mode 100644 index 0000000..0348dcd --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors.filter; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTestUtil; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Integration tests the methods of {@link FilterProcessor}. + */ +public class FilterProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test + public void showProcessorWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final String sparql = + "SELECT * " + + "WHERE { " + + "FILTER(?age < 10)" + + "?person <urn:age> ?age " + + "}"; + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create the statements that will be input into the query. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(11)), "a")); + statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(9)), "a")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("age", vf.createLiteral(9)); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/a5e36180/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java new file mode 100644 index 0000000..4c44bae --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.filter; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.rya.api.function.Filter.FilterEvaluator; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.RdfTestUtil; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; +import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Unit tests the methods of {@link FilterProcessor}. + */ +public class FilterProcessorTest { + + @Test + public void showFilterFunctionIsCalled() throws Exception { + // Read the filter object from a SPARQL query. + final Filter filter = RdfTestUtil.getFilter( + "SELECT * " + + "WHERE { " + + "FILTER(?age < 10)" + + "?person <urn:age> ?age " + + "}"); + + // Create a Binding Set that will be passed into the Filter function based on the where clause. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("age", vf.createLiteral(9)); + final VisibilityBindingSet inputVisBs = new VisibilityBindingSet(bs, "a"); + + // Mock the processor context that will be invoked. + final ProcessorContext context = mock(ProcessorContext.class); + + // Run the test. + final FilterProcessor processor = new FilterProcessor( + FilterEvaluator.make(filter), + result -> ProcessorResult.make(new UnaryResult(result))); + processor.init(context); + processor.process("key", ProcessorResult.make(new UnaryResult(inputVisBs))); + + // Verify the binding set was passed through. + verify(context, times(1)).forward(eq("key"), eq(ProcessorResult.make(new UnaryResult(inputVisBs)))); + + } +} \ No newline at end of file