RYA-377 Implemented a common RyaStreams Processor and ProcessorSupplier.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/516e8908 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/516e8908 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/516e8908 Branch: refs/heads/master Commit: 516e8908b5a359ba153ad48beaae5cb20fccfbc5 Parents: 07fcb5f Author: kchilton2 <[email protected]> Authored: Mon Nov 6 15:17:44 2017 -0500 Committer: caleb <[email protected]> Committed: Tue Jan 9 15:13:00 2018 -0500 ---------------------------------------------------------------------- .../kafka/processors/ProcessorResult.java | 205 +++++++++++++++++++ .../processors/ProcessorResultFactory.java | 43 ++++ .../kafka/processors/RyaStreamsProcessor.java | 56 +++++ .../processors/RyaStreamsProcessorSupplier.java | 51 +++++ 4 files changed, 355 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/516e8908/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java new file mode 100644 index 0000000..ac3d849 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.rya.api.model.VisibilityBindingSet; + +import com.google.common.base.Optional; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Represents a value that is emitted from a Rya Streams {@link Processor}. We can't just emit a + * {@link VisibilityBindingSet} because some downstream processors require more information about + * which upstream processor is emitting the result in order to do their work. + * <p> + * Currently there are only two types of processors: + * <ul> + * <li>Unary Processor - A processor that only has a single upstream node feeding it input.</li> + * <li>Binary Processor - A processor that has two upstream nodes feeding it input.</li> + * </ul> + * If a processor is emitting to a unary processor, then use {@link #make(UnaryResult)} to create its + * result. If it is emitting to a binary processor, then use {@link #make(BinaryResult)}. + */ +@DefaultAnnotation(NonNull.class) +public class ProcessorResult { + + private final ResultType type; + private final Optional<UnaryResult> unary; + private final Optional<BinaryResult> binary; + + /** + * Constructs an instance of {@link ProcessorResult}. Private to force users to use the static factory methods. + * + * @param type - Indicates the type of result this object holds.
(not null) + * @param unary - The unary result if that is this object's type. (not null) + * @param binary - The binary result if that is this object's type. (not null) + */ + private ProcessorResult( + final ResultType type, + final Optional<UnaryResult> unary, + final Optional<BinaryResult> binary) { + this.type = requireNonNull(type); + this.unary= requireNonNull(unary); + this.binary= requireNonNull(binary); + } + + /** + * @return Indicates the type of result this object holds. + */ + public ResultType getType() { + return type; + } + + /** + * @return The unary result if that is this object's type. + * @throws IllegalStateException If this object's type is not {@link ResultType#UNARY}. + */ + public UnaryResult getUnary() throws IllegalStateException { + checkState(type == ResultType.UNARY, "The ResultType must be " + ResultType.UNARY + " to invoke this method, " + + "but it is " + type + "."); + return unary.get(); + } + + /** + * @return The binary result if that is this object's type. + * @throws IllegalStateException If this object's type is not {@link ResultType#BINARY}. + */ + public BinaryResult getBinary() throws IllegalStateException { + checkState(type == ResultType.BINARY, "The ResultType must be " + ResultType.BINARY + " to invoke this method, " + + "but it is " + type + "."); + return binary.get(); + } + + /** + * Creates a {@link ProcessorResult} using the supplied value. + * + * @param result - The result that will be held by the created object. (not null) + * @return An object holding the provided result. + */ + public static ProcessorResult make(final UnaryResult result) { + requireNonNull(result); + return new ProcessorResult(ResultType.UNARY, Optional.of(result), Optional.absent()); + } + + /** + * Creates a {@link ProcessorResult} using the supplied value. + * + * @param result - The result that will be held by the created object. (not null) + * @return An object holding the provided result. 
+ */ + public static ProcessorResult make(final BinaryResult result) { + requireNonNull(result); + return new ProcessorResult(ResultType.BINARY, Optional.absent(), Optional.of(result)); + } + + /** + * Indicates the type of result held by a {@link ProcessorResult}. + */ + public static enum ResultType { + /** + * The {@link ProcessorResult} holds a {@link UnaryResult}. + */ + UNARY, + + /** + * The {@link ProcessorResult} holds a {@link BinaryResult}. + */ + BINARY; + } + + /** + * The result of a Rya Streams {@link Processor} whose downstream processor is unary. + */ + @DefaultAnnotation(NonNull.class) + public static final class UnaryResult { + private final VisibilityBindingSet result; + + /** + * Constructs an instance of {@link UnaryResult}. + * + * @param result - The binding set that is being emitted to the downstream unary processor. (not null) + */ + public UnaryResult(final VisibilityBindingSet result) { + this.result = requireNonNull(result); + } + + /** + * @return The binding set that is being emitted to the downstream unary processor. + */ + public VisibilityBindingSet getResult() { + return result; + } + } + + /** + * The result of a Rya Streams {@link Processor} whose downstream processor is binary. + */ + @DefaultAnnotation(NonNull.class) + public static final class BinaryResult { + private final Side side; + private final VisibilityBindingSet result; + + /** + * Constructs an instance of {@link BinaryResult}. + * + * @param side - Which side of the downstream binary processor the result is being emitted to. (not null) + * @param result - The binding set that is being emitted to the downstream binary processor. (not null) + */ + public BinaryResult(final Side side, final VisibilityBindingSet result) { + this.side = requireNonNull(side); + this.result = requireNonNull(result); + } + + /** + * @return Which side of the downstream binary processor the result is being emitted to. 
+ */ + public Side getSide() { + return side; + } + + /** + * @return The binding set that is being emitted to the downstream binary processor. + */ + public VisibilityBindingSet getResult() { + return result; + } + + /** + * A label that is used by the downstream binary processor to distinguish which upstream processor + * produced the {@link BinaryResult}. + */ + public static enum Side { + /** + * The result is being emitted from the "left" upstream processor. + */ + LEFT, + + /** + * The result is being emitted from the "right" upstream processor. + */ + RIGHT; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/516e8908/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResultFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResultFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResultFactory.java new file mode 100644 index 0000000..4bc42a9 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResultFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors; + +import org.apache.rya.api.model.VisibilityBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Augments a {@link VisibilityBindingSet} that needs to be output by a {@link RyaStreamsProcessor} + * so that it has all of the information the downstream processor needs as well as whatever + * key the downstream processor requires. + */ +@DefaultAnnotation(NonNull.class) +public interface ProcessorResultFactory { + + /** + * Augments a {@link VisibilityBindingSet} that needs to be output by a {@link RyaStreamsProcessor} + * so that it has all of the information the downstream processor needs as well as whatever + * key the downstream processor requires. + * + * @param result - The result that is being emitted. (not null) + * @return A {@link ProcessorResult} that is formatted for the downstream processor. + */ + public ProcessorResult make(VisibilityBindingSet result); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/516e8908/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessor.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessor.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessor.java new file mode 100644 index 0000000..f6262c8 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors; + +import static java.util.Objects.requireNonNull; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * All Rya Streams {@link Processor} implementations emit {@link ProcessorResult} objects. This + * abstract class holds onto the {@link ProcessorResultFactory} that is used to augment results + * before sending them to the downstream processor via {@link ProcessorContext#forward(Object, Object)}. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaStreamsProcessor implements Processor<Object, ProcessorResult> { + + private final ProcessorResultFactory resultFactory; + + /** + * Constructs an instance of {@link RyaStreamsProcessor}. + * + * @param resultFactory - The {@link ProcessorResultFactory} the child class will use to format results + * before sending them to {@link ProcessorContext#forward(Object, Object)}.
(not null) + */ + public RyaStreamsProcessor(final ProcessorResultFactory resultFactory) { + this.resultFactory = requireNonNull(resultFactory); + } + + /** + * @return The {@link ProcessorResultFactory} the child class will use to format results + * before sending them to {@link ProcessorContext#forward(Object, Object)}. + */ + public ProcessorResultFactory getResultFactory() { + return resultFactory; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/516e8908/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessorSupplier.java new file mode 100644 index 0000000..fef0bba --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessorSupplier.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.rya.streams.kafka.processors; + +import static java.util.Objects.requireNonNull; + +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link ProcessorSupplier} that should be implemented for each {@link RyaStreamsProcessor} that is implemented. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaStreamsProcessorSupplier implements ProcessorSupplier<Object, ProcessorResult> { + + private final ProcessorResultFactory resultFactory; + + /** + * Constructs an instance of {@link RyaStreamsProcessorSupplier}. + * + * @param resultFactory - The {@link ProcessorResultFactory} that will be used by processors created by this class. (not null) + */ + public RyaStreamsProcessorSupplier(final ProcessorResultFactory resultFactory) { + this.resultFactory = requireNonNull(resultFactory); + } + + /** + * @return The {@link ProcessorResultFactory} that will be used by processors created by this class. (not null) + */ + public ProcessorResultFactory getResultFactory() { + return resultFactory; + } +} \ No newline at end of file
