http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java index aef7c58..6a3d749 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaRunQuery.java @@ -61,7 +61,7 @@ public class KafkaRunQuery implements RunQuery { * @param kafkaHostname - The hostname of the Kafka Broker to connect to. (not null) * @param kafkaPort - The port of the Kafka Broker to connect to. (not null) * @param statementsTopic - The name of the topic that statements will be read from. (not null) - * @param resultsTopic - The name of the topic that query results will be writen to. (not null) + * @param resultsTopic - The name of the topic that query results will be written to. (not null) * @param queryRepo - The query repository that holds queries that are registered. (not null) * @param topologyFactory - Builds Kafka Stream processing topologies from SPARQL. (not null) */
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java index 124bc76..402061c 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java @@ -58,7 +58,7 @@ public class ProcessorResult { * @param unary - The unary result if that is this object's type. (not null) * @param binary - The binary result if that is this object's type. (not null) */ - private ProcessorResult( + private ProcessorResult( final ResultType type, final Optional<UnaryResult> unary, final Optional<BinaryResult> binary) { @@ -235,7 +235,7 @@ public class ProcessorResult { } /** - * A label that is used to by the downstream binary prcoessor to distinguish which upstream processor + * A label that is used by the downstream binary processor to distinguish which upstream processor * produced the {@link BinaryResult}. */ public static enum Side { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java index 367ca6f..4ea6959 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java @@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull; import java.util.Iterator; import java.util.List; +import java.util.UUID; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -29,6 +30,7 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.rya.api.function.join.IterativeJoin; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.utils.CloseableIterator; +import org.apache.rya.api.utils.UuidUtils; import org.apache.rya.streams.kafka.processors.ProcessorResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; @@ -138,13 +140,16 @@ public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier { @Override public void init(final ProcessorContext context) { - // Hold onto the context so that we can foward results. + // Hold onto the context so that we can forward results. this.context = context; + final String appId = context.applicationId(); + final UUID queryId = UuidUtils.extractUuidFromStringEnd(appId); + // Get a reference to the state store that keeps track of what can be joined with. final KeyValueStore<String, VisibilityBindingSet> stateStore = (KeyValueStore<String, VisibilityBindingSet>) context.getStateStore( stateStoreName ); - joinStateStore = new KeyValueJoinStateStore( stateStore, joinVars, allVars ); + joinStateStore = new KeyValueJoinStateStore( stateStore, queryId.toString(), joinVars, allVars ); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java index 254f226..8df77c4 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java @@ -77,7 +77,12 @@ public class KeyValueJoinStateStore implements JoinStateStore { /** * This is the maximum value of a UTF-8 character. */ - private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF }, Charsets.UTF_8); + private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0xFF }, Charsets.UTF_8); + + /** + * Indicates where the end of the join variables occurs. + */ + private static final String JOIN_VAR_END_MARKER = new String("~!^~".getBytes(Charsets.UTF_8), Charsets.UTF_8); /** * A default empty value that is stored for a start of range or end of range marker. @@ -85,6 +90,7 @@ public class KeyValueJoinStateStore implements JoinStateStore { private static final VisibilityBindingSet RANGE_MARKER_VALUE = new VisibilityBindingSet(new MapBindingSet(), ""); private final KeyValueStore<String, VisibilityBindingSet> store; + private final String id; private final List<String> joinVars; private final List<String> allVars; @@ -92,15 +98,18 @@ public class KeyValueJoinStateStore implements JoinStateStore { * Constructs an instance of {@link KeyValueJoinStateStore}. * * @param store - The state store that will be used. (not null) + * @param id - The ID used for the state store. (not null) * @param joinVars - The variables that are used to build grouping keys. (not null) * @param allVars - The variables that are used to build full value keys. (not null) * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}. */ public KeyValueJoinStateStore( final KeyValueStore<String, VisibilityBindingSet> store, + final String id, final List<String> joinVars, final List<String> allVars) throws IllegalArgumentException { this.store = requireNonNull(store); + this.id = requireNonNull(id); this.joinVars = requireNonNull(joinVars); this.allVars = requireNonNull(allVars); @@ -120,16 +129,18 @@ public class KeyValueJoinStateStore implements JoinStateStore { // This is a prefix for every row that holds values for a specific set of join variable values. final Side side = result.getSide(); final VisibilityBindingSet bs = result.getResult(); - final String joinKeyPrefix = makeCommaDelimitedValues(side, joinVars, bs); + + final String joinKeyPrefix = makeCommaDelimitedValues(side, joinVars, bs, joinVars.size()); final List<KeyValue<String, VisibilityBindingSet>> values = new ArrayList<>(); - // For each join variable set, we need a start key for scanning, + // For each join variable set, we need a start key for scanning. final String startKey = joinKeyPrefix + START_RANGE_SUFFIX; values.add( new KeyValue<>(startKey, RANGE_MARKER_VALUE) ); // The actual value that was emitted as a result. - final String valueKey = makeCommaDelimitedValues(side, allVars, bs); + final String valueKey = makeCommaDelimitedValues(side, allVars, bs, joinVars.size()); + values.add( new KeyValue<>(valueKey, bs) ); // And the end key for scanning. @@ -148,10 +159,11 @@ public class KeyValueJoinStateStore implements JoinStateStore { // Get an iterator over the values that start with the join variables for the other side. final Side otherSide = result.getSide() == Side.LEFT ? Side.RIGHT : Side.LEFT; final VisibilityBindingSet bs = result.getResult(); - final String joinKeyPrefix = makeCommaDelimitedValues(otherSide, joinVars, bs); + final String joinKeyPrefix = makeCommaDelimitedValues(otherSide, joinVars, bs, joinVars.size()); final String startKey = joinKeyPrefix + START_RANGE_SUFFIX; final String endKey = joinKeyPrefix + END_RANGE_SUFFIX; + final KeyValueIterator<String, VisibilityBindingSet> rangeIt = store.range(startKey, endKey); // Return a CloseableIterator over the range's value fields, skipping the start and end entry. @@ -241,21 +253,57 @@ public class KeyValueJoinStateStore implements JoinStateStore { * @param side - The side value for the key. (not null) * @param vars - Which variables within the binding set to use for the key's values. (not null) * @param bindingSet - The binding set the key is being constructed from. (not null) + * @param joinVarSize - the number of join variables at the beginning of + * {@code vars}. * @return A comma delimited list of the binding values, leading with the side. */ - private static String makeCommaDelimitedValues(final Side side, final List<String> vars, final VisibilityBindingSet bindingSet) { + private String makeCommaDelimitedValues(final Side side, final List<String> vars, final VisibilityBindingSet bindingSet, final int joinVarSize) { requireNonNull(side); requireNonNull(vars); requireNonNull(bindingSet); // Make a an ordered list of the binding set variables. final List<String> values = new ArrayList<>(); + values.add(id); values.add(side.toString()); + int count = 0; for(final String var : vars) { - values.add( bindingSet.hasBinding(var) ? bindingSet.getBinding(var).getValue().toString() : "" ); + count++; + String value = bindingSet.hasBinding(var) ? bindingSet.getBinding(var).getValue().toString() : ""; + if (count == joinVarSize) { + // Place the marker at the end of the last joinVar String (and + // before the remaining "allVars") + // A marker is needed to indicate where the join vars end so + // that a range search from "urn:Student9[0x00]" to "urn:Student9[0xFF]" + // does not return "urn:Student95,[remainingBindingValues]". + value += JOIN_VAR_END_MARKER; + } + values.add(value); } // Return a comma delimited list of those values. return Joiner.on(",").join(values); } + + private void printStateStoreRange(final String startKey, final String endKey) { + final KeyValueIterator<String, VisibilityBindingSet> rangeIt = store.range(startKey, endKey); + printStateStoreKeyValueIterator(rangeIt); + } + + private void printStateStoreAll() { + final KeyValueIterator<String, VisibilityBindingSet> rangeIt = store.all(); + printStateStoreKeyValueIterator(rangeIt); + } + + private static void printStateStoreKeyValueIterator(final KeyValueIterator<String, VisibilityBindingSet> rangeIt) { + log.info("----------------"); + while (rangeIt.hasNext()) { + final KeyValue<String, VisibilityBindingSet> keyValue = rangeIt.next(); + log.info(keyValue.key + " :::: " + keyValue.value); + } + log.info("----------------\n\n"); + if (rangeIt != null) { + rangeIt.close(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java index 666cbb0..c533854 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyBuilderFactory.java @@ -57,8 +57,21 @@ public interface TopologyBuilderFactory { public static class TopologyBuilderException extends Exception { private static final long serialVersionUID = 1L; + /** + * Creates a new instance of {@link TopologyBuilderException}. + * @param message the detailed message. + * @param cause the {@link Throwable} cause. + */ public TopologyBuilderException(final String message, final Throwable cause) { super(message, cause); } + + /** + * Creates a new instance of {@link TopologyBuilderException}. + * @param message the detailed message. + */ + public TopologyBuilderException(final String message) { + super(message); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java index 68fbb83..f330fa3 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java @@ -157,6 +157,10 @@ public class TopologyFactory implements TopologyBuilderFactory { } } + if (entry == null) { + throw new TopologyBuilderException("No valid processor entries found."); + } + // Add a formatter that converts the ProcessorResults into the output format. final SinkEntry<?,?> sinkEntry = visitor.getSinkEntry(); builder.addProcessor("OUTPUT_FORMATTER", sinkEntry.getFormatterSupplier(), entry.getID()); @@ -479,7 +483,7 @@ public class TopologyFactory implements TopologyBuilderFactory { * @return The {@link Side} the current node is on. */ private Optional<Side> getSide(final QueryModelNode node) { - // if query parent is a binary operator, need to determine if its left or right. + // if query parent is a binary operator, need to determine if it's left or right. if (node.getParentNode() instanceof BinaryTupleOperator) { final BinaryTupleOperator binary = (BinaryTupleOperator) node.getParentNode(); if (node.equals(binary.getLeftArg())) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3ae3aed..9cc67e5 100644 --- a/pom.xml +++ b/pom.xml @@ -299,6 +299,12 @@ under the License. </dependency> <dependency> <groupId>org.apache.rya</groupId> + <artifactId>rya.api</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> <artifactId>rya.api.model</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5adda982/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java index fa1e0a0..f509770 100644 --- a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java @@ -49,7 +49,7 @@ public final class KafkaTestUtil { /** * Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka. * - * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null) + * @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null) * @param keySerializerClass - Serializes the keys. (not null) * @param valueSerializerClass - Serializes the values. (not null) * @return A {@link Producer} that can be used to write records to a topic. @@ -72,7 +72,7 @@ public final class KafkaTestUtil { * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an * embedded instance of Kafka starting at the earliest point by default. * - * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null) + * @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null) * @param keyDeserializerClass - Deserializes the keys. (not null) * @param valueDeserializerClass - Deserializes the values. (not null) * @return A {@link Consumer} that can be used to read records from a topic. @@ -95,14 +95,14 @@ public final class KafkaTestUtil { } /** - * Polls a {@link Consumer> until it has either polled too many times without hitting the target number + * Polls a {@link Consumer} until it has either polled too many times without hitting the target number * of results, or it hits the target number of results. * * @param pollMs - How long each poll could take. * @param pollIterations - The maximum number of polls that will be attempted. * @param targetSize - The number of results to read before stopping. * @param consumer - The consumer that will be polled. - * @return The results that were read frmo the consumer. + * @return The results that were read from the consumer. * @throws Exception If the poll failed. */ public static <K, V> List<V> pollForResults(
