[ https://issues.apache.org/jira/browse/STORM-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15284682#comment-15284682 ]

Robert Joseph Evans commented on STORM-1757:
--------------------------------------------

BEAM logically has a few base transforms that we need to support.  We can 
extend this in the future to support more if we need/want to.

* *GroupByKey* - Groups by key and window
* CreatePCollectionView
* Read.Bounded - only for batch (which we probably can support, just not right 
now)
* *Read.Unbounded*
* *ParDo.Bound*
* ParDo.BoundMulti
* *FlattenPCollectionList* - a merge of PCollections (Streams)
* *Window.Bound*

The *bold* ones are what I think we should concentrate on first; then we can 
add support for side inputs and outputs in a second phase, and finally add 
support for bounded "batch" processing if we see a need to.
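As a rough illustration of how a runner typically tackles a list like this, here is a sketch of a translator registry that maps each Beam transform to the Storm piece that implements it.  All names below are made-up placeholders, not actual Beam or Storm APIs:

```python
# Hypothetical sketch: a registry mapping Beam transform names to translator
# functions that build the corresponding Storm component.  Unregistered
# transforms (e.g. Read.Bounded, for now) simply fail fast.

TRANSLATORS = {}

def register(transform_name):
    """Decorator that records a translator for a given transform."""
    def wrapper(fn):
        TRANSLATORS[transform_name] = fn
        return fn
    return wrapper

@register("Read.Unbounded")
def translate_read(node):
    return ("spout", node)          # unbounded sources become spouts

@register("ParDo.Bound")
def translate_pardo(node):
    return ("bolt", node)           # per-element processing becomes a bolt

@register("GroupByKey")
def translate_gbk(node):
    return ("grouping-bolt", node)  # key+window grouping bolt

def translate(transform_name, node):
    if transform_name not in TRANSLATORS:
        raise NotImplementedError(transform_name)
    return TRANSLATORS[transform_name](node)
```

This keeps the first-phase scope explicit: adding a transform later is just registering another translator.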

Like I stated before, fault tolerance in an UnboundedSource is built around a 
checkpoint/restore model.  While reading data we can ask the source for 
metadata that we can later use to restore the source back to a given point in 
time.
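In miniature, the checkpoint/restore model looks something like the sketch below.  The class and method names are illustrative only (Beam's real API is `UnboundedSource`/`CheckpointMark`, which carries more machinery than this):

```python
# Toy model of checkpoint/restore: the reader hands out a "mark" describing
# where it is, and a new reader constructed from that mark resumes there.

class OffsetCheckpointMark:
    """The metadata needed to restore the source: here, just an offset."""
    def __init__(self, offset):
        self.offset = offset

class FakeUnboundedReader:
    """Reads from an endless, replayable stream (modeled as a list here)."""
    def __init__(self, stream, checkpoint_mark=None):
        self.stream = stream
        self.offset = checkpoint_mark.offset if checkpoint_mark else 0

    def advance(self):
        if self.offset < len(self.stream):
            value = self.stream[self.offset]
            self.offset += 1
            return value
        return None  # no data available yet

    def get_checkpoint_mark(self):
        # Ask the source for metadata describing "where we are".
        return OffsetCheckpointMark(self.offset)
```

The key property is that anything emitted after the mark is replayed after a restore, which is what lets the topology roll back consistently.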

Windowing in BEAM requires checkpointing and is based around the concept of a 
pane (which is a part of a window) and triggers.  The GroupByKey transform is 
special in that it groups not only by a key, but also by a window.  To do 
this, windowed data will not be emitted until a trigger fires to start the 
processing.  And even when the trigger fires, there is a choice to discard 
the panes that have already been processed or to retain them until the 
trigger indicates that it will never fire again.  Triggers are usually based 
on a watermark that comes from the source.
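A stripped-down sketch of that (key, window) grouping, with the discard-vs-retain choice made explicit.  Window assignment and triggering are heavily simplified here: windows are fixed-size, and `fire` is called directly instead of being driven by a watermark:

```python
from collections import defaultdict

class WindowedGroupByKey:
    """Buffers values per (key, window); emits a pane when the trigger fires."""
    def __init__(self, window_size, accumulating=False):
        self.window_size = window_size
        self.accumulating = accumulating  # retain panes vs discard after firing
        self.buffers = defaultdict(list)  # (key, window) -> buffered values

    def process(self, key, value, timestamp):
        window = timestamp // self.window_size  # fixed-window assignment
        self.buffers[(key, window)].append(value)

    def fire(self, key, window):
        # Emit the current pane for this (key, window).
        pane = list(self.buffers[(key, window)])
        if not self.accumulating:
            self.buffers[(key, window)].clear()  # discarding mode
        return pane
```

In discarding mode each firing emits only what arrived since the last firing; in accumulating mode every firing re-emits the whole window so far, which is why the buffers must live in checkpointed state.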

All ParDos also have a concept of "bundles", which are batches of tuples 
being processed.  The DoFns can be informed of the start and end of a bundle, 
and in non-optimized cases a DoFn will be recreated for each bundle.
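The bundle lifecycle boils down to the sketch below (method names chosen to mirror Beam's startBundle/finishBundle hooks, but this is an illustration, not the real API):

```python
# Sketch of the bundle lifecycle: the DoFn is told when a bundle starts and
# ends, and in the non-optimized case a fresh DoFn instance is created per
# bundle.

class CountingDoFn:
    """Records every lifecycle event so we can inspect the ordering."""
    def __init__(self):
        self.events = []

    def start_bundle(self):
        self.events.append("start")

    def process(self, element):
        self.events.append(element)

    def finish_bundle(self):
        self.events.append("finish")

def run_bundles(batches, dofn_factory):
    """Process each batch of tuples as one bundle, recreating the DoFn."""
    instances = []
    for batch in batches:
        fn = dofn_factory()          # fresh DoFn per bundle (non-optimized)
        instances.append(fn)
        fn.start_bundle()
        for element in batch:
            fn.process(element)
        fn.finish_bundle()
    return instances
```

For a Storm runner, bundle boundaries are a natural place to line up with checkpoint boundaries, which is the point made below.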

When we combine all of this together, we need a way to checkpoint/restore all 
of the state in a consistent way across the entire topology, preferably 
coordinated with the bundles as well.  We could do this one of two ways.  The 
simplest approach would be to have a Trident-like coordinator that:

  # Tells bolts to prepare for a new batch
  # Waits for acks
  # Tells spouts to emit a new batch
  # Waits for acks
  # Tells everyone to checkpoint
  # Waits for acks

On success repeat. 

On failure:
  # Tell everyone to roll back to last successful checkpoint
  # Wait for ack
  # Start again
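To make the lock-step rhythm concrete, here is a toy simulation of that coordinator loop.  Bolts and spouts are stand-ins that just ack commands and keep a counter as "state"; nothing here is real Storm or Trident code:

```python
# Toy coordinator: issue a command, wait for every ack, then move on.
# On failure, everyone rolls back to the last successful checkpoint.

class Participant:
    def __init__(self):
        self.state = 0
        self.checkpoint = 0

    def handle(self, command):
        if command == "checkpoint":
            self.checkpoint = self.state
        elif command == "rollback":
            self.state = self.checkpoint
        elif command == "emit":
            self.state += 1   # stand-in for emitting a batch
        return "ack"          # every command is acknowledged

def run_batch(bolts, spouts, fail=False):
    # Steps 1-2: tell bolts to prepare for a new batch, wait for acks.
    assert all(b.handle("prepare") == "ack" for b in bolts)
    # Steps 3-4: tell spouts to emit a new batch, wait for acks.
    assert all(s.handle("emit") == "ack" for s in spouts)
    if fail:
        # Failure path: roll everyone back to the last checkpoint.
        for p in bolts + spouts:
            assert p.handle("rollback") == "ack"
        return "rolled back"
    # Steps 5-6: tell everyone to checkpoint, wait for acks.
    for p in bolts + spouts:
        assert p.handle("checkpoint") == "ack"
    return "committed"
```

The simulation makes the cost visible too: every batch is two-plus full command/ack round trips, and nothing overlaps.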

This would be a simple way to get things started, but it would be very slow 
because we cannot have more than one batch of tuples outstanding at any point 
in time, and even if we can combine a few steps together we will need at least 
two round trips through the topology to finish a batch.

If we want to go faster, it is going to require more of an exchange of 
metadata, and checkpointing similar to how Flink or Apex do it.

https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
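The core of that technique is barrier alignment: checkpoint markers flow through the stream with the data, and an operator with several inputs buffers the faster channels until the barrier has arrived on all of them, then snapshots its state.  A rough sketch (again illustrative, not any framework's actual code):

```python
# Barrier alignment on a two-input operator: post-barrier tuples on an
# already-aligned channel are buffered, the state snapshot is taken once the
# barrier has been seen on every input, then the buffered tuples are released.

class AligningOperator:
    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.barriers_seen = set()
        self.blocked = {i: [] for i in range(num_inputs)}  # buffered tuples
        self.state = 0            # toy state: a running sum of inputs
        self.snapshots = []

    def receive(self, channel, item):
        if item == "BARRIER":
            self.barriers_seen.add(channel)
            if len(self.barriers_seen) == self.num_inputs:
                self.snapshots.append(self.state)  # consistent snapshot
                self.barriers_seen.clear()
                for buffered in self.blocked.values():
                    for x in buffered:
                        self.state += x            # release buffered tuples
                    buffered.clear()
            return
        if channel in self.barriers_seen:
            self.blocked[channel].append(item)     # channel aligned: buffer
        else:
            self.state += item
```

Unlike the coordinator loop above, this lets data keep flowing between checkpoints, which is where the speedup comes from.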

Perhaps this is something that we should think about once we have a prototype 
working with the first implementation.  More on the distributed checkpointing 
coming shortly...

> Apache Beam Runner for Storm
> ----------------------------
>
>                 Key: STORM-1757
>                 URL: https://issues.apache.org/jira/browse/STORM-1757
>             Project: Apache Storm
>          Issue Type: Brainstorming
>            Reporter: P. Taylor Goetz
>            Priority: Minor
>
> This is a call for interested parties to collaborate on an Apache Beam [1] 
> runner for Storm, and express their thoughts and opinions.
> Given the addition of the Windowing API to Apache Storm, we should be able to 
> map naturally to the Beam API. If not, it may be indicative of shortcomings 
> of the Storm API that should be addressed.
> [1] http://beam.incubator.apache.org



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
