RYA-377 Implemented a common RyaStreams Processor and ProcessorSupplier.

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/516e8908
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/516e8908
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/516e8908

Branch: refs/heads/master
Commit: 516e8908b5a359ba153ad48beaae5cb20fccfbc5
Parents: 07fcb5f
Author: kchilton2 <[email protected]>
Authored: Mon Nov 6 15:17:44 2017 -0500
Committer: caleb <[email protected]>
Committed: Tue Jan 9 15:13:00 2018 -0500

----------------------------------------------------------------------
 .../kafka/processors/ProcessorResult.java       | 205 +++++++++++++++++++
 .../processors/ProcessorResultFactory.java      |  43 ++++
 .../kafka/processors/RyaStreamsProcessor.java   |  56 +++++
 .../processors/RyaStreamsProcessorSupplier.java |  51 +++++
 4 files changed, 355 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/516e8908/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
new file mode 100644
index 0000000..ac3d849
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.rya.api.model.VisibilityBindingSet;
+
+import com.google.common.base.Optional;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Represents a value that is emitted from a Rya Streams {@link Processor}. We 
can't just emit a
+ * {@link VisibilityBindingSet} because some downstream processors require 
more information about
+ * which upstream processor is emitting the result in order to do their work.
+ * <p>
+ * Currently there are only two types of processors:
+ * <ul>
+ *   <li>Unary Processor - A processor that only has a single upstream node 
feeding it input.</li>
+ *   <li>Binary Processor - A processor that has two upstream nodes feeding it 
input.</li>
+ * </ul>
+ * If a processor is emitting to a unary processor, then use {@link 
#make(UnaryResult)} to create its
+ * result. If it is emitting to a binary processor, then use {@link 
#make(BinaryResult)}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class ProcessorResult {
+
+    private final ResultType type;
+    private final Optional<UnaryResult> unary;
+    private final Optional<BinaryResult> binary;
+
+    /**
+     * Constructs an instance of {@link ProcessorResult}. Private to force 
users to use the static factory methods.
+     *
+     * @param type - Indicates the type of result this object holds. (not null)
+     * @param unary - The unary result if that is this object's type. (not 
null)
+     * @param binary - The binary result if that is this object's type. (not 
null)
+     */
+    private  ProcessorResult(
+            final ResultType type,
+            final Optional<UnaryResult> unary,
+            final Optional<BinaryResult> binary) {
+        this.type = requireNonNull(type);
+        this.unary= requireNonNull(unary);
+        this.binary= requireNonNull(binary);
+    }
+
+    /**
+     * @return Indicates the type of result this object holds.
+     */
+    public ResultType getType() {
+        return type;
+    }
+
+    /**
+     * @return The unary result if that is this object's type.
+     * @throws IllegalStateException If this object's type is not {@link 
ResultType#UNARY}.
+     */
+    public UnaryResult getUnary() throws IllegalStateException {
+        checkState(type == ResultType.UNARY, "The ResultType must be " + 
ResultType.UNARY + " to invoke this method, " +
+                "but it is " + type + ".");
+        return unary.get();
+    }
+
+    /**
+     * @return The binary result if that is this object's type.
+     * @throws IllegalStateException If this object's type is not {@link 
ResultType#BINARY}.
+     */
+    public BinaryResult getBinary() throws IllegalStateException {
+        checkState(type == ResultType.BINARY, "The ResultType must be " + 
ResultType.BINARY + " to invoke this method, " +
+                "but it is " + type + ".");
+        return binary.get();
+    }
+
+    /**
+     * Creates a {@link ProcessorResult} using the supplied value.
+     *
+     * @param result - The result that will be held by the created object. 
(not null)
+     * @return An object holding the provided result.
+     */
+    public static ProcessorResult make(final UnaryResult result) {
+        requireNonNull(result);
+        return new ProcessorResult(ResultType.UNARY, Optional.of(result), 
Optional.absent());
+    }
+
+    /**
+     * Creates a {@link ProcessorResult} using the supplied value.
+     *
+     * @param result - The result that will be held by the created object. 
(not null)
+     * @return An object holding the provided result.
+     */
+    public static ProcessorResult make(final BinaryResult result) {
+        requireNonNull(result);
+        return new ProcessorResult(ResultType.BINARY, Optional.absent(), 
Optional.of(result));
+    }
+
+    /**
+     * Indicates the type of result held by a {@link ProcessorResult}.
+     */
+    public static enum ResultType {
+        /**
+         * The {@link ProcessorResult} holds a {@link UnaryResult}.
+         */
+        UNARY,
+
+        /**
+         * The {@link ProcessorResult} holds a {@link BinaryResult}.
+         */
+        BINARY;
+    }
+
+    /**
+     * The result of a Rya Streams {@link Processor} whose downstream 
processor is unary.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class UnaryResult {
+        private final VisibilityBindingSet result;
+
+        /**
+         * Constructs an instance of {@link UnaryResult}.
+         *
+         * @param result - The binding set that is being emitted to the 
downstream unary processor. (not null)
+         */
+        public UnaryResult(final VisibilityBindingSet result) {
+            this.result = requireNonNull(result);
+        }
+
+        /**
+         * @return The binding set that is being emitted to the downstream 
unary processor.
+         */
+        public VisibilityBindingSet getResult() {
+            return result;
+        }
+    }
+
+    /**
+     * The result of a Rya Streams {@link Processor} whose downstream 
processor is binary.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class BinaryResult {
+        private final Side side;
+        private final VisibilityBindingSet result;
+
+        /**
+         * Constructs an instance of {@link BinaryResult}.
+         *
+         * @param side - Which side of the downstream binary processor the 
result is being emitted to. (not null)
+         * @param result - The binding set that is being emitted to the 
downstream binary processor. (not null)
+         */
+        public BinaryResult(final Side side, final VisibilityBindingSet 
result) {
+            this.side = requireNonNull(side);
+            this.result = requireNonNull(result);
+        }
+
+        /**
+         * @return Which side of the downstream binary processor the result is 
being emitted to.
+         */
+        public Side getSide() {
+            return side;
+        }
+
+        /**
+         * @return The binding set that is being emitted to the downstream 
binary processor.
+         */
+        public VisibilityBindingSet getResult() {
+            return result;
+        }
+
+        /**
+         * A label that is used by the downstream binary processor to 
distinguish which upstream processor
+         * produced the {@link BinaryResult}.
+         */
+        public static enum Side {
+            /**
+             * The result is being emitted from the "left" upstream processor.
+             */
+            LEFT,
+
+            /**
+             * The result is being emitted from the "right" upstream processor.
+             */
+            RIGHT;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/516e8908/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResultFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResultFactory.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResultFactory.java
new file mode 100644
index 0000000..4bc42a9
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResultFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Augments a {@link VisibilityBindingSet} that needs to be output by a {@link 
RyaStreamsProcessor}
+ * so that it has all of the information the downstream processor needs as 
well as whatever
+ * key the downstream processor requires.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface ProcessorResultFactory {
+
+    /**
+     * Augments a {@link VisibilityBindingSet} that needs to be output by a 
{@link RyaStreamsProcessor}
+     * so that it has all of the information the downstream processor needs as 
well as whatever
+     * key the downstream processor requires.
+     *
+     * @param result - The result that is being emitted. (not null)
+     * @return A {@link ProcessorResult} that is formatted for the downstream 
processor.
+     */
+    public ProcessorResult make(VisibilityBindingSet result);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/516e8908/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessor.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessor.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessor.java
new file mode 100644
index 0000000..f6262c8
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * All Rya Streams {@link Processor} implementations emit {@link 
ProcessorResult} objects. This
+ * abstract class holds onto the {@link ProcessorResultFactory} that is used 
to augment results
+ * before sending them to the downstream processor via {@link 
ProcessorContext#forward(Object, Object)}.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaStreamsProcessor implements Processor<Object, 
ProcessorResult> {
+
+    private final ProcessorResultFactory resultFactory;
+
+    /**
+     * Constructs an instance of {@link RyaStreamsProcessor}.
+     *
+     * @param resultFactory - The {@link ProcessorResultFactory} the child 
class will use to format results
+     *   before sending them to {@link ProcessorContext#forward(Object, 
Object)}. (not null)
+     */
+    public RyaStreamsProcessor(final ProcessorResultFactory resultFactory) {
+        this.resultFactory = requireNonNull(resultFactory);
+    }
+
+    /**
+     * @return The {@link ProcessorResultFactory} the child class will use to 
format results
+     *   before sending them to {@link ProcessorContext#forward(Object, 
Object)}.
+     */
+    public ProcessorResultFactory getResultFactory() {
+        return resultFactory;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/516e8908/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessorSupplier.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessorSupplier.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessorSupplier.java
new file mode 100644
index 0000000..fef0bba
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsProcessorSupplier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link ProcessorSupplier} that should be implemented for each {@link 
RyaStreamsProcessor} that is implemented.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaStreamsProcessorSupplier implements 
ProcessorSupplier<Object, ProcessorResult> {
+
+    private final ProcessorResultFactory resultFactory;
+
+    /**
+     * Constructs an instance of {@link RyaStreamsProcessorSupplier}.
+     *
+     * @param resultFactory - The {@link ProcessorResultFactory} that will be 
used by processors created by this class. (not null)
+     */
+    public RyaStreamsProcessorSupplier(final ProcessorResultFactory 
resultFactory) {
+        this.resultFactory = requireNonNull(resultFactory);
+    }
+
+    /**
+     * @return The {@link ProcessorResultFactory} that will be used by 
processors created by this class. (not null)
+     */
+    public ProcessorResultFactory getResultFactory() {
+        return resultFactory;
+    }
+}
\ No newline at end of file

Reply via email to