[I think this has graduated to a +dev <[email protected]> thread]

Yea, in Beam it is left up to the IOs primarily, hence the bundle finalization step, though runners are of course allowed to have their own features. Dataflow also has in-place pipeline update, which restores the persisted checkpoints from one pipeline to another - the same basic mechanism/idea as Spark Structured Streaming, but a different overall workflow. +Reuven Lax <[email protected]> has put a lot of thought into updating, checkpointing, resuming, etc. Runners differ a lot in these areas. Is there something that should graduate from runner-specific to the Beam model?
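As an aside, the commit-on-finalize idea referenced below looks roughly like this with KafkaIO. This is a minimal, untested sketch; the bootstrap servers, topic and group id are placeholders:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.google.common.collect.ImmutableMap;

public class KafkaCommitOnFinalize {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<String, String>> records =
        p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker-1:9092")   // placeholder
                .withTopic("events")                     // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // Offsets are tracked on the brokers under this consumer group.
                .withConsumerConfigUpdates(
                    ImmutableMap.<String, Object>of(
                        ConsumerConfig.GROUP_ID_CONFIG, "my-pipeline"))  // placeholder
                // Commit the consumed offsets back to Kafka when the runner
                // finalizes a bundle, so a later run with the same group id
                // can resume from the last committed position.
                .commitOffsetsInFinalize()
                .withoutMetadata());

    // ... filtering / enrichment / writes would go here ...

    p.run();
  }
}

With offsets committed under a stable group id, a later run can pick up from the brokers' last committed position without any Beam-side bookkeeping.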
Kenn

On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez <[email protected]> wrote:

> Looks like this is a common source of confusion; I had similar questions
> about checkpointing in the Beam Slack.
>
> In Spark Structured Streaming, checkpoints are saved to an *external* HDFS
> location and persist *beyond* each run, so in the event of a stream
> crashing, you can just point your next execution of the stream at the
> checkpoint location. Kafka (or Kinesis/Redis Stream etc.) offsets are
> persisted in the checkpoint, so the stream would resume from the last
> committed checkpoint location.
>
> It doesn't seem Beam has an external checkpoint that persists beyond a
> single stream execution, so in Beam with Kinesis I believe you'll have to
> manage your own offsets deliberately with an external source if you want
> to achieve 'exactly once' semantics in the event of shutting down a stream
> and resuming it at a later point.
>
> With Kafka you don't need this since, as long as we ensure our offsets are
> committed in finalization of a bundle, the offsets for a particular group
> id are stored on the server.
>
>
> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles <[email protected]> wrote:
>
>> This sounds similar to the "Kafka Commit" in
>> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>> <[email protected]> and also to how PubsubIO ACKs messages in the
>> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
>> just asking to clarify, in case other folks know more, like +Alexey
>> Romanenko <[email protected]> and +Ismaël Mejía
>> <[email protected]>, who have modified KinesisIO. If the feature does
>> not exist today, perhaps we can identify the best practices around this
>> pattern.
>>
>> Kenn
>>
>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey <[email protected]>
>> wrote:
>>
>>> Hi Kenn,
>>>
>>> yes, resuming reading at the proper timestamp is exactly the issue we
>>> are currently struggling with. E.g. with the Kinesis Client Library we
>>> could store the last read position in some Dynamo table. This mechanism
>>> is not used with Beam; as we understand it, the runner is responsible
>>> for tracking that checkpoint mark.
>>>
>>> Now, obviously on restarting the pipeline, e.g. after an incompatible
>>> upgrade where a pipeline update is just not feasible, there must be some
>>> mechanism in place for how Dataflow will know where to continue. Is that
>>> simply the pipeline name? Or is there more involved? So how does
>>> checkpointing actually work here?
>>>
>>> Based on 'name', wouldn't that imply that something like (example taken
>>> from
>>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
>>> )
>>>
>>> export REGION="us-central1"
>>>
>>> gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
>>>   --template-file-gcs-location "$TEMPLATE_PATH" \
>>>   --parameters inputSubscription="$SUBSCRIPTION" \
>>>   --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>>>   --region "$REGION"
>>>
>>> will not resume from the last read position on rerun, because the name
>>> obviously changes here?
>>>
>>> best,
>>>
>>> michel
>>>
>>>
>>>
>>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> I would assume the main issue is resuming reading from the Kinesis
>>>> stream from the last read? In the case of Pubsub (just as another
>>>> example of the idea) this is part of the internal state of a
>>>> pre-created subscription.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> with our current project we are implementing our streaming pipeline
>>>>> based on Google Dataflow.
>>>>>
>>>>> Essentially we receive input via Kinesis, do some filtering,
>>>>> enrichment and sessionizing, and output to PubSub and/or Google
>>>>> storage.
>>>>>
>>>>> After a short investigation it is not clear to us how checkpointing
>>>>> will work when running on Dataflow in connection with KinesisIO. Is
>>>>> there any documentation/discussion to get a better understanding of
>>>>> how that will work? Especially if we are forced to restart our
>>>>> pipelines, how could we ensure not to lose any events?
>>>>>
>>>>> As far as I currently understand, it should work 'auto-magically',
>>>>> but it is not yet clear to us how it will actually behave. Before we
>>>>> start testing our expectations or even try to implement some
>>>>> watermark tracking ourselves, we hoped to get some insights from
>>>>> other users here.
>>>>>
>>>>> Any help appreciated.
>>>>>
>>>>> Best,
>>>>>
>>>>> michel
>>>>>
>>>>
>
> ~Vincent
>
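For what it's worth, one way to implement the "manage your own offsets deliberately with an external source" workaround Vincent describes above is to restart the KinesisIO read from a timestamp the pipeline persists somewhere itself. A rough, untested sketch: the stream name, credentials and the loadLastCommittedTimestamp() helper are placeholders for your own setup, and note that replaying from a timestamp gives at-least-once rather than exactly-once semantics:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import com.amazonaws.regions.Regions;

public class KinesisResumeFromTimestamp {

  // Placeholder: look up the last position you persisted yourself
  // (a Dynamo table, GCS object, database row, ...). Not part of KinesisIO.
  static Instant loadLastCommittedTimestamp() {
    return Instant.now().minus(Duration.standardHours(1));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KinesisRecord> records =
        p.apply(
            KinesisIO.read()
                .withStreamName("my-stream")                               // placeholder
                .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.US_EAST_1)  // placeholder
                // On a cold start, begin reading at the timestamp we last
                // recorded externally instead of LATEST or TRIM_HORIZON.
                .withInitialTimestampInStream(loadLastCommittedTimestamp()));

    // ... downstream transforms; somewhere the pipeline would also persist
    // the newest record timestamp it has durably processed ...

    p.run();
  }
}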
