This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 10c8764  Nwang/remove stateful from streamletoperator (#3034)
10c8764 is described below

commit 10c876436d63300fc74c8648271d4e3daf1dfd8b
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Tue Sep 25 16:17:12 2018 -0700

    Nwang/remove stateful from streamletoperator (#3034)
    
    * Refactor stateful support into operator implementations from base class
    
    * clean up
    
    * clean up unused imports
    
    * clean up TODO
---
 .../heron/streamlet/impl/operators/StreamletOperator.java   | 13 +------------
 .../heron/streamlet/impl/operators/TransformOperator.java   | 11 +++++++++--
 .../org/apache/heron/streamlet/impl/sinks/ComplexSink.java  |  8 +++++++-
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
index d2451d2..27e6cd0 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java
@@ -20,11 +20,7 @@
 
 package org.apache.heron.streamlet.impl.operators;
 
-import java.io.Serializable;
-
 import org.apache.heron.api.bolt.BaseRichBolt;
-import org.apache.heron.api.state.State;
-import org.apache.heron.api.topology.IStatefulComponent;
 import org.apache.heron.api.topology.OutputFieldsDeclarer;
 import org.apache.heron.api.tuple.Fields;
 
@@ -32,17 +28,10 @@ import org.apache.heron.api.tuple.Fields;
  * The Bolt interface that other operators of the streamlet packages extend.
  * The only common stuff amongst all of them is the output streams
  */
-public abstract class StreamletOperator extends BaseRichBolt
-    implements IStatefulComponent<Serializable, Serializable> {
+public abstract class StreamletOperator extends BaseRichBolt {
   private static final long serialVersionUID = 8524238140745238942L;
   private static final String OUTPUT_FIELD_NAME = "output";
 
-  @Override
-  public void initState(State<Serializable, Serializable> state) { }
-
-  @Override
-  public void preSave(String checkpointId) { }
-
   /**
    * The operators implementing streamlet functionality have some properties.
    * 1. They all output only one stream
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
index b552680..e588f8b 100644
--- 
a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
+++ 
b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.heron.api.bolt.OutputCollector;
 import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
 import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.tuple.Values;
@@ -38,7 +39,8 @@ import org.apache.heron.streamlet.impl.ContextImpl;
  * It calls the transformFunction setup/cleanup at the beginning/end of the
  * processing. And for every tuple, it applies the transformFunction, and 
emits the resulting value
  */
-public class TransformOperator<R, T> extends StreamletOperator {
+public class TransformOperator<R, T> extends StreamletOperator
+    implements IStatefulComponent<Serializable, Serializable> {
   private static final long serialVersionUID = 429297144878185182L;
   private SerializableTransformer<? super R, ? extends T> 
serializableTransformer;
 
@@ -56,13 +58,18 @@ public class TransformOperator<R, T> extends 
StreamletOperator {
   }
 
   @Override
+  public void preSave(String checkpointId) {
+  }
+
+  @Override
   public void cleanup() {
     serializableTransformer.cleanup();
   }
 
   @SuppressWarnings("rawtypes")
   @Override
-  public void prepare(Map<String, Object> map, TopologyContext topologyContext,
+  public void prepare(Map<String, Object> map,
+                      TopologyContext topologyContext,
                       OutputCollector outputCollector) {
     collector = outputCollector;
     Context context = new ContextImpl(topologyContext, map, state);
diff --git 
a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java 
b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
index 0b4a6b5..ab4dbc4 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.heron.api.bolt.OutputCollector;
 import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
 import org.apache.heron.api.topology.TopologyContext;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.streamlet.Context;
@@ -36,7 +37,8 @@ import 
org.apache.heron.streamlet.impl.operators.StreamletOperator;
  * ConsumerSink is a very simple Sink that basically invokes a user supplied
  * consume function for every tuple.
  */
-public class ComplexSink<R> extends StreamletOperator {
+public class ComplexSink<R> extends StreamletOperator
+    implements IStatefulComponent<Serializable, Serializable> {
   private static final long serialVersionUID = 8717991188885786658L;
   private Sink<R> sink;
   private OutputCollector collector;
@@ -51,6 +53,10 @@ public class ComplexSink<R> extends StreamletOperator {
     this.state = startupState;
   }
 
+  @Override
+  public void preSave(String checkpointId) {
+  }
+
   @SuppressWarnings("rawtypes")
   @Override
   public void prepare(Map<String, Object> map, TopologyContext topologyContext,

Reply via email to