[
https://issues.apache.org/jira/browse/STORM-1175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068482#comment-15068482
]
ASF GitHub Bot commented on STORM-1175:
---------------------------------------
Github user harshach commented on a diff in the pull request:
https://github.com/apache/storm/pull/939#discussion_r48279456
--- Diff: storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java ---
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.spout;
+
+import backtype.storm.Config;
+import backtype.storm.state.KeyValueState;
+import backtype.storm.state.StateFactory;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import static backtype.storm.spout.CheckPointState.State.COMMITTED;
+import static backtype.storm.spout.CheckPointState.State.COMMITTING;
+import static backtype.storm.spout.CheckPointState.State.PREPARING;
+
+/**
+ * Emits checkpoint tuples which is used to save the state of the {@link backtype.storm.topology.IStatefulComponent}
+ * across the topology. If a topology contains Stateful bolts, Checkpoint spouts are automatically added
+ * to the topology. There is only one Checkpoint task per topology.
+ * <p/>
+ * Checkpoint spout stores its internal state in a {@link KeyValueState}. The state transitions are as follows.
+ * <p/>
+ * <pre>
+ *                  ROLLBACK(tx2)
+ *               <-------------                  PREPARE(tx2)                     COMMIT(tx2)
+ * COMMITTED(tx1)-------------> PREPARING(tx2) --------------> COMMITTING(tx2) -----------------> COMMITTED (tx2)
+ *
+ *
+ * </pre>
+ */
+public class CheckpointSpout extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
+
+ public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
+ public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
+ public static final String CHECKPOINT_FIELD_TXID = "txid";
+ public static final String CHECKPOINT_FIELD_ACTION = "action";
+ public static final String CHECKPOINT_ACTION_PREPARE = "prepare";
+ public static final String CHECKPOINT_ACTION_COMMIT = "commit";
+ public static final String CHECKPOINT_ACTION_ROLLBACK = "rollback";
+ public static final String CHECKPOINT_ACTION_INITSTATE = "initstate";
+
+ private static final String TX_STATE_KEY = "__state";
+ private static final int DEFAULT_CHECKPOINT_INTERVAL = 1000; // every sec
--- End diff --
Can we move this default into defaults.yaml rather than hard-coding it here?
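A rough sketch of what that could look like, assuming the interval is looked up in the topology configuration map and the constant is kept only as a fallback. The key name `topology.state.checkpoint.interval.ms` and the helper `resolveInterval` are hypothetical and not taken from the patch; the sketch uses a plain `Map` so it runs without Storm on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointIntervalSketch {
    // Hypothetical config key; the actual name would be decided in the patch.
    static final String INTERVAL_KEY = "topology.state.checkpoint.interval.ms";
    // Fallback used when the key is absent, mirroring the constant in the diff.
    static final int DEFAULT_CHECKPOINT_INTERVAL = 1000;

    // Returns the configured interval in milliseconds, or the fallback
    // when the key is missing or not numeric.
    static int resolveInterval(Map<String, Object> conf) {
        Object value = conf.get(INTERVAL_KEY);
        if (value instanceof Number) {
            // YAML parsers may hand back Integer or Long; Number covers both.
            return ((Number) value).intValue();
        }
        return DEFAULT_CHECKPOINT_INTERVAL;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(resolveInterval(conf)); // key absent: fallback applies
        conf.put(INTERVAL_KEY, 5000);
        System.out.println(resolveInterval(conf)); // key present: configured value
    }
}
```

With an entry for the key in defaults.yaml, the fallback branch would normally never be hit, and users could override the interval per topology.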
> State store for windowing operations
> ------------------------------------
>
> Key: STORM-1175
> URL: https://issues.apache.org/jira/browse/STORM-1175
> Project: Apache Storm
> Issue Type: Sub-task
> Reporter: Arun Mahadevan
> Assignee: Arun Mahadevan
>
> Windowing operations should be able to save the results of partial
> computations in a state store. This should persist across restarts and bolts
> should be able to look up the values from the store.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)