On Mon, Apr 12, 2021 at 3:02 PM Michael Luckey <[email protected]> wrote:

> After spending some time reading the relevant source code, I was also wondering whether implementing a similar approach would be feasible.
>
> As of my current understanding, there is a slight semantic difference in the workings of e.g. Pubsub/Kafka and Kinesis. Whereas the former provide a mechanism to track the current state of the application on the serving side, by the client sending an ack, Kinesis seems not to provide this functionality. Although e.g. the Kinesis Client Lib provides some kind of checkpointing by storing the latest position within a DynamoDB table, it is still the client's responsibility to properly position the shard iterator at the last processed sequence number.
>
> This, for instance, seems to make the reliance on DynamoDB unnecessary; we could use any persistent storage as a checkpoint backend? Similar to what is currently done in the Spark runner implementation using a checkpointing folder, we might also use a simple GCS bucket for that. Of course, there might be pros/cons to using Dynamo vs GCS vs something completely different.

This is a really interesting space between the concept of "ACKing a stream source" (implicitly using the source's storage), "source checkpoints" (implicitly using a runner's storage), and "checkpointing a pipeline for resume" (explicitly storing in some external system). You may want to drain and replace a pipeline with an entirely different one - for example when incompatible state changes are necessary - so the latter two do not work for this case. You would want any incomplete aggregations to _not_ be checkpointed, because they need to be reprocessed (your whole system has to be idempotent in this way anyhow).

Kenn
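To make the Kinesis side of this concrete: repositioning a shard iterator at a stored sequence number, as described above, could look roughly like the sketch below using the AWS SDK for Java v1. The stream/shard names and the loadLastSequenceNumber() helper are hypothetical stand-ins for whatever checkpoint backend is used.

    import com.amazonaws.services.kinesis.AmazonKinesis;
    import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
    import com.amazonaws.services.kinesis.model.GetRecordsRequest;
    import com.amazonaws.services.kinesis.model.GetRecordsResult;
    import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
    import com.amazonaws.services.kinesis.model.ShardIteratorType;

    public class ResumeShardFromCheckpoint {

      public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // Hypothetical helper: fetch the last processed sequence number for
        // this shard from the checkpoint backend (DynamoDB, GCS, ...).
        String lastSeqNo = loadLastSequenceNumber("my-stream", "shardId-000000000000");

        // Position the iterator just AFTER the last processed record, so that
        // nothing is re-read and nothing is skipped on resume.
        GetShardIteratorRequest request = new GetShardIteratorRequest()
            .withStreamName("my-stream")
            .withShardId("shardId-000000000000")
            .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
            .withStartingSequenceNumber(lastSeqNo);

        String iterator = kinesis.getShardIterator(request).getShardIterator();
        GetRecordsResult result =
            kinesis.getRecords(new GetRecordsRequest().withShardIterator(iterator));
        // ... process result.getRecords(), then persist the new sequence number ...
      }

      private static String loadLastSequenceNumber(String stream, String shardId) {
        throw new UnsupportedOperationException("checkpoint-backend specific");
      }
    }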
> I was also wondering why KinesisIO is using the Kinesis Data Streams API instead of the somewhat higher-level KCL, but I tend to believe this decision was made to be more flexible in implementing exactly-once or similar semantics? We most likely will not try to be compatible with the checkpointing mechanism implemented by the KCL - which of course would be a reason to use a DynamoDB table here - but will use the existing checkpoint mark/coder implementations.
>
> What I currently do not fully understand is how we would resume. Whereas Kafka handles this on the serving side by just not delivering acknowledged messages again, for Kinesis we would need to somehow pass the checkpoint mark to the reader creation code. Maybe some additional codepath on DynamicCheckpointGenerator or such, which could be passed shardIds/sequence numbers or the stored KinesisCheckpointMarks, respectively?
>
> Best,
>
> michel
>
> On Mon, Apr 12, 2021 at 7:48 PM Alexey Romanenko <[email protected]> wrote:
>
>> Just as an idea off the top of my head - we probably can add similar (optional) functionality, as we have with KafkaIO and Kafka offsets, to KinesisIO and use DynamoDB for that. I didn't go into the details too deeply, but at first sight it seems feasible. In this case, it will work for all runners regardless of the Beam checkpointing mechanism implementation.
>>
>> Alexey
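For reference, the KafkaIO behaviour Alexey mentions is, as far as I can tell, the commitOffsetsInFinalize() option, which commits the consumer-group offsets back to Kafka when Beam finalizes a checkpoint, so a restarted pipeline with the same group.id resumes where the last one left off. A minimal sketch - the broker, topic, and group.id are placeholders:

    import java.util.Collections;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaResumeSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")            // placeholder
            .withTopic("events")                            // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(
                Collections.singletonMap("group.id", "my-pipeline"))
            // Commit offsets to the consumer group when the checkpoint is
            // finalized; the serving side then remembers the resume position.
            .commitOffsetsInFinalize());
        // ... rest of the pipeline ...
      }
    }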
>> On 8 Apr 2021, at 19:35, Michael Luckey <[email protected]> wrote:
>>
>> Hi,
>>
>> thanks for your help here. Really appreciated!
>>
>> @alexey We already found your answer (it magically showed up in some search engine results) and - iirc - while we were still using Spark as the underlying runner on some other project, I do think we relied on that mentioned mechanism. But as with our current project we wanted to use Google Dataflow, we got stuck trying to understand how checkpointing is going to work there. As Vincent mentioned, the documentation is sparse and confusing, and from the Google-provided examples it is difficult to understand how checkpointing is supposed to work, i.e. what requirements we (as users) have to meet to get checkpointing on e.g. Kinesis.
>>
>> As far as I understand now, it does not seem to be supported, which unfortunately is a show-stopper for us and might force us onto another runner implementation.
>>
>> Regarding external checkpointing with Kinesis, iiuc there is some support via the Kinesis Client Library, which delegates to DynamoDB. That works for us in a different context, but it is not exposed by KinesisIO, which - of course - led me to the assumption that there would be support by Beam itself - or, as it might be, by the underlying runner. That seems to be provided by Spark (and possibly Flink), so we might check that out. We would still prefer to get something working on Dataflow, without being required to roll our own checkpointing implementation. Any Dataflow experts around who could shed some light onto this? I'd love to get that better documented for others running into these issues.
>>
>> Best,
>>
>> michel
>>
>> On Wed, Apr 7, 2021 at 10:46 PM Vincent Marquez <[email protected]> wrote:
>>
>>> On Wed, Apr 7, 2021 at 11:55 AM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> [I think this has graduated to a +dev <[email protected]> thread]
>>>>
>>>> Yea, in Beam it is left up to the IOs primarily, hence the bundle finalization step, or runners are allowed to have their own features, of course. Dataflow also does have in-place pipeline update that restores the persisted checkpoints from one pipeline to another - same basic mechanism/idea as Spark Structured Streaming but a different overall workflow. +Reuven Lax <[email protected]> has put a lot of thought into updating, checkpointing, resuming, etc. Runners differ a lot in these areas. Is there something that should graduate from runner-specific to the Beam model?
>>>>
>>>> Kenn
>>>
>>> In some ways, having an external checkpoint mechanism makes programming IOs simpler. Let's use Redis Streams as an example, as our company recently implemented a RedisStreamIO internally, so the details are fresh.
>>>
>>> One requirement was the need to be able to shut down a streaming Beam pipeline and then restart it from a later point in time without losing data and without starting from the beginning of time.
>>>
>>> This meant that I needed to ensure that only elements that are finished processing in a bundle are committed as 'processed' back to the Redis server, which I accomplished by keeping track of all the elements that are outputted, then on finalizeCheckpoint - which I *assume* happens at the end of a bundle, but I'm fuzzy on the details - sending those element IDs back to the server as consumed (with Redis XACK).
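A rough sketch of the checkpoint-mark side of that pattern, assuming Beam's UnboundedSource.CheckpointMark interface and the Jedis client - the stream key, group name, and host are made up, and this is not the actual internal implementation:

    import java.io.IOException;
    import java.util.List;
    import org.apache.beam.sdk.io.UnboundedSource;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.StreamEntryID;

    // Holds the IDs of elements output in this bundle. When the runner
    // finalizes the checkpoint, the IDs are ACKed back to the Redis server,
    // so a restarted consumer in the same group will not see them again.
    public class RedisStreamCheckpointMark implements UnboundedSource.CheckpointMark {

      private final List<StreamEntryID> outputIds;

      public RedisStreamCheckpointMark(List<StreamEntryID> outputIds) {
        this.outputIds = outputIds;
      }

      @Override
      public void finalizeCheckpoint() throws IOException {
        try (Jedis jedis = new Jedis("localhost", 6379)) {   // placeholder host
          jedis.xack("events", "beam-group",                 // placeholder names
              outputIds.toArray(new StreamEntryID[0]));
        }
      }
    }

In a real source the mark would also need a coder, returned from UnboundedSource#getCheckpointMarkCoder, so the runner can persist it between the read and the finalization.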
>>> If instead Beam would let you persist checkpoints externally and allow a pipeline to bootstrap off of the already existing checkpoint, I would simply have to keep track *in the checkpoint* of the last element ID read, and could use that as the starting offset. I would then be able to 'eager ack' read messages and not worry about delaying commits until elements are outputted further down the pipeline etc., since if an element is read into a checkpoint, we know it is recoverable.
>>>
>>> This also makes life a lot easier for anything regarding Kinesis, since the Kinesis servers don't have a way of managing offsets/the last element read (from when I used it; maybe that has changed), unlike Kafka, Pubsub, Redis Streams, etc.
>>>
>>> Hopefully this makes sense, and if I have some misunderstandings I'd love to learn more. This general subject has come up a few times in the Beam Slack, so I think at the very least some extra documentation on these types of use cases might be welcome.
>>>
>>>> On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez <[email protected]> wrote:
>>>>
>>>>> Looks like this is a common source of confusion; I had similar questions about checkpointing in the Beam Slack.
>>>>>
>>>>> In Spark Structured Streaming, checkpoints are saved to an *external* HDFS location and persist *beyond* each run, so in the event of a stream crashing, you can just point your next execution of the stream at the checkpoint location. Kafka (or Kinesis/Redis Streams etc.) offsets are persisted in the checkpoint, so the stream resumes from the last committed checkpoint location.
>>>>>
>>>>> It doesn't seem Beam has an external checkpoint that persists beyond a single stream execution, so in Beam with Kinesis I believe you'll have to manage your own offsets deliberately with an external source if you want to achieve 'exactly once' semantics in the event of shutting down a stream and resuming it at a later point.
>>>>>
>>>>> In Kafka you don't need this since, as long as we ensure our offsets are committed in finalization of a bundle, the offsets for a particular group id are stored on the server.
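Managing your own offsets deliberately with an external source, as described above, can be as small as one item per shard in a key-value store. A sketch using a DynamoDB table via the AWS SDK for Java v1 - the table and attribute names are invented for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
    import com.amazonaws.services.dynamodbv2.model.AttributeValue;

    public class ShardOffsetStore {

      private static final String TABLE = "pipeline-offsets";   // hypothetical table
      private final AmazonDynamoDB dynamo = AmazonDynamoDBClientBuilder.defaultClient();

      // Record the last processed sequence number for a shard.
      public void save(String shardId, String sequenceNumber) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("shardId", new AttributeValue(shardId));
        item.put("sequenceNumber", new AttributeValue(sequenceNumber));
        dynamo.putItem(TABLE, item);
      }

      // Look up where to resume; null means "no checkpoint yet", i.e. start
      // from TRIM_HORIZON or LATEST, depending on the desired semantics.
      public String load(String shardId) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("shardId", new AttributeValue(shardId));
        Map<String, AttributeValue> item = dynamo.getItem(TABLE, key).getItem();
        return item == null ? null : item.get("sequenceNumber").getS();
      }
    }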
>>>>> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles <[email protected]> wrote:
>>>>>
>>>>>> This sounds similar to the "Kafka Commit" in https://github.com/apache/beam/pull/12572 by +Boyuan Zhang <[email protected]>, and also to how PubsubIO ACKs messages in the finalizer. I don't know much about KinesisIO or how Kinesis works. I was just asking to clarify, in case other folks know more, like +Alexey Romanenko <[email protected]> and +Ismaël Mejía <[email protected]>, who have modified KinesisIO. If the feature does not exist today, perhaps we can identify the best practices around this pattern.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Kenn,
>>>>>>>
>>>>>>> yes, resuming reading at the proper timestamp is exactly the issue we are currently struggling with. E.g. with the Kinesis Client Lib we could store the last read position within some Dynamo table. This mechanism is not used with Beam; as we understand it, the runner is responsible for tracking that checkpoint mark.
>>>>>>>
>>>>>>> Now, obviously on restarting the pipeline, e.g. after a non-compatible upgrade - that is, when a pipeline update is just not feasible - there must be some mechanism in place for how Dataflow will know where to continue. Is that simply the pipeline name? Or is there more involved? So how does checkpointing actually work here?
>>>>>>>
>>>>>>> Based on 'name', wouldn't that imply that something like (example taken from https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
>>>>>>>
>>>>>>> export REGION="us-central1"
>>>>>>>
>>>>>>> gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
>>>>>>>     --template-file-gcs-location "$TEMPLATE_PATH" \
>>>>>>>     --parameters inputSubscription="$SUBSCRIPTION" \
>>>>>>>     --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>>>>>>>     --region "$REGION"
>>>>>>>
>>>>>>> will not resume from the last read on rerun, because the name obviously changes here?
>>>>>>>
>>>>>>> best,
>>>>>>>
>>>>>>> michel
>>>>>>>
>>>>>>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>>
>>>>>>>> I would assume the main issue is resuming reading from the Kinesis stream from the last read? In the case of Pubsub (just as another example of the idea) this is part of the internal state of a pre-created subscription.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi list,
>>>>>>>>>
>>>>>>>>> with our current project we are implementing our streaming pipeline based on Google Dataflow.
>>>>>>>>>
>>>>>>>>> Essentially we receive input via Kinesis, do some filtering, enrichment and sessionizing, and output to PubSub and/or Google Storage.
>>>>>>>>>
>>>>>>>>> After short investigations it is not clear to us how checkpointing will work running on Dataflow in connection with KinesisIO. Is there any documentation/discussion to get a better understanding of how that will work? Especially, if we are forced to restart our pipelines, how could we ensure not to lose any events?
>>>>>>>>>
>>>>>>>>> As far as I understand it currently, it should work 'auto-magically', but it is not yet clear to us how it will actually behave. Before we try to start testing our expectations, or even try to implement some watermark tracking ourselves, we hoped to get some insights from other users here.
>>>>>>>>>
>>>>>>>>> Any help appreciated.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> michel
>>>>>
>>>>> ~Vincent
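On Michael's question above about whether resuming is keyed on the pipeline name: as far as I understand, Dataflow's in-place update is opt-in and matches on the job name, so a run with a fresh timestamped name like the flex-template example starts a brand-new job with empty state. A hedged sketch of the pipeline options involved, assuming the Java DataflowRunner:

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class UpdateOptionsSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        // The name must match the running job to be replaced. Combined with
        // update=true, Dataflow carries the persisted checkpoint/state over
        // to the replacement job; a new name instead creates a job that
        // starts with no state.
        options.setJobName("streaming-beam-sql");   // stable name, no timestamp
        options.setUpdate(true);
        // ... build and run the pipeline with these options ...
      }
    }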
