On Mon, Apr 12, 2021 at 3:02 PM Michael Luckey <[email protected]> wrote:

>
> After spending some time reading relevant source code I was also
> wondering, whether implementing similar approach would be feasible.
>
> As far as I currently understand, there is a slight semantic difference in
> the workings of e.g. Pubsub/Kafka and Kinesis. Whereas the former provide a
> mechanism to track the current state of the application on the serving side
> by the client sending an ack, Kinesis does not seem to provide this
> functionality. Although e.g. the Kinesis Client Lib provides some kind of
> checkpointing by storing the latest position within a DynamoDB table, it is
> still the client's responsibility to properly position the shard iterator
> at the last processed sequence number.
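>
> A rough sketch of what that repositioning looks like against the plain AWS
> SDK (v1 classes; stream/shard names are made up, and this is not how
> KinesisIO is structured internally):
>
>   import com.amazonaws.services.kinesis.AmazonKinesis;
>   import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
>   import com.amazonaws.services.kinesis.model.GetRecordsRequest;
>   import com.amazonaws.services.kinesis.model.GetRecordsResult;
>   import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
>   import com.amazonaws.services.kinesis.model.ShardIteratorType;
>
>   public class ResumeFromSequenceNumber {
>     public static GetRecordsResult resume(
>         String streamName, String shardId, String lastProcessedSequenceNumber) {
>       AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
>       // Kinesis does not remember what a consumer has already processed;
>       // the client has to re-position the iterator itself.
>       String iterator =
>           kinesis
>               .getShardIterator(
>                   new GetShardIteratorRequest()
>                       .withStreamName(streamName)
>                       .withShardId(shardId)
>                       .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
>                       .withStartingSequenceNumber(lastProcessedSequenceNumber))
>               .getShardIterator();
>       return kinesis.getRecords(new GetRecordsRequest().withShardIterator(iterator));
>     }
>   }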
>
> This, for instance, seems to make the reliance on DynamoDB unnecessary; we
> could use any persistent storage as a checkpoint backend? Similar to what
> the Spark runner implementation currently does with its checkpointing
> folder, we might also use a simple GCS bucket for that. Of course, there
> might be pros/cons to DynamoDB vs. GCS vs. something completely different.
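>
> Purely hypothetical sketch of what such a pluggable backend could look like
> (neither this interface nor any implementation exists in Beam today):
>
>   /** Hypothetical pluggable checkpoint backend. */
>   public interface CheckpointStore {
>     /** Persists the latest processed sequence number for a shard. */
>     void put(String streamName, String shardId, String sequenceNumber);
>
>     /** Returns the stored sequence number, or null if nothing was checkpointed yet. */
>     String get(String streamName, String shardId);
>   }
>
> Implementations could then be a DynamoDB table (as the KCL does), a GCS
> object per shard, or whatever the deployment already has available.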
>

This is a really interesting space between the concepts of "ACKing a stream
source" (implicitly using the source's storage), "source checkpoints"
(implicitly using a runner's storage), and "checkpointing a pipeline for
resume" (explicitly storing in some external system).

You may want to drain and replace a pipeline with an entirely different one
- for example when incompatible state changes are necessary - so the latter
two do not work for this case. You would want any incomplete aggregations
to _not_ be checkpointed, because they need to be reprocessed (your whole
system has to be idempotent in this way anyhow).

Kenn


>
> I was also wondering why KinesisIO is using the Kinesis Data Streams API
> instead of the somewhat higher-level KCL, but I tend to believe this
> decision was made to be more flexible in implementing exactly-once or
> similar semantics? We most likely will not try to be compatible with the
> checkpointing mechanism implemented by the KCL (which of course would be a
> reason to use a DynamoDB table here), but rather use the existing
> checkpoint mark/coder implementations.
>
> What I currently do not fully understand is how we would resume. Whereas
> Kafka handles this on the serving side by simply not delivering acknowledged
> messages again, for Kinesis we would need to somehow pass the checkpoint
> mark to the reader creation code. Maybe some additional code path on
> DynamicCheckpointGenerator or such, which could be passed
> shard IDs/sequence numbers or the stored KinesisCheckpointMarks instead?
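>
> For illustration, a sketch of how a checkpoint mark could be stashed away
> and read back, assuming the mark is java-Serializable (iiuc KinesisIO's is,
> since it is encoded with SerializableCoder); the missing piece would still
> be a public way to hand the restored mark back to reader creation:
>
>   import java.io.ByteArrayInputStream;
>   import java.io.ByteArrayOutputStream;
>   import java.io.IOException;
>   import java.io.ObjectInputStream;
>   import java.io.ObjectOutputStream;
>   import org.apache.beam.sdk.io.UnboundedSource;
>
>   class CheckpointMarkStash {
>     // Works for any mark that is java-Serializable; fails with
>     // NotSerializableException if that assumption does not hold.
>     static byte[] serialize(UnboundedSource.CheckpointMark mark) throws IOException {
>       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>       try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
>         out.writeObject(mark);
>       }
>       return bytes.toByteArray(); // write these bytes to GCS, DynamoDB, ...
>     }
>
>     static UnboundedSource.CheckpointMark deserialize(byte[] stored)
>         throws IOException, ClassNotFoundException {
>       try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(stored))) {
>         return (UnboundedSource.CheckpointMark) in.readObject();
>       }
>     }
>   }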
>
> Best,
>
> michel
>
> On Mon, Apr 12, 2021 at 7:48 PM Alexey Romanenko <[email protected]>
> wrote:
>
>> Just as an idea off the top of my head - we could probably add similar
>> (optional) functionality to KinesisIO, as we have with KafkaIO and Kafka
>> offsets, and use DynamoDB for that. I didn't go into the details too
>> deeply, but at first sight it seems feasible. In this case, it would work
>> for all runners regardless of the Beam checkpointing mechanism implementation.
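>>
>> For reference, the existing KafkaIO mechanism I have in mind looks roughly
>> like this (broker/topic/group names are made up); the KinesisIO/DynamoDB
>> counterpart would be the new, so far hypothetical, part:
>>
>>   import com.google.common.collect.ImmutableMap;
>>   import org.apache.beam.sdk.io.kafka.KafkaIO;
>>   import org.apache.kafka.common.serialization.StringDeserializer;
>>
>>   class KafkaOffsetCommitExample {
>>     static KafkaIO.Read<String, String> read() {
>>       return KafkaIO.<String, String>read()
>>           .withBootstrapServers("broker:9092")
>>           .withTopic("events")
>>           .withKeyDeserializer(StringDeserializer.class)
>>           .withValueDeserializer(StringDeserializer.class)
>>           // group.id is required for the broker-side offset commit.
>>           .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my-pipeline"))
>>           // Offsets are committed back to Kafka when a bundle's checkpoint is finalized.
>>           .commitOffsetsInFinalize();
>>     }
>>   }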
>>
>> Alexey
>>
>> On 8 Apr 2021, at 19:35, Michael Luckey <[email protected]> wrote:
>>
>> Hi,
>>
>> thanks for your help here. Really appreciated!
>>
>> @alexey We already found your answer (it magically showed up in some search
>> engine results) and - iirc - while we were still using Spark as the
>> underlying runner on some other project, I do think we relied on that
>> mechanism. But since our current project is meant to run on Google
>> Dataflow, we got stuck trying to understand how checkpointing is going to
>> work there. As Vincent mentioned, the documentation is sparse and confusing,
>> and from the Google-provided examples it is difficult to understand how
>> checkpointing is supposed to work, i.e. what requirements we (as users)
>> have to meet to get checkpointing on e.g. Kinesis.
>>
>> As far as I understand now, it does not seem to be supported, which
>> unfortunately is a show-stopper for us and might force us to switch to
>> another runner implementation.
>>
>> Regarding external checkpointing with Kinesis, iiuc there is some support
>> via the Kinesis Client Library, which relies on DynamoDB. That works for us
>> in a different context, but it is not exposed by KinesisIO, which - of
>> course - led me to assume there would be support by Beam itself, or, as it
>> turns out, by the underlying runner. That seems to be provided by Spark
>> (and possibly Flink), so we might check that out. We would still prefer to
>> get something working on Dataflow, without being required to roll our own
>> checkpointing implementation. Any Dataflow experts around who could shed
>> some light on this? I'd love to get this better documented for others
>> running into these issues.
>>
>> Best,
>>
>> michel
>>
>> On Wed, Apr 7, 2021 at 10:46 PM Vincent Marquez <
>> [email protected]> wrote:
>>
>>>
>>>
>>>
>>> On Wed, Apr 7, 2021 at 11:55 AM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> [I think this has graduated to a +dev <[email protected]> thread]
>>>>
>>>> Yea, in Beam it is left up to the IOs primarily, hence the bundle
>>>> finalization step, though runners can of course have their own features.
>>>> Dataflow also does have in-place pipeline update that restores the
>>>> persisted checkpoints from one pipeline to another - same basic
>>>> mechanism/idea as Spark Structured Streaming but different overall
>>>> workflow. +Reuven Lax <[email protected]> has put a lot of thought into
>>>> updating, checkpointing, resuming, etc. Runners differ a lot in these
>>>> areas. Is there something that should graduate from runner-specific to the
>>>> Beam model?
>>>>
>>>> Kenn
>>>>
>>>
>>> In some ways, having an external checkpoint mechanism makes programming
>>> IOs simpler.  Let's use Redis Streams as an example, as our company
>>> recently implemented a RedisStreamIO internally so the details are fresh.
>>>
>>> One requirement was the need to be able to shut down a streaming Beam
>>> pipeline and then restart it from a later point in time without losing
>>> data and without starting from the beginning of time.
>>>
>>> This meant I needed to ensure that only elements that have finished
>>> processing in a bundle are committed as 'processed' back to the Redis
>>> server. I accomplished this by keeping track of all the elements that are
>>> output, then on finalizeCheckpoint - which I *assume* happens at the end
>>> of a bundle, but I'm fuzzy on the details - sending those element IDs back
>>> to the server as consumed (with Redis XACK).
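>>>
>>> The finalize step looks roughly like this (simplified sketch using Jedis;
>>> class and field names here are made up, not our actual internal code):
>>>
>>>   import java.io.IOException;
>>>   import java.util.List;
>>>   import org.apache.beam.sdk.io.UnboundedSource;
>>>   import redis.clients.jedis.Jedis;
>>>   import redis.clients.jedis.StreamEntryID;
>>>
>>>   class RedisStreamCheckpointMark implements UnboundedSource.CheckpointMark {
>>>     private final String streamKey;
>>>     private final String consumerGroup;
>>>     private final List<StreamEntryID> processedIds; // tracked while emitting elements
>>>
>>>     RedisStreamCheckpointMark(
>>>         String streamKey, String consumerGroup, List<StreamEntryID> processedIds) {
>>>       this.streamKey = streamKey;
>>>       this.consumerGroup = consumerGroup;
>>>       this.processedIds = processedIds;
>>>     }
>>>
>>>     @Override
>>>     public void finalizeCheckpoint() throws IOException {
>>>       try (Jedis jedis = new Jedis("localhost", 6379)) {
>>>         // XACK marks the entries as consumed for this group, so they are not redelivered.
>>>         jedis.xack(streamKey, consumerGroup, processedIds.toArray(new StreamEntryID[0]));
>>>       }
>>>     }
>>>   }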
>>>
>>> If instead Beam let you persist checkpoints externally and allowed a
>>> pipeline to bootstrap off an already existing checkpoint, I would simply
>>> have to keep track *in the checkpoint* of the last element ID read, and
>>> could use that as the starting offset.  I would then be able to 'eager ack'
>>> read messages and not worry about delaying commits until elements are
>>> output further down the pipeline, since if an element is read into a
>>> checkpoint, we know it is recoverable.
>>>
>>> This also makes life a lot easier for anything regarding Kinesis, since
>>> the Kinesis servers don't have a way of managing offsets/last element read
>>> (at least from when I used it; that may have changed), unlike Kafka,
>>> Pubsub, Redis Streams, etc.
>>>
>>> Hopefully this makes sense, and if I have some misunderstandings I'd
>>> love to learn more.  This general subject has come up a few times in the
>>> Beam Slack, so I think at the very least some extra documentation on these
>>> types of use cases might be welcome.
>>>
>>>
>>>
>>>
>>>>
>>>> On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez <
>>>> [email protected]> wrote:
>>>>
>>>>> Looks like this is a common source of confusion; I had similar
>>>>> questions about checkpointing in the Beam Slack.
>>>>>
>>>>> In Spark Structured Streaming, checkpoints are saved to an *external*
>>>>> HDFS location and persist *beyond* each run, so in the event of a stream
>>>>> crashing, you can just point your next execution of the stream to the
>>>>> checkpoint location.  Kafka (or Kinesis/Redis Streams etc.) offsets are
>>>>> persisted in the checkpoint, so the stream would resume from the last
>>>>> committed checkpoint.
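>>>>>
>>>>> For comparison, in Spark's Java API that looks roughly like this (the
>>>>> checkpointLocation option is real; paths/topics here are made up):
>>>>>
>>>>>   import org.apache.spark.sql.Dataset;
>>>>>   import org.apache.spark.sql.Row;
>>>>>   import org.apache.spark.sql.SparkSession;
>>>>>   import org.apache.spark.sql.streaming.StreamingQuery;
>>>>>
>>>>>   class SparkCheckpointExample {
>>>>>     static StreamingQuery run(SparkSession spark) throws Exception {
>>>>>       Dataset<Row> events =
>>>>>           spark.readStream()
>>>>>               .format("kafka")
>>>>>               .option("kafka.bootstrap.servers", "broker:9092")
>>>>>               .option("subscribe", "events")
>>>>>               .load();
>>>>>       return events.writeStream()
>>>>>           .format("parquet")
>>>>>           .option("path", "hdfs:///data/out")
>>>>>           // Offsets live in this external checkpoint dir and survive restarts.
>>>>>           .option("checkpointLocation", "hdfs:///checkpoints/events-job")
>>>>>           .start();
>>>>>     }
>>>>>   }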
>>>>>
>>>>> It doesn't seem Beam has an external checkpoint that persists beyond a
>>>>> single stream execution, so in Beam with Kinesis I believe you'll have to
>>>>> manage your own offsets deliberately with an external source if you want
>>>>> to achieve 'exactly once' semantics in the event of shutting down a
>>>>> stream and resuming it at a later point.
>>>>>
>>>>> In Kafka you don't need this since as long as we ensure our offsets
>>>>> are committed in finalization of a bundle, the offsets for a particular
>>>>> group id are stored on the server.
>>>>>
>>>>>
>>>>> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> This sounds similar to the "Kafka Commit" in
>>>>>> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>>>>>> <[email protected]> and also to how PubsubIO ACKs messages in the
>>>>>> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
>>>>>> just asking to clarify, in case other folks know more, like +Alexey
>>>>>> Romanenko <[email protected]> and +Ismaël Mejía
>>>>>> <[email protected]>, who have modified KinesisIO. If the feature does
>>>>>> not exist today, perhaps we can identify the best practices around this
>>>>>> pattern.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Kenn,
>>>>>>>
>>>>>>> yes, resuming reading at the proper timestamp is exactly the issue
>>>>>>> we are currently struggling with. E.g. with the Kinesis Client Lib we
>>>>>>> could store the last read position in some DynamoDB table. This
>>>>>>> mechanism is not used with Beam; as we understand it, the runner is
>>>>>>> responsible for tracking that checkpoint mark.
>>>>>>>
>>>>>>> Now, obviously when restarting the pipeline, e.g. after an incompatible
>>>>>>> upgrade where a pipeline update is just not feasible, there must be
>>>>>>> some mechanism in place for Dataflow to know where to continue. Is
>>>>>>> that simply the pipeline name? Or is there more involved? How does
>>>>>>> checkpointing actually work here?
>>>>>>>
>>>>>>> Based on 'name', wouldn't that imply that something like (example
>>>>>>> taken from
>>>>>>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
>>>>>>> )
>>>>>>>
>>>>>>>   export REGION="us-central1"
>>>>>>>
>>>>>>>   gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
>>>>>>>     --template-file-gcs-location "$TEMPLATE_PATH" \
>>>>>>>     --parameters inputSubscription="$SUBSCRIPTION" \
>>>>>>>     --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>>>>>>>     --region "$REGION"
>>>>>>>
>>>>>>> will not resume at the last read position on rerun, because the name
>>>>>>> obviously changes here?
>>>>>>>
>>>>>>> best,
>>>>>>>
>>>>>>> michel
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I would assume the main issue is resuming reading from the Kinesis
>>>>>>>> stream at the last read position? In the case of Pubsub (just as
>>>>>>>> another example of the idea) this is part of the internal state of a
>>>>>>>> pre-created subscription.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi list,
>>>>>>>>>
>>>>>>>>> with our current project we are implementing our streaming
>>>>>>>>> pipeline based on Google Dataflow.
>>>>>>>>>
>>>>>>>>> Essentially we receive input via Kinesis, do some filtering,
>>>>>>>>> enrichment and sessionizing, and output to PubSub and/or Google
>>>>>>>>> Storage.
>>>>>>>>>
>>>>>>>>> After a short investigation it is not clear to us how
>>>>>>>>> checkpointing will work running on Dataflow in connection with KinesisIO.
>>>>>>>>> Is there any documentation/discussion to get a better understanding of how
>>>>>>>>> that will work? Especially if we are forced to restart our pipelines,
>>>>>>>>> how can we ensure we do not lose any events?
>>>>>>>>>
>>>>>>>>> As far as I currently understand, it should work 'auto-magically',
>>>>>>>>> but it is not yet clear to us how it will actually behave. Before we try
>>>>>>>>> to start testing our expectations or even try to implement some
>>>>>>>>> watermark tracking ourselves, we hoped to get some insights from other
>>>>>>>>> users here.
>>>>>>>>>
>>>>>>>>> Any help appreciated.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> michel
>>>>>>>>>
>>>>>>>>
>>>>>
>>>>> ~Vincent
>>>>>
>>>>
>>
