Good point. I hope I got it right:

If I understand correctly, this is somehow expressed by the difference in
 'checkpointing' as e.g. implemented with the spark runner (as pointed out
by Alexey above) and finalising a checkpoint as done e.g. with Kafka
[1,2,3] (which is a noop on kinesisio implementation [4])

So it seems to be more appropriate to store with DynamoDB (as suggested by
Alexey) to kind of 'implicitly use the source's storage', right?

As a matter of fact by adopting the format used by the KCL this would lead
to the - maybe to be expected - behaviour that _any_ client connecting as
that application will resume on the stored checkpoint, which seems to be
the implemented protocol by KCL. Whereas implementing the storage
differently might be considered as 'to close to beam' especially as reusing
the implicit CheckpointMarkCoder might just be broken because of coder
incompatibilities incurred by that incompatible state changes.

[1]
https://github.com/apache/beam/blob/f805f1cf0aef34dd1e6ce8fa4e1972f0a68506c6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
#L57-L64
[2]
https://github.com/apache/beam/blob/f805f1cf0aef34dd1e6ce8fa4e1972f0a68506c6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L530
[3]
https://github.com/apache/beam/blob/f805f1cf0aef34dd1e6ce8fa4e1972f0a68506c6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L547-L571
[4]
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderCheckpoint.java#L65


On Tue, Apr 13, 2021 at 12:56 AM Kenneth Knowles <[email protected]> wrote:

