[
https://issues.apache.org/jira/browse/STORM-1175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15069187#comment-15069187
]
ASF GitHub Bot commented on STORM-1175:
---------------------------------------
Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/939#discussion_r48323100
--- Diff: storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java ---
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.spout;
+
+import backtype.storm.Config;
+import backtype.storm.state.KeyValueState;
+import backtype.storm.state.StateFactory;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import static backtype.storm.spout.CheckPointState.State.COMMITTED;
+import static backtype.storm.spout.CheckPointState.State.COMMITTING;
+import static backtype.storm.spout.CheckPointState.State.PREPARING;
+
+/**
+ * Emits checkpoint tuples which are used to save the state of the {@link backtype.storm.topology.IStatefulComponent}
+ * across the topology. If a topology contains Stateful bolts, Checkpoint spouts are automatically added
+ * to the topology. There is only one Checkpoint task per topology.
+ * <p/>
+ * Checkpoint spout stores its internal state in a {@link KeyValueState}.
The state transitions are as follows.
+ * <p/>
+ * <pre>
+ *                ROLLBACK(tx2)
+ *               <-------------                  PREPARE(tx2)                     COMMIT(tx2)
+ * COMMITTED(tx1)-------------> PREPARING(tx2) --------------> COMMITTING(tx2) -----------------> COMMITTED (tx2)
+ *
+ *
+ * </pre>
+ */
+public class CheckpointSpout extends BaseRichSpout {
+ private static final Logger LOG =
LoggerFactory.getLogger(CheckpointSpout.class);
+
+ public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
+ public static final String CHECKPOINT_COMPONENT_ID =
"$checkpointspout";
+ public static final String CHECKPOINT_FIELD_TXID = "txid";
+ public static final String CHECKPOINT_FIELD_ACTION = "action";
+ public static final String CHECKPOINT_ACTION_PREPARE = "prepare";
+ public static final String CHECKPOINT_ACTION_COMMIT = "commit";
+ public static final String CHECKPOINT_ACTION_ROLLBACK = "rollback";
+ public static final String CHECKPOINT_ACTION_INITSTATE = "initstate";
+
+ private static final String TX_STATE_KEY = "__state";
+ private static final int DEFAULT_CHECKPOINT_INTERVAL = 1000; // every
sec
+
+ private TopologyContext context;
+ private SpoutOutputCollector collector;
+ private long lastCheckpointTs;
+ private int checkpointInterval;
+ private boolean recoveryStepInProgress;
+ private boolean checkpointStepInProgress;
+ private boolean recovering;
+ private KeyValueState<String, CheckPointState> checkpointState;
+
+ @Override
+ public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
+ open(context, collector, loadCheckpointInterval(conf),
loadCheckpointState(conf, context));
+ }
+
+ // package access for unit test
+ void open(TopologyContext context, SpoutOutputCollector collector,
+ int checkpointInterval, KeyValueState<String,
CheckPointState> checkpointState) {
+ this.context = context;
+ this.collector = collector;
+ this.checkpointInterval = checkpointInterval;
+ this.checkpointState = checkpointState;
+ lastCheckpointTs = 0;
+ recoveryStepInProgress = false;
+ checkpointStepInProgress = false;
+ recovering = true;
+ }
+
+ @Override
+ public void nextTuple() {
+ if (shouldRecover()) {
+ LOG.debug("In recovery");
+ handleRecovery();
+ startProgress();
+ } else if (shouldCheckpoint()) {
+ LOG.debug("In checkpoint");
+ doCheckpoint();
+ startProgress();
+ }
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ CheckPointState txState = getTxState();
+ LOG.debug("Got ack with txid {}, current txState {}", msgId,
txState);
+ if (txState.txid == ((Number) msgId).longValue()) {
+ if (recovering) {
+ handleRecoveryAck();
+ } else {
+ handleCheckpointAck();
+ }
+ } else {
+ LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId,
txState.txid);
+ }
+ resetProgress();
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ LOG.debug("Got fail with msgid {}", msgId);
+ resetProgress();
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream(CHECKPOINT_STREAM_ID, new
Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Config conf = new Config();
+ conf.put(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 100);
--- End diff --
It was added so that the spout sleeps for a slightly longer time rather than
spinning in nextTuple. Instead of hard-coding the value, I will change it to
10% of the checkpoint interval.
> State store for windowing operations
> ------------------------------------
>
> Key: STORM-1175
> URL: https://issues.apache.org/jira/browse/STORM-1175
> Project: Apache Storm
> Issue Type: Sub-task
> Reporter: Arun Mahadevan
> Assignee: Arun Mahadevan
>
> Windowing operations should be able to save the results of partial
> computations in a state store. This should persist across restarts and bolts
> should be able to look up the values from the store.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)