[
https://issues.apache.org/jira/browse/STORM-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15284752#comment-15284752
]
Robert Joseph Evans commented on STORM-1757:
--------------------------------------------
To make distributed checkpointing work we need more coordination between the
sources and the bolts than ack/fail really offers. I would propose that each
Unbounded Source run as a spout. All of the spouts would need to coordinate
with each other so that the barriers can be emitted close to one another,
reducing the amount of data that would need to be buffered. We would also
need coordination for restoring checkpoints. For both I would propose that we
use ZooKeeper, simply because it is already available and we would not need
to add anything new to base Storm to support it.
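As a rough sketch of the kind of coordination this implies (the paths and
class names here are mine, not an agreed design), Apache Curator's
LeaderLatch recipe could handle the election so we do not have to hand-roll
the ZooKeeper protocol ourselves:
{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CheckpointCoordinator implements AutoCloseable {
    private final CuratorFramework client;
    private final LeaderLatch latch;

    public CheckpointCoordinator(String zkConnect, String spoutTaskId) throws Exception {
        // Reuse the ZooKeeper quorum Storm already depends on; nothing new to deploy.
        client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3));
        client.start();
        // Every spout instance joins the same latch path; Curator elects one leader.
        latch = new LeaderLatch(client, "/beam-storm/checkpoint-leader", spoutTaskId);
        latch.start();
    }

    // The leader is the one that periodically triggers all spouts to emit a barrier.
    public boolean isLeader() {
        return latch.hasLeadership();
    }

    @Override
    public void close() throws Exception {
        latch.close();
        client.close();
    }
}
{code}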
All of the spouts would elect a leader through ZooKeeper. The leader would
then trigger all of the spouts to emit a barrier and checkpoint the spout
metadata. Because we are potentially going to have multiple checkpoints
outstanding at any point in time, we will need to label all of the
checkpoints. I would label them with two numbers. The first would be the
bundle/batch number, the second would be the replay, or generation, number.
The bundle number would increment for each barrier emitted, but would roll
back on failure. The generation number would increment on any failure. This
would allow downstream bolts to restore a checkpoint just by seeing the
bundle id.
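A minimal sketch of what such a label could look like (the class and method
names are purely illustrative):
{code:java}
/** Identifies one checkpoint as a (generation, bundle) pair. Illustrative only. */
public final class CheckpointId implements Comparable<CheckpointId> {
    private final long generation; // incremented on every failure/replay
    private final long bundle;     // incremented per barrier, rolled back on failure

    public CheckpointId(long generation, long bundle) {
        this.generation = generation;
        this.bundle = bundle;
    }

    /** Next barrier within the same generation. */
    public CheckpointId nextBundle() {
        return new CheckpointId(generation, bundle + 1);
    }

    /** After a failure: restart from the last good bundle under a new generation. */
    public CheckpointId replayFrom(long lastGoodBundle) {
        return new CheckpointId(generation + 1, lastGoodBundle);
    }

    @Override
    public int compareTo(CheckpointId other) {
        int byGeneration = Long.compare(generation, other.generation);
        return byGeneration != 0 ? byGeneration : Long.compare(bundle, other.bundle);
    }
}
{code}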
Spouts would use acking to know if a downstream tuple failed. If a tuple
fails, the spout would inform the leader through ZooKeeper of the bad batch.
The leader would then inform all of the other spouts to restore and start
again. Each spout would also have to inform the leader periodically when a
batch is fully acked. Once all of the spouts have informed the leader that a
batch is fully acked, the leader can inform the spouts that they can delete
the old checkpoints. They should also inform the downstream bolts as part of
the barrier so they can clean up their old checkpoints too.
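To make the ack/fail side concrete, here is a hedged sketch of the two
notifications a spout would send; the znode layout is an assumption
consistent with the coordinator sketch above, not a settled protocol:
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

public class BarrierFailureReporter {
    private final CuratorFramework client;

    public BarrierFailureReporter(CuratorFramework client) {
        this.client = client;
    }

    /** Called from the spout's fail(msgId) path: record which bundle went bad. */
    public void reportFailedBundle(long generation, long bundle) throws Exception {
        byte[] payload = (generation + ":" + bundle).getBytes(StandardCharsets.UTF_8);
        // The leader watches this path and tells every spout to restore from the
        // last good checkpoint and bump the generation number.
        client.create().creatingParentsIfNeeded()
              .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
              .forPath("/beam-storm/failed-bundles/report-", payload);
    }

    /** Called once a spout has seen every tuple of a bundle acked. */
    public void reportBundleFullyAcked(int spoutTaskId, long generation, long bundle) throws Exception {
        // One child per spout; when all spouts have reported, the leader can
        // broadcast that older checkpoints are safe to delete.
        client.create().creatingParentsIfNeeded()
              .forPath("/beam-storm/acked/" + generation + "-" + bundle + "/task-" + spoutTaskId);
    }
}
{code}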
We can work out the exact details of how this will work later on.
For the actual checkpointing in the bolts, only the GroupByKey transform
would need to do anything, and for simplicity it would checkpoint each pane
and key separately, so that the checkpoints are incremental and we can
support very large windows without too much difficulty.
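One way to picture that granularity is a store interface keyed by checkpoint
id, key, and pane (hypothetical, reusing the CheckpointId from the sketch
above; this is not Beam's or Storm's API):
{code:java}
/** Hypothetical store the GroupByKey bolt would write to at each barrier. */
public interface GroupByKeyCheckpointStore<K, V> {
    /** Write only the values buffered for this key and pane since the last barrier. */
    void appendDelta(CheckpointId id, K key, long paneIndex, Iterable<V> newValues);

    /** Read back what is needed to rebuild a key's window state after a restore. */
    Iterable<V> restore(CheckpointId id, K key, long paneIndex);

    /** Drop all state up to and including a checkpoint once the leader says it is safe. */
    void discardUpTo(CheckpointId id);
}
{code}
Because each (key, pane) pair would be written independently, a barrier would
only cost writes for the keys that actually changed since the previous
barrier, which is what would keep very large windows manageable.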
In general all of this seems totally doable, and actually not that difficult.
My biggest concern by far is around efficiency in the checkpointing,
especially for large windows. The checkpointed state is something we have to
write, but in the common case it will simply be thrown away, so we want to be
sure that we optimize for throwing the data away. We can easily write
something backed by HBase or almost any NoSQL store, but that is going to add
a lot of IOPS and network load that I am not too thrilled about. Perhaps it
does not really matter for an initial deployment, though.
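For reference, a first cut of the HBase-backed version might look something
like the following; the table name, column family, and row-key layout are
all invented here, and nothing about the write path is tuned yet:
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseCheckpointStore implements AutoCloseable {
    private static final byte[] CF = Bytes.toBytes("cp");
    private final Connection connection;
    private final Table table;

    public HBaseCheckpointStore(Configuration conf) throws Exception {
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TableName.valueOf("beam_checkpoints"));
    }

    /** Row key = generation:bundle:key so one checkpoint is a contiguous range. */
    public void write(long generation, long bundle, byte[] key, byte[] state) throws Exception {
        Put put = new Put(rowKey(generation, bundle, key));
        put.addColumn(CF, Bytes.toBytes("state"), state);
        table.put(put);
    }

    /** The common case: the checkpoint was never needed, so throw it away cheaply. */
    public void discard(long generation, long bundle, byte[] key) throws Exception {
        table.delete(new Delete(rowKey(generation, bundle, key)));
    }

    private static byte[] rowKey(long generation, long bundle, byte[] key) {
        return Bytes.add(Bytes.toBytes(generation), Bytes.toBytes(bundle), key);
    }

    @Override
    public void close() throws Exception {
        table.close();
        connection.close();
    }
}
{code}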
> Apache Beam Runner for Storm
> ----------------------------
>
> Key: STORM-1757
> URL: https://issues.apache.org/jira/browse/STORM-1757
> Project: Apache Storm
> Issue Type: Brainstorming
> Reporter: P. Taylor Goetz
> Priority: Minor
>
> This is a call for interested parties to collaborate on an Apache Beam [1]
> runner for Storm, and express their thoughts and opinions.
> Given the addition of the Windowing API to Apache Storm, we should be able to
> map naturally to the Beam API. If not, it may be indicative of shortcomings
> of the Storm API that should be addressed.
> [1] http://beam.incubator.apache.org