RYA-377 Implement the JoinProcessor for KafkaStreams.
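This change introduces a JoinProcessor that joins the VisibilityBindingSets emitted by two upstream processors, buffering each side's results in a Kafka Streams state store so that a late arrival on either side still produces join results. Condensed from the JoinProcessorIT tests later in this commit, a two-pattern join topology is wired roughly as follows (leftSp, rightSp, statementsTopic, and resultsTopic stand in for values those tests construct):

    final TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);

    // Each statement pattern processor tags its results with the side of the join it feeds.
    builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
            result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
    builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
            result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");

    // The join processor and its state store share the same name.
    builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
            "NATURAL_JOIN", new NaturalJoin(),
            Lists.newArrayList("employee"),
            Lists.newArrayList("employee", "person", "business"),
            result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
    builder.addStateStore(
            Stores.create("NATURAL_JOIN").withStringKeys().withValues(new VisibilityBindingSetSerde()).inMemory().build(),
            "NATURAL_JOIN");

    builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "NATURAL_JOIN");
    builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");

The state store and the join processor deliberately share the "NATURAL_JOIN" name; JoinProcessor.init() looks the store up through the ProcessorContext by that name.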
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3471cb7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3471cb7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3471cb7e Branch: refs/heads/master Commit: 3471cb7eedaadde5352c584cfa4135a4dbd66520 Parents: c4966ff Author: kchilton2 <[email protected]> Authored: Fri Nov 10 13:44:25 2017 -0500 Committer: caleb <[email protected]> Committed: Tue Jan 9 15:13:00 2018 -0500 ---------------------------------------------------------------------- extras/rya.streams/kafka/pom.xml | 5 +- .../StatementPatternProcessorSupplier.java | 5 + .../processors/join/CloseableIterator.java | 32 ++ .../processors/join/JoinProcessorSupplier.java | 191 +++++++ .../kafka/processors/join/JoinStateStore.java | 11 +- .../processors/join/KeyValueJoinStateStore.java | 259 ++++++++++ .../apache/rya/streams/kafka/KafkaTestUtil.java | 82 +++ .../processors/StatementPatternProcessorIT.java | 240 +++++++-- .../kafka/processors/join/JoinProcessorIT.java | 507 +++++++++++++++++++ .../kafka/src/test/resources/log4j.properties | 29 ++ 10 files changed, 1296 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml index 33cc985..d3f6891 100644 --- a/extras/rya.streams/kafka/pom.xml +++ b/extras/rya.streams/kafka/pom.xml @@ -57,11 +57,12 @@ under the License. <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> - <dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency> + <!-- Misc. dependencies --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> @@ -79,4 +80,4 @@ under the License. 
<scope>test</scope> </dependency> </dependencies> -</project> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java index 6991783..386fe98 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java @@ -30,6 +30,8 @@ import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.openrdf.query.BindingSet; import org.openrdf.query.algebra.StatementPattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -70,6 +72,8 @@ public class StatementPatternProcessorSupplier implements ProcessorSupplier<Stri @DefaultAnnotation(NonNull.class) public static final class StatementPatternProcessor implements Processor<String, VisibilityStatement> { + private static final Logger log = LoggerFactory.getLogger(StatementPatternProcessor.class); + private final StatementPatternMatcher spMatcher; private final ProcessorResultFactory resultFactory; @@ -104,6 +108,7 @@ public class StatementPatternProcessorSupplier implements ProcessorSupplier<Stri // Wrap the binding set as a result and forward it to the downstream processor. final ProcessorResult resultValue = resultFactory.make(visBs); + log.debug("\nOUTPUT:\n{}", visBs); context.forward(key, resultValue); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java new file mode 100644 index 0000000..9ea927d --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors.join; + +import java.util.Iterator; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * An {@link Iterator} that is also {@link AutoCloseable}. + * + * @param <T> - The type of elements that will be iterated over. + */ +@DefaultAnnotation(NonNull.class) +public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java new file mode 100644 index 0000000..9ed2363 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.join; + +import static java.util.Objects.requireNonNull; + +import java.util.Iterator; +import java.util.List; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.rya.api.function.join.IterativeJoin; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; +import org.apache.rya.streams.kafka.processors.ProcessorResultFactory; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Supplies {@link JoinProcessor} instances. + */ +@DefaultAnnotation(NonNull.class) +public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier { + + private final String stateStoreName; + private final IterativeJoin join; + private final List<String> joinVars; + private final List<String> allVars; + + /** + * Constructs an instance of {@link JoinProcessorSupplier}. + * + * @param stateStoreName - The name of the state store the processor will use. 
(not null)
+     * @param join - The join function the supplied processor will use. (not null)
+     * @param joinVars - The variables that the supplied processor will join over. (not null)
+     * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets.
+     *   This list must begin with the same variables, in the same order, as {@code joinVars}. (not null)
+     * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
+     * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}.
+     */
+    public JoinProcessorSupplier(
+            final String stateStoreName,
+            final IterativeJoin join,
+            final List<String> joinVars,
+            final List<String> allVars,
+            final ProcessorResultFactory resultFactory) throws IllegalArgumentException {
+        super(resultFactory);
+        this.stateStoreName = requireNonNull(stateStoreName);
+        this.join = requireNonNull(join);
+        this.joinVars = requireNonNull(joinVars);
+        this.allVars = requireNonNull(allVars);
+
+        if(!allVars.subList(0, joinVars.size()).equals(joinVars)) {
+            throw new IllegalArgumentException("All vars must be led by the join vars, but they were not. " +
+                    "Join Vars: " + joinVars + ", All Vars: " + allVars);
+        }
+    }
+
+    @Override
+    public Processor<Object, ProcessorResult> get() {
+        return new JoinProcessor(stateStoreName, join, joinVars, allVars, super.getResultFactory());
+    }
+
+    /**
+     * Joins {@link VisibilityBindingSet}s against all binding sets that were emitted on the other side. This function
+     * does not have an age-off policy, so it will match everything that was ever emitted on the other side. This may
+     * become prohibitive for joins that match a large volume of binding sets, since the state store will grow
+     * indefinitely.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static class JoinProcessor extends RyaStreamsProcessor {
+
+        private static final Logger log = LoggerFactory.getLogger(JoinProcessor.class);
+
+        private final String stateStoreName;
+        private final IterativeJoin join;
+        private final List<String> joinVars;
+        private final List<String> allVars;
+        private final ProcessorResultFactory resultFactory;
+
+        private ProcessorContext context;
+        private JoinStateStore joinStateStore;
+
+        /**
+         * Constructs an instance of {@link JoinProcessor}.
+         *
+         * @param stateStoreName - The name of the state store the processor will use. (not null)
+         * @param join - The join function that the processor will use. (not null)
+         * @param joinVars - The variables that the processor will join over. (not null)
+         * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets.
+         *   This list must begin with the same variables, in the same order, as {@code joinVars}. (not null)
+         * @param resultFactory - The factory that will format this processor's final results
+         *   for the downstream processor. (not null)
+         */
+        public JoinProcessor(
+                final String stateStoreName,
+                final IterativeJoin join,
+                final List<String> joinVars,
+                final List<String> allVars,
+                final ProcessorResultFactory resultFactory) {
+            super(resultFactory);
+            this.stateStoreName = requireNonNull(stateStoreName);
+            this.join = requireNonNull(join);
+            this.joinVars = requireNonNull(joinVars);
+            this.allVars = requireNonNull(allVars);
+            this.resultFactory = requireNonNull(resultFactory);
+
+            if(!allVars.subList(0, joinVars.size()).equals(joinVars)) {
+                throw new IllegalArgumentException("All vars must be led by the join vars, but they were not. " +
+                        "Join Vars: " + joinVars + ", All Vars: " + allVars);
+            }
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            // Hold onto the context so that we can forward results.
+            this.context = context;
+
+            // Get a reference to the state store that keeps track of what can be joined with.
+            final KeyValueStore<String, VisibilityBindingSet> stateStore =
+                    (KeyValueStore<String, VisibilityBindingSet>) context.getStateStore( stateStoreName );
+            joinStateStore = new KeyValueJoinStateStore( stateStore, joinVars, allVars );
+        }
+
+        @Override
+        public void process(final Object key, final ProcessorResult value) {
+            // Log the key/value pair that was encountered.
+            log.debug("\nINPUT:\nSide: {}\nBinding Set: {}", value.getBinary().getSide(), value.getBinary().getResult());
+
+            // Must be a binary result.
+            final BinaryResult binary = value.getBinary();
+
+            // Store the new result in the state store so that future joins may include it.
+            joinStateStore.store(binary);
+
+            // Fetch the binding sets that the emitted value joins with.
+            try(final CloseableIterator<VisibilityBindingSet> otherSide = joinStateStore.getJoinedValues(binary)) {
+                // Create an iterator that performs the join operation.
+                final Iterator<VisibilityBindingSet> joinResults = binary.getSide() == Side.LEFT ?
+                        join.newLeftResult(binary.getResult(), otherSide) :
+                        join.newRightResult(otherSide, binary.getResult());
+
+                // Format each join result and forward it to the downstream processor.
+                while(joinResults.hasNext()) {
+                    final VisibilityBindingSet joinResult = joinResults.next();
+                    final ProcessorResult resultValue = resultFactory.make(joinResult);
+                    log.debug("\nOUTPUT:\n{}", joinResult);
+                    context.forward(key, resultValue);
+                }
+            } catch (final Exception e) {
+                final String msg = "Problem encountered while iterating over the other side's values within the state store.";
+                log.error(msg, e);
+                throw new RuntimeException(msg, e);
+            }
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            // Nothing to do.
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do.
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
index e53ec68..17a6ebb 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
@@ -18,9 +18,6 @@
  */
 package org.apache.rya.streams.kafka.processors.join;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
@@ -37,17 +34,15 @@ public interface JoinStateStore {
     /**
      * Store a {@link VisibilityBindingSet} based on the side it was emitted from.
      *
-     * @param joinVars - An ordered list of the variables that are being joined over. (not null)
     * @param result - The result whose value will be stored. (not null)
     */
-    public void store(List<String> joinVars, BinaryResult result);
+    public void store(BinaryResult result);
 
     /**
      * Get the previously stored {@link VisibilityBindingSet}s that join with the provided result.
      *
-     * @param joinVars - An ordered list of the variables to join over. (not null)
-     * @param result - Defines the values that will be used to join. (not null)
+     * @param result - The value that will be joined with. (not null)
      * @return The {@link VisibilityBindingSet}s that join with {@code result}.
      */
-    public Iterator<VisibilityBindingSet> getJoinedValues(List<String> joinVars, BinaryResult result);
+    public CloseableIterator<VisibilityBindingSet> getJoinedValues(BinaryResult result);
 }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
new file mode 100644
index 0000000..d73b40e
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.openrdf.query.impl.MapBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link KeyValueStore} implementation of {@link JoinStateStore}.
+ * </p>
+ * This is a key/value store, so we need to store the {@link VisibilityBindingSet}s using keys that allow us to fetch
+ * all binding sets that join from a specific side. We use the following pattern to accomplish this:
+ * <pre>
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]
+ * </pre>
+ * This will group all binding sets that have been emitted from a specific side and that have the same join variable
+ * values next to each other within the store. This isn't enough information to fetch that group, though. We must
+ * provide a start and end key to bound the range that is fetched back. To accomplish this, we place a start of range
+ * marker as the first key for all unique [side]/[join values] groups, and an end of range marker as the last key for
+ * each of those groups.
+ * </p>
+ * The rows follow this pattern:
+ * <pre>
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]0x00
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value],[remainingBindingValues]
+ * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]0xFF
+ * </pre>
+ * </p>
+ * When an iterator over the results is returned, it skips over the start and end of range markers.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KeyValueJoinStateStore implements JoinStateStore {
+
+    private static final Logger log = LoggerFactory.getLogger(KeyValueJoinStateStore.class);
+
+    /**
+     * This is the minimum value of a UTF-8 character.
+     */
+    private static final String START_RANGE_SUFFIX = new String(new byte[] { 0x00 } );
+
+    /**
+     * This is the maximum value of a UTF-8 character.
+     */
+    private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF } );
+
+    /**
+     * A default empty value that is stored for a start of range or end of range marker.
+     */
+    private static final VisibilityBindingSet RANGE_MARKER_VALUE = new VisibilityBindingSet(new MapBindingSet(), "");
+
+    private final KeyValueStore<String, VisibilityBindingSet> store;
+    private final List<String> joinVars;
+    private final List<String> allVars;
+
+    /**
+     * Constructs an instance of {@link KeyValueJoinStateStore}.
+     *
+     * @param store - The state store that will be used. (not null)
+     * @param joinVars - The variables that are used to build grouping keys. (not null)
+     * @param allVars - The variables that are used to build full value keys. (not null)
+     * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}.
+     */
+    public KeyValueJoinStateStore(
+            final KeyValueStore<String, VisibilityBindingSet> store,
+            final List<String> joinVars,
+            final List<String> allVars) throws IllegalArgumentException {
+        this.store = requireNonNull(store);
+        this.joinVars = requireNonNull(joinVars);
+        this.allVars = requireNonNull(allVars);
+
+        for(int i = 0; i < joinVars.size(); i++) {
+            if(!joinVars.get(i).equals(allVars.get(i))) {
+                throw new IllegalArgumentException("All vars must be led by the join vars, but they were not. " +
+                        "Join Vars: " + joinVars + ", All Vars: " + allVars);
+            }
+        }
+    }
+
+    @Override
+    public void store(final BinaryResult result) {
+        requireNonNull(result);
+
+        // The join key prefix is an ordered list of values from the binding set that match the join variables.
+        // This is a prefix for every row that holds values for a specific set of join variable values.
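+        // For illustration (the values here are hypothetical): with joinVars = [employee] and
+        // allVars = [employee, person], storing a LEFT binding set {employee=urn:Bob, person=urn:Alice}
+        // writes three rows:
+        //   LEFT,urn:Bob<0x00>        -> RANGE_MARKER_VALUE (start of range marker)
+        //   LEFT,urn:Bob,urn:Alice    -> the binding set itself
+        //   LEFT,urn:Bob<0xFF>        -> RANGE_MARKER_VALUE (end of range marker)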
+ final Side side = result.getSide(); + final VisibilityBindingSet bs = result.getResult(); + final String joinKeyPrefix = makeCommaDelimitedValues(side, joinVars, bs); + + final List<KeyValue<String, VisibilityBindingSet>> values = new ArrayList<>(); + + // For each join variable set, we need a start key for scanning, + final String startKey = joinKeyPrefix + START_RANGE_SUFFIX; + values.add( new KeyValue<>(startKey, RANGE_MARKER_VALUE) ); + + // The actual value that was emitted as a result. + final String valueKey = makeCommaDelimitedValues(side, allVars, bs); + values.add( new KeyValue<>(valueKey, bs) ); + + // And the end key for scanning. + final String endKey = joinKeyPrefix + END_RANGE_SUFFIX; + values.add( new KeyValue<>(endKey, RANGE_MARKER_VALUE) ); + + // Write the pairs to the store. + log.debug("\nStoring the following values: {}\n", values); + store.putAll( values ); + } + + @Override + public CloseableIterator<VisibilityBindingSet> getJoinedValues(final BinaryResult result) { + requireNonNull(result); + + // Get an iterator over the values that start with the join variables for the other side. + final Side otherSide = result.getSide() == Side.LEFT ? Side.RIGHT : Side.LEFT; + final VisibilityBindingSet bs = result.getResult(); + final String joinKeyPrefix = makeCommaDelimitedValues(otherSide, joinVars, bs); + + final String startKey = joinKeyPrefix + START_RANGE_SUFFIX; + final String endKey = joinKeyPrefix + END_RANGE_SUFFIX; + final KeyValueIterator<String, VisibilityBindingSet> rangeIt = store.range(startKey, endKey); + + // Return a CloseableIterator over the range's value fields, skipping the start and end entry. + return new CloseableIterator<VisibilityBindingSet>() { + + private Optional<VisibilityBindingSet> next = null; + + @Override + public boolean hasNext() { + // If the iterator has not been initialized yet, read a value in. + if(next == null) { + next = readNext(); + } + + // Return true if there is a next value, otherwise false. + return next.isPresent(); + } + + @Override + public VisibilityBindingSet next() { + // If the iterator has not been initialized yet, read a value in. + if(next == null) { + next = readNext(); + } + + // It's illegal to call next() when there is no next value. + if(!next.isPresent()) { + throw new IllegalStateException("May not invoke next() when there is nothing left in the Iterator."); + } + + // Update and return the next value. + final VisibilityBindingSet ret = next.get(); + log.debug("\nReturning: {}", ret); + next = readNext(); + return ret; + } + + private Optional<VisibilityBindingSet> readNext() { + // Check to see if there's anything left in the iterator. + if(!rangeIt.hasNext()) { + return Optional.empty(); + } + + // Read a candidate key/value pair from the iterator. + KeyValue<String, VisibilityBindingSet> candidate = rangeIt.next(); + + // If we are initializing, then the first thing we must read is a start of range marker. + if(next == null) { + if(!candidate.key.endsWith(START_RANGE_SUFFIX)) { + throw new IllegalStateException("The first key encountered must be a start of range key."); + } + log.debug("Read the start of range markers.\n"); + + // Read a new candidate to skip this one. + if(!rangeIt.hasNext()) { + throw new IllegalStateException("There must be another entry after the start of range key."); + } + candidate = rangeIt.next(); + } + + // If that value is an end of range key, then we are finished. Otherwise, return it. 
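+                    // (To recap the three cases this method handles: the first call consumes the
+                    // start-of-range marker; an end-of-range key means the group is exhausted and
+                    // must be the last entry; any other key holds a stored binding set to return.)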
+                else if(candidate.key.endsWith(END_RANGE_SUFFIX)) {
+                    log.debug("Read the end of range marker.\n");
+
+                    // If there are more messages, that's a problem.
+                    if(rangeIt.hasNext()) {
+                        throw new IllegalStateException("The end of range marker must be the last key in the iterator.");
+                    }
+
+                    return Optional.empty();
+                }
+
+                // Otherwise we found a new value.
+                return Optional.of( candidate.value );
+            }
+
+            @Override
+            public void close() throws Exception {
+                rangeIt.close();
+            }
+        };
+    }
+
+    /**
+     * A utility function that helps construct the keys used by {@link KeyValueJoinStateStore}.
+     *
+     * @param side - The side value for the key. (not null)
+     * @param vars - Which variables within the binding set to use for the key's values. (not null)
+     * @param bindingSet - The binding set the key is being constructed from. (not null)
+     * @return A comma delimited list of the binding values, leading with the side.
+     */
+    private static String makeCommaDelimitedValues(final Side side, final List<String> vars, final VisibilityBindingSet bindingSet) {
+        requireNonNull(side);
+        requireNonNull(vars);
+        requireNonNull(bindingSet);
+
+        // Make an ordered list of the binding set values, leading with the side.
+        final List<String> values = new ArrayList<>();
+        values.add(side.toString());
+        for(final String var : vars) {
+            values.add( bindingSet.hasBinding(var) ? bindingSet.getBinding(var).getValue().toString() : "" );
+        }
+
+        // Return a comma delimited list of those values.
+        return Joiner.on(",").join(values);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
index bff4fdb..0a1a8a4 100644
--- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
@@ -19,10 +19,13 @@ package org.apache.rya.streams.kafka;
 import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -34,8 +37,20 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+
+import com.google.common.collect.Sets;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -124,4 +139,71 @@ public final class KafkaTestUtil {
         return values;
     }
+
+    /**
+     * Runs a Kafka Streams topology, loads statements into the input topic, reads the binding sets that come out of
+     * the results topic, and ensures the expected results match the read results.
+     *
+     * @param kafka - The embedded kafka instance that is being tested with. (not null)
+     * @param statementsTopic - The topic statements will be written to. (not null)
+     * @param resultsTopic - The topic results will be read from. (not null)
+     * @param builder - The streams topology that will be executed. (not null)
+     * @param startupMs - How long to wait for the topology to start before writing the statements.
+     * @param statements - The statements that will be loaded into the topic. (not null)
+     * @param expected - The expected results. (not null)
+     * @throws Exception If any exception was thrown while running the test.
+     */
+    public static void runStreamProcessingTest(
+            final KafkaTestInstanceRule kafka,
+            final String statementsTopic,
+            final String resultsTopic,
+            final TopologyBuilder builder,
+            final int startupMs,
+            final List<VisibilityStatement> statements,
+            final Set<VisibilityBindingSet> expected) throws Exception {
+        requireNonNull(kafka);
+        requireNonNull(statementsTopic);
+        requireNonNull(resultsTopic);
+        requireNonNull(builder);
+        requireNonNull(statements);
+        requireNonNull(expected);
+
+        // Explicitly create the topics that are being used.
+        kafka.createTopic(statementsTopic);
+        kafka.createTopic(resultsTopic);
+
+        // Start the streams program.
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
+
+        final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
+        streams.cleanUp();
+        try {
+            streams.start();
+
+            // Wait for the streams application to start. Streams only see data after their consumers are connected.
+            Thread.sleep(startupMs);
+
+            // Load the statements into the input topic.
+            try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
+                    kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
+                new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
+            }
+
+            // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found.
+            try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer(
+                    kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) {
+                // Register the topic.
+                consumer.subscribe(Arrays.asList(resultsTopic));
+
+                // Poll for the result.
+                final Set<VisibilityBindingSet> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) );
+
+                // Show the correct binding set results from the job.
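+                // (The pollForResults arguments appear to be the poll interval in milliseconds, the
+                // number of polling attempts, and the number of results to wait for; comparing Sets
+                // also makes the assertion independent of the order in which results arrive.)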
+ assertEquals(expected, results); + } + } finally { + streams.close(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java index 1b58b42..371fd0b 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java @@ -18,34 +18,25 @@ */ package org.apache.rya.streams.kafka.processors; -import static org.junit.Assert.assertEquals; - import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashSet; import java.util.List; -import java.util.Properties; +import java.util.Set; import java.util.UUID; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.RdfTestUtil; -import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; import org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; -import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Rule; import org.junit.Test; @@ -63,7 +54,50 @@ public class StatementPatternProcessorIT { public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); @Test - public void statementPatternMatches() throws Exception { + public void singlePattern_singleStatement() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the StatementPattern object that will be evaluated. + final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"); + + // Setup a topology. + final TopologyBuilder builder = new TopologyBuilder(); + + // The topic that Statements are written to is used as a source. 
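+        // (The source node turns each record's value back into a VisibilityStatement;
+        // record keys are plain Strings.)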
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "SP1");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create a statement that generates an SP result.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void singlePattern_manyStatements() throws Exception {
         // Enumerate some topics that will be re-used
         final String ryaInstance = UUID.randomUUID().toString();
         final UUID queryId = UUID.randomUUID();
         final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
         final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
@@ -88,48 +122,144 @@ public class StatementPatternProcessorIT {
         // Add a sink that writes the data out to a new Kafka topic.
         builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
-        // Start the streams program.
-        final Properties props = kafka.createBootstrapServerConfig();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT");
-
-        final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
-        streams.cleanUp();
-        try {
-            streams.start();
-
-            // Wait for the streams application to start. Streams only see data after their consumers are connected.
-            Thread.sleep(2000);
-
-            // Load some data into the input topic.
-            final ValueFactory vf = new ValueFactoryImpl();
-            final List<VisibilityStatement> statements = new ArrayList<>();
-            statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
-
-            try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer(
-                    kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
-                new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
-            }
-
-            // Wait for the final results to appear in the output topic and verify the expected Binding Set was found.
-            try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer(
-                    kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) {
-                // Register the topic.
-                consumer.subscribe(Arrays.asList(resultsTopic));
-
-                // Poll for the result.
-                final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 1, consumer);
-
-                // Show the correct binding set results from the job.
-                final QueryBindingSet bs = new QueryBindingSet();
-                bs.addBinding("person", vf.createURI("urn:Alice"));
-                bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
-                final VisibilityBindingSet expected = new VisibilityBindingSet(bs, "a");
-
-                final VisibilityBindingSet result = results.iterator().next();
-                assertEquals(expected, result);
-            }
-        } finally {
-            streams.close();
-        }
+        // Create some statements where some generate SP results and others do not.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoin")), "b") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Alice")), "a|b") );
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "c") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Bob"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Alice"));
+        expected.add( new VisibilityBindingSet(bs, "a|b") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void multiplePatterns_singleStatement() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
+        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "SP1", "SP2");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create a statement that generates results for both patterns.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Show the correct binding set results from the job.
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("action", vf.createURI("urn:talksTo"));
+        expected.add( new VisibilityBindingSet(bs, "a") );
+
+        // Run the test.
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void multiplePatterns_multipleStatements() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
+        final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "SP1", "SP2");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create some statements where some generate SP results and others do not.
+ final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "a|b") ); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:walksWith"), vf.createURI("urn:Bob")), "b") ); + + // Show the correct binding set results from the job. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("otherPerson", vf.createURI("urn:Bob")); + expected.add( new VisibilityBindingSet(bs, "a") ); + + bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("action", vf.createURI("urn:talksTo")); + expected.add( new VisibilityBindingSet(bs, "a") ); + + bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("otherPerson", vf.createURI("urn:Charlie")); + expected.add( new VisibilityBindingSet(bs, "a|b") ); + + bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Charlie")); + bs.addBinding("action", vf.createURI("urn:walksWith")); + expected.add( new VisibilityBindingSet(bs, "b") ); + + // Run the test. + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java new file mode 100644 index 0000000..14a559f --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors.join; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.rya.api.function.join.LeftOuterJoin; +import org.apache.rya.api.function.join.NaturalJoin; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTestUtil; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RdfTestUtil; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; +import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; +import org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter; +import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; +import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; + +/** + * Integration tests the methods of {@link JoinProcessor}. + */ +public class JoinProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test(expected = IllegalArgumentException.class) + public void badAllVars() throws IllegalArgumentException { + new JoinProcessorSupplier( + "NATURAL_JOIN", + new NaturalJoin(), + Lists.newArrayList("employee"), + Lists.newArrayList("person", "employee", "business"), + result -> ProcessorResult.make( new UnaryResult(result) )); + } + + @Test + public void newLeftResult() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the StatementPatterns that will be evaluated. + final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }"); + final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }"); + + // Setup a topology. + final TopologyBuilder builder = new TopologyBuilder(); + + // The topic that Statements are written to is used as a source. + builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); + + // Add a processor that handles the first statement pattern. 
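+        // (The Side that each ProcessorResultFactory tags its BinaryResult with is what the
+        // downstream JoinProcessor uses to choose between newLeftResult() and newRightResult().)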
+ builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp, + result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); + + // Add a processor that handles the second statement pattern. + builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp, + result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); + + // Add a processor that handles a natrual join over the SPs. + builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier( + "NATURAL_JOIN", + new NaturalJoin(), + Lists.newArrayList("employee"), + Lists.newArrayList("employee", "person", "business"), + result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP"); + + // Add a state store for the join processor. + final StateStoreSupplier joinStoreSupplier = + Stores.create( "NATURAL_JOIN" ) + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); + builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN"); + + // Add a processor that formats the VisibilityBindingSet for output. + builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "NATURAL_JOIN"); + + // Add a sink that writes the data out to a new Kafka topic. + builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + + // Create some statements that generate a bunch of right SP results. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") ); + + // Add a statement that will generate a left result that joins with some of those right results. + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoPlace")); + expected.add( new VisibilityBindingSet(bs, "a&b&c") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); + + // Run the test. 
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected);
+    }
+
+    @Test
+    public void newRightResult() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPatterns that will be evaluated.
+        final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }");
+        final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS");
+
+        // Add a processor that handles the second statement pattern.
+        builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp,
+                result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS");
+
+        // Add a processor that handles a natural join over the SPs.
+        builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier(
+                "NATURAL_JOIN",
+                new NaturalJoin(),
+                Lists.newArrayList("employee"),
+                Lists.newArrayList("employee", "person", "business"),
+                result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP");
+
+        // Add a state store for the join processor.
+        final StateStoreSupplier joinStoreSupplier =
+                Stores.create( "NATURAL_JOIN" )
+                    .withStringKeys()
+                    .withValues(new VisibilityBindingSetSerde())
+                    .inMemory()
+                    .build();
+        builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "NATURAL_JOIN");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Create some statements that generate a bunch of right SP results.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") );
+        statements.add( new VisibilityStatement(
+                vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") );
+
+        // Add a statement that will generate a left result that joins with some of those right results.
+ statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoPlace")); + expected.add( new VisibilityBindingSet(bs, "a&b&c") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); + + // Run the test. + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + } + + @Test + public void newResultsBothSides() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the StatementPatterns that will be evaluated. + final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }"); + final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }"); + + // Setup a topology. + final TopologyBuilder builder = new TopologyBuilder(); + + // The topic that Statements are written to is used as a source. + builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); + + // Add a processor that handles the first statement pattern. + builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp, + result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); + + // Add a processor that handles the second statement pattern. + builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp, + result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); + + // Add a processor that handles a natural join over the SPs. + builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier( + "NATURAL_JOIN", + new NaturalJoin(), + Lists.newArrayList("employee"), + Lists.newArrayList("employee", "person", "business"), + result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP"); + + // Add a state store for the join processor. + final StateStoreSupplier joinStoreSupplier = + Stores.create( "NATURAL_JOIN" ) + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); + builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN"); + + // Add a processor that formats the VisibilityBindingSet for output. + builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "NATURAL_JOIN"); + + // Add a sink that writes the data out to a new Kafka topic. + builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + + // Create some statements that generate results on both sides of the join. 
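+ // The talksTo and worksAt statements are interleaved below, so new results arrive on
+ // both sides of the join and both lookup paths get exercised.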
+ final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoPlace")); + expected.add( new VisibilityBindingSet(bs, "a&b&c") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("employee", vf.createURI("urn:Charlie")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add( new VisibilityBindingSet(bs, "a&c") ); + + // Run the test. + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + } + + @Test + public void manyJoins() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the StatementPatterns that will be evaluated. + final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }"); + final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }"); + final StatementPattern sp3 = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:hourlyWage> ?wage }"); + + // Setup a topology. + final TopologyBuilder builder = new TopologyBuilder(); + + // The topic that Statements are written to is used as a source. + builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); + + // Add a processor that handles the first statement pattern. + builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, + result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); + + // Add a processor that handles the second statement pattern. 
+ builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, + result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); + + // Add a processor that handles a natural join over SPs 1 and 2. + builder.addProcessor("JOIN1", new JoinProcessorSupplier( + "JOIN1", + new NaturalJoin(), + Lists.newArrayList("employee"), + Lists.newArrayList("employee", "person", "business"), + result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "SP1", "SP2"); + + // Add a processor that handles the third statement pattern. + builder.addProcessor("SP3", new StatementPatternProcessorSupplier(sp3, + result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); + + // Add a processor that handles a natural join over JOIN1 and SP3. + builder.addProcessor("JOIN2", new JoinProcessorSupplier( + "JOIN2", + new NaturalJoin(), + Lists.newArrayList("employee"), + Lists.newArrayList("employee", "business", "wage"), + result -> ProcessorResult.make( new UnaryResult(result) )), "JOIN1", "SP3"); + + // Setup the join state suppliers. + final StateStoreSupplier join1StoreSupplier = + Stores.create( "JOIN1" ) + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); + builder.addStateStore(join1StoreSupplier, "JOIN1"); + + final StateStoreSupplier join2StoreSupplier = + Stores.create( "JOIN2" ) + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); + builder.addStateStore(join2StoreSupplier, "JOIN2"); + + // Add a processor that formats the VisibilityBindingSet for output. + builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "JOIN2"); + + // Add a sink that writes the data out to a new Kafka topic. + builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + + // Create some statements that generate a bunch of right SP results. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hourlyWage"), vf.createLiteral(7.25)), "a") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + bs.addBinding("wage", vf.createLiteral(7.25)); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected); + } + + @Test + public void leftJoin() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the StatementPatterns that will be evaluated. 
+ final StatementPattern requiredSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }"); + final StatementPattern optionalSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }"); + + // Setup a topology. + final TopologyBuilder builder = new TopologyBuilder(); + + // The topic that Statements are written to is used as a source. + builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); + + // Add a processor that handles the first statement pattern. + builder.addProcessor("REQUIRED_SP", new StatementPatternProcessorSupplier(requiredSp, + result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); + + // Add a processor that handles the second statement pattern. + builder.addProcessor("OPTIONAL_SP", new StatementPatternProcessorSupplier(optionalSp, + result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); + + // Add a processor that handles a left outer join over the SPs. + builder.addProcessor("LEFT_JOIN", new JoinProcessorSupplier( + "LEFT_JOIN", + new LeftOuterJoin(), + Lists.newArrayList("employee"), + Lists.newArrayList("employee", "person", "business"), + result -> ProcessorResult.make( new UnaryResult(result) )), "REQUIRED_SP", "OPTIONAL_SP"); + + // Add a state store for the join processor. + final StateStoreSupplier joinStoreSupplier = + Stores.create( "LEFT_JOIN" ) + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); + builder.addStateStore(joinStoreSupplier, "LEFT_JOIN"); + + // Add a processor that formats the VisibilityBindingSet for output. + builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "LEFT_JOIN"); + + // Add a sink that writes the data out to a new Kafka topic. + builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + + // Create some statements that generate a result that includes the optional value as well as one that does not. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "d") ); + + // Make the expected results. 
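+ // Three results are expected: Alice/Bob with no business, Alice/Bob extended by the
+ // optional TacoPlace match, and Bob/Charlie with no business. David's worksAt
+ // statement only matches the optional pattern, so it never produces a result on its own.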
+ final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + expected.add( new VisibilityBindingSet(bs, "a") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoPlace")); + expected.add( new VisibilityBindingSet(bs, "a&b") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("employee", vf.createURI("urn:Charlie")); + expected.add( new VisibilityBindingSet(bs, "c") ); + + // Run the test. + KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3471cb7e/extras/rya.streams/kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/resources/log4j.properties b/extras/rya.streams/kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000..05e7be5 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Root logger option +log4j.rootLogger=ERROR, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +log4j.logger.org.apache.rya.streams.kafka.processors=debug