>
>
> On Mon, Apr 12, 2021 at 3:02 PM Michael Luckey <[email protected]>
> wrote:
>
>>
>> After spending some time reading relevant source code I was also
>> wondering, whether implementing similar approach would be feasible.
>>
>> As of my current understanding, there is a slight semantic difference in
>> the workings of e.g. pubsub/kafka and kinesis. Whereas the former provide a
>> mechanism to track the current state of the application on the serving side
>> by the client sending an ack, kinesis seem to not provide this
>> functionality. Although e.g. Kinesis Client Lib provides some kind of
>> checkpointing by storing the latest position within a DynamoDB table, it is
>> still the clients responsibility to properly position the shard iterator to
>> the last processed sequence number.
>>
>> This, for instance, seems to make the reliance on DynamoDB unnecessary,
>> we could use any persistent storage as checkpoint backend? Similar as it is
>> currently done with spark runner implementation using checkpointing folder
>> we might also use a simple gcs bucket for that. Of course, there might be
>> pros/cons using Dynamo vs gcs vs something completely different.
>>
>
> This is a really interesting space between the concept of "ACKing a stream
> source" (implicitly using the source's storage) and "source checkpoints"
> (implicitly using a runner's storage) and "checkpointing a pipeline for
> resume" (explicitly storing in some external system).
>
> You may want to drain and replace a pipeline with an entirely different
> one - for example when incompatible state changes are necessary - so the
> latter two do not work for this case. You would want any incomplete
> aggregations to _not_ be checkpointed, because they need to be reprocessed
> (your whole system has to be idempotent in this way anyhow).
>
> Kenn
>
>
>>
>> I was also wondering, why KinesisIO is using Kineses Data Stream API
>> instead using the somehow higher level KCL, but I tend to believe, this
>> decision was made to be more flexible in implementing exactly-once or such
>> semantics? We most likely will not try to be compatible with the
>> checkpointing mechanism implemented by KCL, which of course would be a
>> reason to use DynomaDB table here, but use existing checkpointmark/coder
>> implementations.
>>
>> What I currently do not fully understand is how we would resume. Whereas
>> Kafka handles this on the serving side by just not delivering acknowledged
>> message again, for Kinesis we would need to somehow pass the checkpointmark
>> to the reader creation code. Maybe some additional codepath on
>> DynamicCheckpointGenerator or such which could be passed
>> shardIds/sequencenumbers resp these stored KinesisCheckpointMarks instead?
>>
>> Best,
>>
>> michel
>>
>> On Mon, Apr 12, 2021 at 7:48 PM Alexey Romanenko <
>> [email protected]> wrote:
>>
>>> Just as an idea from top of my head - we probably can add a similar
>>> functionality (optional), as we have with KafkaIO and Kafka offsets, to
>>> KinesisIO and use DynamoDB for that. I didn’t go into the details too
>>> deeply but on the first sight it seems feasible. In this case, it will work
>>> for all runners despite of the Beam checkpointing mechanism implementation.
>>>
>>> Alexey
>>>
>>> On 8 Apr 2021, at 19:35, Michael Luckey <[email protected]> wrote:
>>>
>>> Hi,
>>>
>>> thanks for your help here. Really appreciated!
>>>
>>> @alexey We already found your answer (magically showed up in some search
>>> engine results) and - iirc - while we were still using spark as underlying
>>> runner on some other project I do think we relied on that mentioned
>>> mechanism. But as with our current project we wanted to use Google
>>> Dataflow, we got stuck trying to understand how checkpointing is going to
>>> work there. As Vincent mentioned, documentation is sparse, confusing and
>>> from google provided example it seems difficult to understand how
>>> checkpointing is supposed to work, i.e. what requirements we (as users)
>>> have to meet to get checkpointing on e.g. kinesis.
>>>
>>> As far as I understand now, it does not seem to be supported, which
>>> unfortunately is a show-stopper for us and might force us to another runner
>>> implementation.
>>>
>>> Regarding external checkpointing with Kinesis, iiuc there is some
>>> support with Kinesis Client Library relaying to DynamoDB. Which works for
>>> us in a different context, but this is not exposed by KinesisIO, which - of
>>> course - led myself to the assumption there will be support by Beam itself
>>> - or - as it might be by the underlying runner. Which seems to be provided
>>> by spark (and possibly Flink), we might check that out. We would still
>>> prefer to get something working on Dataflow. Without being required to roll
>>> our own checkpointing implementation. Any Dataflow experts around who could
>>> shed some light onto this? I d love to get that better documented for
>>> others running into this issues.
>>>
>>> Best,
>>>
>>> michel
>>>
>>> On Wed, Apr 7, 2021 at 10:46 PM Vincent Marquez <
>>> [email protected]> wrote:
>>>
>>>>
>>>>
>>>>
>>>> On Wed, Apr 7, 2021 at 11:55 AM Kenneth Knowles <[email protected]>
>>>> wrote:
>>>>
>>>>> [I think this has graduated to a +dev <[email protected]> thread]
>>>>>
>>>>> Yea, in Beam it is left up to the IOs primarily, hence the bundle
>>>>> finalization step, or allowed runners to have their own features of 
>>>>> course.
>>>>> Dataflow also does have in-place pipeline update that restores the
>>>>> persisted checkpoints from one pipeline to another - same basic
>>>>> mechanism/idea as Spark Structured Streaming but different overall
>>>>> workflow. +Reuven Lax <[email protected]> has put a lot of thought
>>>>> into updating, checkpointing, resuming, etc. Runners differ a lot in these
>>>>> areas. Is there something that should graduate from runner-specific to the
>>>>> Beam model?
>>>>>
>>>>> Kenn
>>>>>
>>>>
>>>> In some ways, having an external checkpoint mechanism makes programming
>>>> IOs simpler.  Let's use Redis Streams as an example, as our company
>>>> recently implemented a RedisStreamIO internally so the details are fresh.
>>>>
>>>> One requirement was the need to be able to shut down a streaming Beam
>>>> Pipeline, and then restart it from a later point in time without lost data
>>>> and without starting from the beginning of time.
>>>>
>>>> This meant that I need to ensure only elements that are finished
>>>> processing in a bundle are committed as 'processed' back to the redis
>>>> server, which I accomplished by keeping track of all the elements that are
>>>> outputted, then on finalizeCheckpoint, which I *assume* happens at the end
>>>> of a bundle but I'm fuzzy on details, send those element IDs back to the
>>>> server as consumed (with Redis XACK).
>>>>
>>>> If instead Beam would let you persist checkpoints externally and allow
>>>> a pipeline to bootstrap off of the already existing checkpoint, I simply
>>>> have to keep track *in the checkpoint* of the last element ID read, and can
>>>> use that as the starting offset.  I would then be able to 'eager ack' read
>>>> messages and not worry about delaying commits until elements are outputted
>>>> further down the pipeline etc, since if an element is read into a
>>>> checkpoint, we know it is recoverable.
>>>>
>>>> This also makes life a lot easier for anything regarding Kinesis since
>>>> the Kinesis servers don't have a way of managing offsets/last element read
>>>> (from when I used it?, maybe changed), unlike Kafka, Pubsub, Redis Streams,
>>>> etc.
>>>>
>>>> Hopefully this makes sense, and if I have some misunderstandings I'd
>>>> love to learn more.  This general subject has come up a few times in the
>>>> beam slack so I think at the very least some extra documentation on these
>>>> types of use cases might be welcome.
>>>>
>>>>
>>>>
>>>>
>>>>>
>>>>> On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Looks like this is a common source of confusion, I had similar
>>>>>> questions about checkpointing in the beam slack.
>>>>>>
>>>>>> In Spark Structured Streaming, checkpoints are saved to an *external*
>>>>>> HDFS location and persist *beyond* each run, so in the event of a stream
>>>>>> crashing, you can just point your next execution of the stream to the
>>>>>> checkpoint location.  Kafka  (or Kinesis/Redis Stream etc) offsets are
>>>>>> persisted in the checkpoint, so the stream would resume off of the last
>>>>>> committed checkpoint location.
>>>>>>
>>>>>> It doesn't seem Beam has an external checkpoint that persists beyond
>>>>>> a single stream execution, so in Beam with Kinesis I believe you'll have 
>>>>>> to
>>>>>> manage your own offsets deliberately with an external source if you want 
>>>>>> to
>>>>>> achieve 'exactly once' semantics in the event of shutting down a stream 
>>>>>> and
>>>>>>  resuming it at a later point.
>>>>>>
>>>>>> In Kafka you don't need this since as long as we ensure our offsets
>>>>>> are committed in finalization of a bundle, the offsets for a particular
>>>>>> group id are stored on the server.
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> This sounds similar to the "Kafka Commit" in
>>>>>>> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>>>>>>> <[email protected]> and also to how PubsubIO ACKs messages in the
>>>>>>> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
>>>>>>> just asking to clarify, in case other folks know more, like +Alexey
>>>>>>> Romanenko <[email protected]> and +Ismaël Mejía
>>>>>>> <[email protected]> have modified KinesisIO. If the feature does
>>>>>>> not exist today, perhaps we can identify the best practices around this
>>>>>>> pattern.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Kenn,
>>>>>>>>
>>>>>>>> yes, resuming reading at the proper timestamp is exactly the issue
>>>>>>>> we are currently struggling with. E.g. with Kinesis Client Lib we could
>>>>>>>> store the last read within some dynamo table. This mechanism is not 
>>>>>>>> used
>>>>>>>> with beam, as we understand, the runner is responsible to track that
>>>>>>>> checkpoint mark.
>>>>>>>>
>>>>>>>> Now, obviously on restarting the pipeline, e.g. on non compatible
>>>>>>>> upgrade, that is, an pipeline update is just not feasible, there must 
>>>>>>>> be
>>>>>>>> some mechanism in place on how Dataflow will know where to continue. Is
>>>>>>>> that simply the pipeline name? Or is there more involved? So how does
>>>>>>>> checkpointing actually work here?
>>>>>>>>
>>>>>>>> Based on 'name', wouldn't that imply that something like (example
>>>>>>>> taken from
>>>>>>>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
>>>>>>>> )
>>>>>>>>
>>>>>>>>   export REGION="us-central1"
>>>>>>>>
>>>>>>>>   gcloud dataflow flex-template run "streaming-beam-sql-`date 
>>>>>>>> +%Y%m%d-%H%M%S`" \
>>>>>>>>     --template-file-gcs-location "$TEMPLATE_PATH" \
>>>>>>>>     --parameters inputSubscription="$SUBSCRIPTION" \
>>>>>>>>     --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>>>>>>>>     --region "$REGION"
>>>>>>>>
>>>>>>>> will not resume on last read on rerun, because the name obviously
>>>>>>>> changes here?
>>>>>>>>
>>>>>>>> best,
>>>>>>>>
>>>>>>>> michel
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I would assume the main issue is resuming reading from the Kinesis
>>>>>>>>> stream from the last read? In the case for Pubsub (just as another 
>>>>>>>>> example
>>>>>>>>> of the idea) this is part of the internal state of a pre-created
>>>>>>>>> subscription.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi list,
>>>>>>>>>>
>>>>>>>>>> with our current project we are implementing our streaming
>>>>>>>>>> pipeline based on Google Dataflow.
>>>>>>>>>>
>>>>>>>>>> Essentially we receive input via Kinesis, doing some filtering,
>>>>>>>>>> enrichment and sessionizing and output to PubSub and/or google 
>>>>>>>>>> storage.
>>>>>>>>>>
>>>>>>>>>> After short investigations it is not clear to us, how
>>>>>>>>>> checkpointing will work running on Dataflow in connection with 
>>>>>>>>>> KinesisIO.
>>>>>>>>>> Is there any documentation/discussions to get a better understanding 
>>>>>>>>>> on how
>>>>>>>>>> that will be working? Especially if we are forced to restart our 
>>>>>>>>>> pipelines,
>>>>>>>>>> how could we ensure not to loose any events?
>>>>>>>>>>
>>>>>>>>>> As far as I understand currently, it should work 'auto-magically'
>>>>>>>>>> but it is not yet clear to us, how it will actually behave. Before 
>>>>>>>>>> we try
>>>>>>>>>> to start testing our expectations or even try to implement some
>>>>>>>>>> watermark-tracking by ourself we hoped to get some insights from 
>>>>>>>>>> other
>>>>>>>>>> users here.
>>>>>>>>>>
>>>>>>>>>> Any help appreciated.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> michel
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>> ~Vincent
>>>>>>
>>>>>
>>>

Reply via email to