nwangtw closed pull request #3112: [Java Streamlet API] Support abstractions on 
Streamlet Operators
URL: https://github.com/apache/incubator-heron/pull/3112
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java
index 866a2ba3e2..f1495bd85b 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/ContextImpl.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl;
 
 import java.io.Serializable;
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
index 6e144e1a9b..09c29f5282 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
@@ -17,10 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl;
 
-
 import java.time.Duration;
 
 import org.apache.heron.api.bolt.BaseWindowedBolt;
@@ -42,7 +40,7 @@
   private TriggerPolicy<Tuple, ?> triggerPolicy;
   private EvictionPolicy<Tuple, ?> evictionPolicy;
 
-  public  WindowConfigImpl(Duration windowDuration, Duration 
slidingIntervalDuration) {
+  public WindowConfigImpl(Duration windowDuration, Duration 
slidingIntervalDuration) {
     this.windowType = WindowType.TIME;
     this.windowDuration = windowDuration;
     this.slidingIntervalDuration = slidingIntervalDuration;
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
index af9afe807e..24eea584f6 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FilterOperator.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.streamlet.SerializablePredicate;
@@ -38,18 +33,10 @@
   private static final long serialVersionUID = -4748646871471052706L;
   private SerializablePredicate<? super R> filterFn;
 
-  private OutputCollector collector;
-
   public FilterOperator(SerializablePredicate<? super R> filterFn) {
     this.filterFn = filterFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
index 35be0cf5d8..dce70e7426 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/FlatMapOperator.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.streamlet.SerializableFunction;
@@ -38,19 +33,11 @@
   private static final long serialVersionUID = -2418329215159618998L;
   private SerializableFunction<? super R, ? extends Iterable<? extends T>> 
flatMapFn;
 
-  private OutputCollector collector;
-
   public FlatMapOperator(
       SerializableFunction<? super R, ? extends Iterable<? extends T>> 
flatMapFn) {
     this.flatMapFn = flatMapFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
index 7b9f71f503..fe6d4586e8 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
@@ -17,14 +17,11 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.api.windowing.TupleWindow;
@@ -45,7 +42,6 @@
   private SerializableFunction<V, K> keyExtractor;
   private VR identity;
   private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
-  private OutputCollector collector;
 
   public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> 
keyExtractor, VR identity,
                             SerializableBiFunction<VR, V, ? extends VR> 
reduceFn) {
@@ -54,12 +50,6 @@ public 
GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtract
     this.reduceFn = reduceFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(TupleWindow inputWindow) {
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
index 00c8d80063..157b487088 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/JoinOperator.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
 import java.util.HashMap;
@@ -26,8 +25,6 @@
 import java.util.Map;
 
 import org.apache.heron.api.Pair;
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.api.windowing.TupleWindow;
@@ -58,7 +55,6 @@
   private SerializableFunction<V2, K> rightKeyExtractor;
   // The user supplied join function
   private SerializableBiFunction<V1, V2, ? extends VR> joinFn;
-  private OutputCollector collector;
 
   public JoinOperator(JoinType joinType, String leftComponent, String 
rightComponent,
                       SerializableFunction<V1, K> leftKeyExtractor,
@@ -72,12 +68,6 @@ public JoinOperator(JoinType joinType, String leftComponent, 
String rightCompone
     this.joinFn = joinFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @Override
   public Map<String, Object> getComponentConfiguration() {
     Map<String, Object> cfg = super.getComponentConfiguration();
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
index 7ee9be1760..73bfbf585b 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/MapOperator.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.streamlet.SerializableFunction;
@@ -37,18 +32,10 @@
   private static final long serialVersionUID = -1303096133107278700L;
   private SerializableFunction<? super R, ? extends T> mapFn;
 
-  private OutputCollector collector;
-
   public MapOperator(SerializableFunction<? super R, ? extends T> mapFn) {
     this.mapFn = mapFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
index b0155df918..92c2d20fb6 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ReduceByKeyAndWindowOperator.java
@@ -17,14 +17,11 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 import org.apache.heron.api.windowing.TupleWindow;
@@ -41,11 +38,11 @@
  * function grouped by keys. It emits a KeyedWindow, reduced Value KeyPairs as 
outputs
  */
 public class ReduceByKeyAndWindowOperator<K, V, R> extends 
StreamletWindowOperator<R, V> {
+
   private static final long serialVersionUID = 2833576046687750496L;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, V> valueExtractor;
   private SerializableBinaryOperator<V> reduceFn;
-  private OutputCollector collector;
 
   public ReduceByKeyAndWindowOperator(SerializableFunction<R, K> keyExtractor,
                                       SerializableFunction<R, V> 
valueExtractor,
@@ -55,12 +52,6 @@ public ReduceByKeyAndWindowOperator(SerializableFunction<R, 
K> keyExtractor,
     this.reduceFn = reduceFn;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(TupleWindow inputWindow) {
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index 75e9b7f4b0..0b08478559 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -17,11 +17,14 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
+import java.util.Map;
+
 import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Fields;
 import org.apache.heron.streamlet.IStreamletRichOperator;
 
@@ -35,6 +38,16 @@
   private static final long serialVersionUID = 8524238140745238942L;
   private static final String OUTPUT_FIELD_NAME = "output";
 
+  protected OutputCollector collector;
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext topologyContext,
+                      OutputCollector outputCollector) {
+    collector = outputCollector;
+  }
+
   /**
    * The operators implementing streamlet functionality have some properties.
    * 1. They all output only one stream
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
index f90655e553..9c9f4f856a 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletWindowOperator.java
@@ -17,11 +17,14 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
+import java.util.Map;
+
 import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.api.bolt.OutputCollector;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Fields;
 import org.apache.heron.streamlet.IStreamletWindowOperator;
 
@@ -32,8 +35,18 @@
 public abstract class StreamletWindowOperator<R, T>
     extends BaseWindowedBolt
     implements IStreamletWindowOperator<R, T> {
+
   private static final long serialVersionUID = -4836560876041237959L;
   private static final String OUTPUT_FIELD_NAME = "output";
+  protected OutputCollector collector;
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext topologyContext,
+                      OutputCollector outputCollector) {
+    collector = outputCollector;
+  }
 
   /**
    * The operators implementing streamlet functionality have some properties.
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
index 0bf52322f6..2379a3b0e5 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
 import java.io.Serializable;
@@ -41,10 +40,10 @@
  */
 public class TransformOperator<R, T> extends StreamletOperator<R, T>
     implements IStatefulComponent<Serializable, Serializable> {
+
   private static final long serialVersionUID = 429297144878185182L;
   private SerializableTransformer<? super R, ? extends T> 
serializableTransformer;
 
-  private OutputCollector collector;
   private State<Serializable, Serializable> state;
 
   public TransformOperator(
@@ -71,7 +70,7 @@ public void cleanup() {
   public void prepare(Map<String, Object> map,
                       TopologyContext topologyContext,
                       OutputCollector outputCollector) {
-    collector = outputCollector;
+    super.prepare(map, topologyContext, outputCollector);
     Context context = new ContextImpl(topologyContext, map, state);
     serializableTransformer.setup(context);
   }
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
index 151c0b45f1..4d56bf14c7 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/UnionOperator.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.operators;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
 
@@ -33,17 +28,10 @@
  */
 public class UnionOperator<I> extends StreamletOperator<I, I> {
   private static final long serialVersionUID = -7326832064961413315L;
-  private OutputCollector collector;
 
   public UnionOperator() {
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-    collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
index 943e3c588e..90b4217cf8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.sinks;
 
 import java.io.Serializable;
@@ -39,9 +38,9 @@
  */
 public class ComplexSink<R> extends StreamletOperator<R, R>
     implements IStatefulComponent<Serializable, Serializable> {
+
   private static final long serialVersionUID = 8717991188885786658L;
   private Sink<R> sink;
-  private OutputCollector collector;
   private State<Serializable, Serializable> state;
 
   public ComplexSink(Sink<R> sink) {
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
index af947048b5..dd5e4a59b9 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ConsumerSink.java
@@ -17,13 +17,8 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.sinks;
 
-import java.util.Map;
-
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.streamlet.SerializableConsumer;
 import org.apache.heron.streamlet.impl.operators.StreamletOperator;
@@ -33,20 +28,14 @@
  * consume function for every tuple.
  */
 public class ConsumerSink<R> extends StreamletOperator<R, R> {
+
   private static final long serialVersionUID = 8716140142187667638L;
   private SerializableConsumer<R> consumer;
-  private OutputCollector collector;
 
   public ConsumerSink(SerializableConsumer<R> consumer) {
     this.consumer = consumer;
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-    this.collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
index 7fdc3c95ee..ae1f6d1899 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/LogSink.java
@@ -17,14 +17,10 @@
  * under the License.
  */
 
-
 package org.apache.heron.streamlet.impl.sinks;
 
-import java.util.Map;
 import java.util.logging.Logger;
 
-import org.apache.heron.api.bolt.OutputCollector;
-import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.streamlet.impl.operators.StreamletOperator;
 
@@ -33,19 +29,13 @@
  * It basically logs every tuple.
  */
 public class LogSink<R> extends StreamletOperator<R, R> {
+
   private static final long serialVersionUID = -6392422646613189818L;
   private static final Logger LOG = Logger.getLogger(LogSink.class.getName());
-  private OutputCollector collector;
 
   public LogSink() {
   }
 
-  @SuppressWarnings("rawtypes")
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-    this.collector = outputCollector;
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to