This is an automated email from the ASF dual-hosted git repository. huijun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new 10c8764 Nwang/remove stateful from streamletoperator (#3034) 10c8764 is described below commit 10c876436d63300fc74c8648271d4e3daf1dfd8b Author: Ning Wang <nw...@twitter.com> AuthorDate: Tue Sep 25 16:17:12 2018 -0700 Nwang/remove stateful from streamletoperator (#3034) * Refactor stateful support into operator implementations from base class * clean up * clean up unused imports * clean up TODO --- .../heron/streamlet/impl/operators/StreamletOperator.java | 13 +------------ .../heron/streamlet/impl/operators/TransformOperator.java | 11 +++++++++-- .../org/apache/heron/streamlet/impl/sinks/ComplexSink.java | 8 +++++++- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java index d2451d2..27e6cd0 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/StreamletOperator.java @@ -20,11 +20,7 @@ package org.apache.heron.streamlet.impl.operators; -import java.io.Serializable; - import org.apache.heron.api.bolt.BaseRichBolt; -import org.apache.heron.api.state.State; -import org.apache.heron.api.topology.IStatefulComponent; import org.apache.heron.api.topology.OutputFieldsDeclarer; import org.apache.heron.api.tuple.Fields; @@ -32,17 +28,10 @@ import org.apache.heron.api.tuple.Fields; * The Bolt interface that other operators of the streamlet packages extend. * The only common stuff amongst all of them is the output streams */ -public abstract class StreamletOperator extends BaseRichBolt - implements IStatefulComponent<Serializable, Serializable> { +public abstract class StreamletOperator extends BaseRichBolt { private static final long serialVersionUID = 8524238140745238942L; private static final String OUTPUT_FIELD_NAME = "output"; - @Override - public void initState(State<Serializable, Serializable> state) { } - - @Override - public void preSave(String checkpointId) { } - /** * The operators implementing streamlet functionality have some properties. * 1. They all output only one stream diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java index b552680..e588f8b 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/TransformOperator.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.heron.api.bolt.OutputCollector; import org.apache.heron.api.state.State; +import org.apache.heron.api.topology.IStatefulComponent; import org.apache.heron.api.topology.TopologyContext; import org.apache.heron.api.tuple.Tuple; import org.apache.heron.api.tuple.Values; @@ -38,7 +39,8 @@ import org.apache.heron.streamlet.impl.ContextImpl; * It calls the transformFunction setup/cleanup at the beginning/end of the * processing. And for every tuple, it applies the transformFunction, and emits the resulting value */ -public class TransformOperator<R, T> extends StreamletOperator { +public class TransformOperator<R, T> extends StreamletOperator + implements IStatefulComponent<Serializable, Serializable> { private static final long serialVersionUID = 429297144878185182L; private SerializableTransformer<? super R, ? extends T> serializableTransformer; @@ -56,13 +58,18 @@ public class TransformOperator<R, T> extends StreamletOperator { } @Override + public void preSave(String checkpointId) { + } + + @Override public void cleanup() { serializableTransformer.cleanup(); } @SuppressWarnings("rawtypes") @Override - public void prepare(Map<String, Object> map, TopologyContext topologyContext, + public void prepare(Map<String, Object> map, + TopologyContext topologyContext, OutputCollector outputCollector) { collector = outputCollector; Context context = new ContextImpl(topologyContext, map, state); diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java index 0b4a6b5..ab4dbc4 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/sinks/ComplexSink.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.heron.api.bolt.OutputCollector; import org.apache.heron.api.state.State; +import org.apache.heron.api.topology.IStatefulComponent; import org.apache.heron.api.topology.TopologyContext; import org.apache.heron.api.tuple.Tuple; import org.apache.heron.streamlet.Context; @@ -36,7 +37,8 @@ import org.apache.heron.streamlet.impl.operators.StreamletOperator; * ConsumerSink is a very simple Sink that basically invokes a user supplied * consume function for every tuple. */ -public class ComplexSink<R> extends StreamletOperator { +public class ComplexSink<R> extends StreamletOperator + implements IStatefulComponent<Serializable, Serializable> { private static final long serialVersionUID = 8717991188885786658L; private Sink<R> sink; private OutputCollector collector; @@ -51,6 +53,10 @@ public class ComplexSink<R> extends StreamletOperator { this.state = startupState; } + @Override + public void preSave(String checkpointId) { + } + @SuppressWarnings("rawtypes") @Override public void prepare(Map<String, Object> map, TopologyContext topologyContext,