[STORM-1714] refactored common logic into BaseStatefulBoltExecutor

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/95e7afc2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/95e7afc2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/95e7afc2

Branch: refs/heads/1.x-branch
Commit: 95e7afc29cde1b7f16c1b0ab8ad65bbd9ed9a2af
Parents: 3b879d6
Author: Arun Mahadevan <[email protected]>
Authored: Mon Apr 18 23:17:10 2016 +0530
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Apr 20 17:18:17 2016 +0900

----------------------------------------------------------------------
 .../topology/BaseStatefulBoltExecutor.java      | 209 +++++++++++++++++++
 .../topology/CheckpointTupleForwarder.java      | 165 +--------------
 .../storm/topology/StatefulBoltExecutor.java    |   4 +-
 3 files changed, 218 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
new file mode 100644
index 0000000..b93a061
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.spout.CheckPointState;
+import org.apache.storm.spout.CheckpointSpout;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
+
+/**
+ * Base class that abstracts the common logic for executing bolts in a stateful topology.
+ */
+public abstract class BaseStatefulBoltExecutor implements IRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(BaseStatefulBoltExecutor.class);
+    private final Map<TransactionRequest, Integer> transactionRequestCount;
+    private int checkPointInputTaskCount;
+    private long lastTxid = Long.MIN_VALUE;
+    protected OutputCollector collector;
+
+    public BaseStatefulBoltExecutor() {
+        transactionRequestCount = new HashMap<>();
+    }
+
+    protected void init(TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
+    }
+
+    /**
+     * returns the total number of input checkpoint streams across
+     * all input tasks to this component.
+     */
+    private int getCheckpointInputTaskCount(TopologyContext context) {
+        int count = 0;
+        for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
+            if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
+                count += context.getComponentTasks(inputStream.get_componentId()).size();
+            }
+        }
+        return count;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (CheckpointSpout.isCheckpoint(input)) {
+            processCheckpoint(input);
+        } else {
+            handleTuple(input);
+        }
+    }
+
+    /**
+     * Invokes handleCheckpoint once checkpoint tuple is received on
+     * all input checkpoint streams to this component.
+     */
+    private void processCheckpoint(Tuple input) {
+        CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
+        long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
+        if (shouldProcessTransaction(action, txid)) {
+            LOG.debug("Processing action {}, txid {}", action, txid);
+            try {
+                if (txid >= lastTxid) {
+                    handleCheckpoint(input, action, txid);
+                    if (action == ROLLBACK) {
+                        lastTxid = txid - 1;
+                    } else {
+                        lastTxid = txid;
+                    }
+                } else {
+                    LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
+                    collector.ack(input);
+                }
+            } catch (Throwable th) {
+                LOG.error("Got error while processing checkpoint tuple", th);
+                collector.fail(input);
+                collector.reportError(th);
+            }
+        } else {
+            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
+                              "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
+            collector.ack(input);
+        }
+    }
+
+    /**
+     * Checks if check points have been received from all tasks across
+     * all input streams to this component
+     */
+    private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
+        TransactionRequest request = new TransactionRequest(action, txid);
+        Integer count;
+        if ((count = transactionRequestCount.get(request)) == null) {
+            transactionRequestCount.put(request, 1);
+            count = 1;
+        } else {
+            transactionRequestCount.put(request, ++count);
+        }
+        if (count == checkPointInputTaskCount) {
+            transactionRequestCount.remove(request);
+            return true;
+        }
+        return false;
+    }
+
+    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
+        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
+    }
+
+    /**
+     * Sub-classes can implement the logic for handling the tuple.
+     *
+     * @param input the input tuple
+     */
+    protected abstract void handleTuple(Tuple input);
+
+    /**
+     * Sub-classes can implement the logic for handling checkpoint tuple.
+     *
+     * @param checkpointTuple the checkpoint tuple
+     * @param action          the action (prepare, commit, rollback or initstate)
+     * @param txid            the transaction id.
+     */
+    protected abstract void handleCheckpoint(Tuple checkpointTuple, CheckPointState.Action action, long txid);
+
+    protected static class AnchoringOutputCollector extends OutputCollector {
+        AnchoringOutputCollector(IOutputCollector delegate) {
+            super(delegate);
+        }
+
+        @Override
+        public List<Integer> emit(String streamId, List<Object> tuple) {
+            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
+        }
+
+        @Override
+        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
+        }
+    }
+
+    private static class TransactionRequest {
+        private final CheckPointState.Action action;
+
+        private final long txid;
+
+        TransactionRequest(CheckPointState.Action action, long txid) {
+            this.action = action;
+            this.txid = txid;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TransactionRequest that = (TransactionRequest) o;
+
+            if (txid != that.txid) return false;
+            return !(action != null ? !action.equals(that.action) : that.action != null);
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = action != null ? action.hashCode() : 0;
+            result = 31 * result + (int) (txid ^ (txid >>> 32));
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "TransactionRequest{" +
+                    "action='" + action + '\'' +
+                    ", txid=" + txid +
+                    '}';
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index 9d21c33..a510c0c 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -45,41 +45,18 @@ import static org.apache.storm.spout.CheckpointSpout.*;
  * can flow through the entire topology DAG.
  * </p>
  */
-public class CheckpointTupleForwarder implements IRichBolt {
+public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointTupleForwarder.class);
     private final IRichBolt bolt;
-    private final Map<TransactionRequest, Integer> transactionRequestCount;
-    private int checkPointInputTaskCount;
-    private long lastTxid = Long.MIN_VALUE;
-    private AnchoringOutputCollector collector;
-
-    public CheckpointTupleForwarder() {
-        this(null);
-    }
 
     public CheckpointTupleForwarder(IRichBolt bolt) {
         this.bolt = bolt;
-        transactionRequestCount = new HashMap<>();
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        init(context, collector);
-        bolt.prepare(stormConf, context, this.collector);
-    }
-
-    protected void init(TopologyContext context, OutputCollector collector) {
-        this.collector = new AnchoringOutputCollector(collector);
-        this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
     }
 
     @Override
-    public void execute(Tuple input) {
-        if (CheckpointSpout.isCheckpoint(input)) {
-            processCheckpoint(input);
-        } else {
-            handleTuple(input);
-        }
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
+        init(context, new AnchoringOutputCollector(outputCollector));
+        bolt.prepare(stormConf, context, collector);
     }
 
     @Override
@@ -93,18 +70,13 @@ public class CheckpointTupleForwarder implements IRichBolt {
         declareCheckpointStream(declarer);
     }
 
-    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
-        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
-    }
-
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return bolt.getComponentConfiguration();
     }
 
     /**
-     * Forwards the checkpoint tuple downstream. Sub-classes can override
-     * with the logic for handling checkpoint tuple.
+     * Forwards the checkpoint tuple downstream.
      *
      * @param checkpointTuple  the checkpoint tuple
      * @param action the action (prepare, commit, rollback or initstate)
@@ -116,8 +88,8 @@ public class CheckpointTupleForwarder implements IRichBolt {
     }
 
     /**
-     * Hands off tuple to the wrapped bolt to execute. Sub-classes can
-     * override the behavior.
+     * Hands off tuple to the wrapped bolt to execute.
+     *
      * <p>
      * Right now tuples continue to get forwarded while waiting for checkpoints to arrive on other streams
      * after checkpoint arrives on one of the streams. This can cause duplicates but still at least once.
@@ -128,127 +100,4 @@ public class CheckpointTupleForwarder implements IRichBolt {
     protected void handleTuple(Tuple input) {
         bolt.execute(input);
     }
-
-    /**
-     * Invokes handleCheckpoint once checkpoint tuple is received on
-     * all input checkpoint streams to this component.
-     */
-    private void processCheckpoint(Tuple input) {
-        Action action = (Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
-        long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
-        if (shouldProcessTransaction(action, txid)) {
-            LOG.debug("Processing action {}, txid {}", action, txid);
-            try {
-                if (txid >= lastTxid) {
-                    handleCheckpoint(input, action, txid);
-                    if (action == ROLLBACK) {
-                        lastTxid = txid - 1;
-                    } else {
-                        lastTxid = txid;
-                    }
-                } else {
-                    LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
-                    collector.ack(input);
-                }
-            } catch (Throwable th) {
-                LOG.error("Got error while processing checkpoint tuple", th);
-                collector.fail(input);
-                collector.reportError(th);
-            }
-        } else {
-            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
-                              "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
-            collector.ack(input);
-        }
-    }
-
-    /**
-     * returns the total number of input checkpoint streams across
-     * all input tasks to this component.
-     */
-    private int getCheckpointInputTaskCount(TopologyContext context) {
-        int count = 0;
-        for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
-            if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
-                count += context.getComponentTasks(inputStream.get_componentId()).size();
-            }
-        }
-        return count;
-    }
-
-    /**
-     * Checks if check points have been received from all tasks across
-     * all input streams to this component
-     */
-    private boolean shouldProcessTransaction(Action action, long txid) {
-        TransactionRequest request = new TransactionRequest(action, txid);
-        Integer count;
-        if ((count = transactionRequestCount.get(request)) == null) {
-            transactionRequestCount.put(request, 1);
-            count = 1;
-        } else {
-            transactionRequestCount.put(request, ++count);
-        }
-        if (count == checkPointInputTaskCount) {
-            transactionRequestCount.remove(request);
-            return true;
-        }
-        return false;
-    }
-
-    private static class TransactionRequest {
-        private final Action action;
-        private final long txid;
-
-        TransactionRequest(Action action, long txid) {
-            this.action = action;
-            this.txid = txid;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            TransactionRequest that = (TransactionRequest) o;
-
-            if (txid != that.txid) return false;
-            return !(action != null ? !action.equals(that.action) : that.action != null);
-
-        }
-
-        @Override
-        public int hashCode() {
-            int result = action != null ? action.hashCode() : 0;
-            result = 31 * result + (int) (txid ^ (txid >>> 32));
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return "TransactionRequest{" +
-                    "action='" + action + '\'' +
-                    ", txid=" + txid +
-                    '}';
-        }
-    }
-
-
-    protected static class AnchoringOutputCollector extends OutputCollector {
-        AnchoringOutputCollector(IOutputCollector delegate) {
-            super(delegate);
-        }
-
-        @Override
-        public List<Integer> emit(String streamId, List<Object> tuple) {
-            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
-        }
-
-        @Override
-        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index 9873084..01b04ab 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -42,7 +42,7 @@ import static org.apache.storm.spout.CheckPointState.Action.INITSTATE;
 /**
  * Wraps a {@link IStatefulBolt} and manages the state of the bolt.
  */
-public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwarder {
+public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
     private static final Logger LOG = LoggerFactory.getLogger(StatefulBoltExecutor.class);
     private final IStatefulBolt<T> bolt;
     private State state;
@@ -78,7 +78,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         bolt.declareOutputFields(declarer);
-        super.declareCheckpointStream(declarer);
+        declareCheckpointStream(declarer);
     }
 
     @Override

Reply via email to