[STORM-1714] refactored common logic into BaseStatefulBoltExecutor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/95e7afc2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/95e7afc2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/95e7afc2 Branch: refs/heads/1.x-branch Commit: 95e7afc29cde1b7f16c1b0ab8ad65bbd9ed9a2af Parents: 3b879d6 Author: Arun Mahadevan <[email protected]> Authored: Mon Apr 18 23:17:10 2016 +0530 Committer: Jungtaek Lim <[email protected]> Committed: Wed Apr 20 17:18:17 2016 +0900 ---------------------------------------------------------------------- .../topology/BaseStatefulBoltExecutor.java | 209 +++++++++++++++++++ .../topology/CheckpointTupleForwarder.java | 165 +-------------- .../storm/topology/StatefulBoltExecutor.java | 4 +- 3 files changed, 218 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java new file mode 100644 index 0000000..b93a061 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.topology; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.spout.CheckPointState; +import org.apache.storm.spout.CheckpointSpout; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK; +import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION; +import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID; +import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID; + +/** + * Base class that abstracts the common logic for executing bolts in a stateful topology. + */ +public abstract class BaseStatefulBoltExecutor implements IRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(BaseStatefulBoltExecutor.class); + private final Map<TransactionRequest, Integer> transactionRequestCount; + private int checkPointInputTaskCount; + private long lastTxid = Long.MIN_VALUE; + protected OutputCollector collector; + + public BaseStatefulBoltExecutor() { + transactionRequestCount = new HashMap<>(); + } + + protected void init(TopologyContext context, OutputCollector collector) { + this.collector = collector; + this.checkPointInputTaskCount = getCheckpointInputTaskCount(context); + } + + /** + * returns the total number of input checkpoint streams across + * all input tasks to this component. + */ + private int getCheckpointInputTaskCount(TopologyContext context) { + int count = 0; + for (GlobalStreamId inputStream : context.getThisSources().keySet()) { + if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) { + count += context.getComponentTasks(inputStream.get_componentId()).size(); + } + } + return count; + } + + @Override + public void execute(Tuple input) { + if (CheckpointSpout.isCheckpoint(input)) { + processCheckpoint(input); + } else { + handleTuple(input); + } + } + + /** + * Invokes handleCheckpoint once checkpoint tuple is received on + * all input checkpoint streams to this component. + */ + private void processCheckpoint(Tuple input) { + CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION); + long txid = input.getLongByField(CHECKPOINT_FIELD_TXID); + if (shouldProcessTransaction(action, txid)) { + LOG.debug("Processing action {}, txid {}", action, txid); + try { + if (txid >= lastTxid) { + handleCheckpoint(input, action, txid); + if (action == ROLLBACK) { + lastTxid = txid - 1; + } else { + lastTxid = txid; + } + } else { + LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid); + collector.ack(input); + } + } catch (Throwable th) { + LOG.error("Got error while processing checkpoint tuple", th); + collector.fail(input); + collector.reportError(th); + } + } else { + LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " + + "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount); + collector.ack(input); + } + } + + /** + * Checks if check points have been received from all tasks across + * all input streams to this component + */ + private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) { + TransactionRequest request = new TransactionRequest(action, txid); + Integer count; + if ((count = transactionRequestCount.get(request)) == null) { + transactionRequestCount.put(request, 1); + count = 1; + } else { + transactionRequestCount.put(request, ++count); + } + if (count == checkPointInputTaskCount) { + transactionRequestCount.remove(request); + return true; + } + return false; + } + + protected void declareCheckpointStream(OutputFieldsDeclarer declarer) { + declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION)); + } + + /** + * Sub-classes can implement the logic for handling the tuple. + * + * @param input the input tuple + */ + protected abstract void handleTuple(Tuple input); + + /** + * Sub-classes can implement the logic for handling checkpoint tuple. + * + * @param checkpointTuple the checkpoint tuple + * @param action the action (prepare, commit, rollback or initstate) + * @param txid the transaction id. + */ + protected abstract void handleCheckpoint(Tuple checkpointTuple, CheckPointState.Action action, long txid); + + protected static class AnchoringOutputCollector extends OutputCollector { + AnchoringOutputCollector(IOutputCollector delegate) { + super(delegate); + } + + @Override + public List<Integer> emit(String streamId, List<Object> tuple) { + throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples."); + } + + @Override + public void emitDirect(int taskId, String streamId, List<Object> tuple) { + throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples."); + } + } + + private static class TransactionRequest { + private final CheckPointState.Action action; + + private final long txid; + + TransactionRequest(CheckPointState.Action action, long txid) { + this.action = action; + this.txid = txid; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TransactionRequest that = (TransactionRequest) o; + + if (txid != that.txid) return false; + return !(action != null ? !action.equals(that.action) : that.action != null); + + } + + @Override + public int hashCode() { + int result = action != null ? action.hashCode() : 0; + result = 31 * result + (int) (txid ^ (txid >>> 32)); + return result; + } + + @Override + public String toString() { + return "TransactionRequest{" + + "action='" + action + '\'' + + ", txid=" + txid + + '}'; + } + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java index 9d21c33..a510c0c 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java +++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java @@ -45,41 +45,18 @@ import static org.apache.storm.spout.CheckpointSpout.*; * can flow through the entire topology DAG. * </p> */ -public class CheckpointTupleForwarder implements IRichBolt { +public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor { private static final Logger LOG = LoggerFactory.getLogger(CheckpointTupleForwarder.class); private final IRichBolt bolt; - private final Map<TransactionRequest, Integer> transactionRequestCount; - private int checkPointInputTaskCount; - private long lastTxid = Long.MIN_VALUE; - private AnchoringOutputCollector collector; - - public CheckpointTupleForwarder() { - this(null); - } public CheckpointTupleForwarder(IRichBolt bolt) { this.bolt = bolt; - transactionRequestCount = new HashMap<>(); - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - init(context, collector); - bolt.prepare(stormConf, context, this.collector); - } - - protected void init(TopologyContext context, OutputCollector collector) { - this.collector = new AnchoringOutputCollector(collector); - this.checkPointInputTaskCount = getCheckpointInputTaskCount(context); } @Override - public void execute(Tuple input) { - if (CheckpointSpout.isCheckpoint(input)) { - processCheckpoint(input); - } else { - handleTuple(input); - } + public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) { + init(context, new AnchoringOutputCollector(outputCollector)); + bolt.prepare(stormConf, context, collector); } @Override @@ -93,18 +70,13 @@ public class CheckpointTupleForwarder implements IRichBolt { declareCheckpointStream(declarer); } - protected void declareCheckpointStream(OutputFieldsDeclarer declarer) { - declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION)); - } - @Override public Map<String, Object> getComponentConfiguration() { return bolt.getComponentConfiguration(); } /** - * Forwards the checkpoint tuple downstream. Sub-classes can override - * with the logic for handling checkpoint tuple. + * Forwards the checkpoint tuple downstream. * * @param checkpointTuple the checkpoint tuple * @param action the action (prepare, commit, rollback or initstate) @@ -116,8 +88,8 @@ public class CheckpointTupleForwarder implements IRichBolt { } /** - * Hands off tuple to the wrapped bolt to execute. Sub-classes can - * override the behavior. + * Hands off tuple to the wrapped bolt to execute. + * * <p> * Right now tuples continue to get forwarded while waiting for checkpoints to arrive on other streams * after checkpoint arrives on one of the streams. This can cause duplicates but still at least once. @@ -128,127 +100,4 @@ public class CheckpointTupleForwarder implements IRichBolt { protected void handleTuple(Tuple input) { bolt.execute(input); } - - /** - * Invokes handleCheckpoint once checkpoint tuple is received on - * all input checkpoint streams to this component. - */ - private void processCheckpoint(Tuple input) { - Action action = (Action) input.getValueByField(CHECKPOINT_FIELD_ACTION); - long txid = input.getLongByField(CHECKPOINT_FIELD_TXID); - if (shouldProcessTransaction(action, txid)) { - LOG.debug("Processing action {}, txid {}", action, txid); - try { - if (txid >= lastTxid) { - handleCheckpoint(input, action, txid); - if (action == ROLLBACK) { - lastTxid = txid - 1; - } else { - lastTxid = txid; - } - } else { - LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid); - collector.ack(input); - } - } catch (Throwable th) { - LOG.error("Got error while processing checkpoint tuple", th); - collector.fail(input); - collector.reportError(th); - } - } else { - LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " + - "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount); - collector.ack(input); - } - } - - /** - * returns the total number of input checkpoint streams across - * all input tasks to this component. - */ - private int getCheckpointInputTaskCount(TopologyContext context) { - int count = 0; - for (GlobalStreamId inputStream : context.getThisSources().keySet()) { - if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) { - count += context.getComponentTasks(inputStream.get_componentId()).size(); - } - } - return count; - } - - /** - * Checks if check points have been received from all tasks across - * all input streams to this component - */ - private boolean shouldProcessTransaction(Action action, long txid) { - TransactionRequest request = new TransactionRequest(action, txid); - Integer count; - if ((count = transactionRequestCount.get(request)) == null) { - transactionRequestCount.put(request, 1); - count = 1; - } else { - transactionRequestCount.put(request, ++count); - } - if (count == checkPointInputTaskCount) { - transactionRequestCount.remove(request); - return true; - } - return false; - } - - private static class TransactionRequest { - private final Action action; - private final long txid; - - TransactionRequest(Action action, long txid) { - this.action = action; - this.txid = txid; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TransactionRequest that = (TransactionRequest) o; - - if (txid != that.txid) return false; - return !(action != null ? !action.equals(that.action) : that.action != null); - - } - - @Override - public int hashCode() { - int result = action != null ? action.hashCode() : 0; - result = 31 * result + (int) (txid ^ (txid >>> 32)); - return result; - } - - @Override - public String toString() { - return "TransactionRequest{" + - "action='" + action + '\'' + - ", txid=" + txid + - '}'; - } - } - - - protected static class AnchoringOutputCollector extends OutputCollector { - AnchoringOutputCollector(IOutputCollector delegate) { - super(delegate); - } - - @Override - public List<Integer> emit(String streamId, List<Object> tuple) { - throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples."); - } - - @Override - public void emitDirect(int taskId, String streamId, List<Object> tuple) { - throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples."); - } - - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java index 9873084..01b04ab 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java @@ -42,7 +42,7 @@ import static org.apache.storm.spout.CheckPointState.Action.INITSTATE; /** * Wraps a {@link IStatefulBolt} and manages the state of the bolt. */ -public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwarder { +public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor { private static final Logger LOG = LoggerFactory.getLogger(StatefulBoltExecutor.class); private final IStatefulBolt<T> bolt; private State state; @@ -78,7 +78,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { bolt.declareOutputFields(declarer); - super.declareCheckpointStream(declarer); + declareCheckpointStream(declarer); } @Override
