Hi Eron,

I think the FLIP is crisp and mostly good to go. Some smaller
things/questions:

   1. SinkFunction#writeWatermark could be named
   SinkFunction#invokeWatermark or invokeOnWatermark to keep it symmetric.
   2. We could add the context parameter to both. For SinkWriter#Context,
   we currently do not gain much. SinkFunction#Context also exposes processing
   time, which may or may not be handy and is currently mostly used for
   StreamingFileSink bucket policies. We may add that processing time flag
   also to SinkWriter#Context in the future.
   3. Alternatively, we could also add a different context parameter just
   to keep the API stable while allowing additional information to be passed
   in the future.
   4. Would we run into any compatibility issue if we use Flink 1.13 source
   in Flink 1.14 (with this FLIP) or vice versa?
   5. What happens with sinks that use the new methods in applications that
   do not have watermarks (batch mode, processing time)? Does this also work
   well enough with ingestion time?
   6. How do exactly-once sinks deal with written watermarks in case of
   failure? I guess it's the same as for normal records (either rollback of
   the transaction or deduplication on resumption).
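
Points 1 and 2 could be sketched roughly as follows. This is plain Java with
no Flink dependency: the nested SinkFunction and Context types are
hypothetical stand-ins that only mimic the shape of the real interfaces, and
invokeWatermark(long, Context) is merely the naming suggested in point 1, not
a settled signature. The point of the sketch is the no-op default, which lets
existing sinks keep working unchanged.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: these types only mimic the shape of Flink's sink API. */
final class WatermarkSinkSketch {

    /** Stand-in for SinkFunction#Context, reduced to processing time. */
    interface Context {
        long currentProcessingTime();
    }

    /** Stand-in for SinkFunction with a watermark hook named as in point 1. */
    interface SinkFunction<IN> {
        void invoke(IN value, Context context);

        /** No-op default, so existing sinks keep compiling and behaving as before. */
        default void invokeWatermark(long watermark, Context context) {}
    }

    /** Existing-style sink: never overrides the hook, so watermarks are ignored. */
    static final class LegacySink implements SinkFunction<String> {
        final List<String> log = new ArrayList<>();

        @Override
        public void invoke(String value, Context context) {
            log.add("record:" + value);
        }
    }

    /** Sink that writes watermarks to its target (here just an in-memory log). */
    static final class WatermarkAwareSink implements SinkFunction<String> {
        final List<String> log = new ArrayList<>();

        @Override
        public void invoke(String value, Context context) {
            log.add("record:" + value);
        }

        @Override
        public void invokeWatermark(long watermark, Context context) {
            log.add("watermark:" + watermark);
        }
    }

    /** Feeds records interleaved with watermarks, roughly as a sink operator would. */
    static void drive(SinkFunction<String> sink) {
        Context context = () -> 0L; // fixed processing time, illustration only
        sink.invoke("a", context);
        sink.invokeWatermark(10L, context);
        sink.invoke("b", context);
        sink.invokeWatermark(20L, context);
    }

    public static void main(String[] args) {
        LegacySink legacy = new LegacySink();
        WatermarkAwareSink aware = new WatermarkAwareSink();
        drive(legacy);
        drive(aware);
        System.out.println(legacy.log);
        System.out.println(aware.log);
    }
}
```

Passing the context to the watermark hook as well (point 2) keeps both
methods symmetric; whether that is worth it for SinkWriter is still open.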

Best,

Arvid

On Tue, May 25, 2021 at 6:44 PM Eron Wright <ewri...@streamnative.io.invalid>
wrote:

> Does anyone have further comment on FLIP-167?
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
>
> Thanks,
> Eron
>
>
> On Thu, May 20, 2021 at 5:02 PM Eron Wright <ewri...@streamnative.io>
> wrote:
>
> > Filed FLIP-167: Watermarks for Sink API:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> >
> > I'd like to call a vote next week, is that reasonable?
> >
> >
> > On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com> wrote:
> >
> >> Hi Arvid and Eron,
> >>
> >> Thanks for the discussion. I read through Eron's pull request, and I
> >> think this can benefit the Pravega Flink connector as well.
> >>
> >> Here is some background. Pravega has had a watermark concept on the
> >> event stream for about two years; a blog post [1] introduces Pravega
> >> watermarks.
> >> The Pravega Flink connector gained a watermark integration last year:
> >> we wanted to propagate the Flink watermark to Pravega in the
> >> SinkFunction, and at that time we could only use the existing Flink API,
> >> so we keep the last watermark in memory and check on each event whether
> >> the watermark has changed [2], which is not efficient. With the new
> >> interface, we can manage watermark propagation much more easily.
> >>
> >> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> >> [2]
> >>
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
> >>
> >> -----Original Message-----
> >> From: Arvid Heise <ar...@apache.org>
> >> Sent: Wednesday, May 19, 2021 16:06
> >> To: dev
> >> Subject: Re: [DISCUSS] Watermark propagation with Sink API
> >>
> >>
> >> [EXTERNAL EMAIL]
> >>
> >> Hi Eron,
> >>
> >> Thanks for pushing that topic. I can now see that the benefit is even
> >> bigger than I initially thought, so it's worthwhile to include it
> >> anyway.
> >>
> >> I also briefly thought about exposing watermarks to all UDFs, but here I
> >> really struggle to see specific use cases. Could you maybe take a few
> >> minutes to think about it as well? I could only see someone misusing
> >> Async IO as a sink where a real sink would be more appropriate. In
> >> general, if there is no clear use case, we shouldn't add the
> >> functionality, as it just increases maintenance for no value.
> >>
> >> If we stick to the plan, I think your PR is already in good shape. We
> >> need to create a FLIP for it though, since it changes Public interfaces
> >> [1]. I was initially not convinced that we should also change the old
> >> SinkFunction interface, but seeing how small the change is, I wouldn't
> >> mind at all to increase consistency. Only once the FLIP is written and
> >> approved (which should be minimal and fast) should we actually look at
> >> the PR ;).
> >>
> >> The only thing which I would improve is the name of the function.
> >> processWatermark sounds as if the sink implementer really needs to
> >> implement it (as you would need to do it on a custom operator). I would
> >> make them symmetric to the record writing/invoking method (e.g.
> >> writeWatermark and invokeWatermark).
> >>
> >> As a follow-up PR, we should then migrate KafkaShuffle to the new API.
> >> But that's something I can do.
> >>
> >> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >>
> >> On Wed, May 19, 2021 at 3:34 AM Eron Wright
> >> <ewri...@streamnative.io.invalid> wrote:
> >>
> >> > Update: opened an issue and a PR.
> >> >
> >> >
> >> > https://issues.apache.org/jira/browse/FLINK-22700
> >> > https://github.com/apache/flink/pull/15950
> >> >
> >> >
> >> > On Tue, May 18, 2021 at 10:03 AM Eron Wright
> >> > <ewri...@streamnative.io> wrote:
> >> >
> >> > > Thanks Arvid and David for sharing your ideas on this subject.  I'm
> >> > > glad to hear that you're seeing use cases for watermark propagation
> >> > > via an enhanced sink interface.
> >> > >
> >> > > As you've guessed, my interest is in Pulsar, and I am exploring some
> >> > > options for brokering watermarks across stream processing pipelines.
> >> > > I think Arvid is speaking to a high-fidelity solution where the
> >> > > difference between intra- and inter-pipeline flow is eliminated.  My
> >> > > goal is more limited; I want to write the watermark that arrives at
> >> > > the sink to Pulsar.  Simply imagine that Pulsar has native support
> >> > > for watermarking in its producer/consumer API, and we'll leave the
> >> > > details to another forum.
> >> > >
> >> > > David, I like your invariant.  I see lateness as stemming from the
> >> > > problem domain and from system dynamics (e.g. scheduling, batching,
> >> > > lag).  When one depends on order-of-observation to generate
> >> > > watermarks, the app may become unduly sensitive to dynamics which
> >> > > bear on order-of-observation.  My goal is to factor out the system
> >> > > dynamics from lateness determination.
> >> > >
> >> > > Arvid, to be most valuable (at least for my purposes) the
> >> > > enhancement is needed on SinkFunction.  This will allow us to easily
> >> > > evolve the existing Pulsar connector.
> >> > >
> >> > > Next step, I will open a PR to advance the conversation.
> >> > >
> >> > > Eron
> >> > >
> >> > > On Tue, May 18, 2021 at 5:06 AM David Morávek
> >> > > <david.mora...@gmail.com>
> >> > > wrote:
> >> > >
> >> > >> Hi Eron,
> >> > >>
> >> > >> Thanks for starting this discussion. I've been thinking about this
> >> > >> recently, as we've run into "watermark related" issues when
> >> > >> chaining multiple pipelines together. My two cents on the
> >> > >> discussion:
> >> > >>
> >> > >> The way I like to think about the problem is that there should be an
> >> > >> invariant that holds for any stream processing pipeline: "A NON_LATE
> >> > >> element entering the system should never become LATE."
> >> > >>
> >> > >> Unfortunately this is exactly what happens in downstream pipelines,
> >> > >> because the upstream one can:
> >> > >> - break ordering (especially with higher parallelism)
> >> > >> - emit elements that are ahead of output watermark
> >> > >>
> >> > >> There is not enough information to reconstruct the upstream
> >> > >> watermark in later stages (it's always just an estimate based on the
> >> > >> previous pipeline's output).
> >> > >>
> >> > >> It would be great if we could have a general abstraction that is
> >> > >> reusable for various sources / sinks (not just Kafka / Pulsar,
> >> > >> though this would probably cover most of the use cases) and
> >> > >> systems.
> >> > >>
> >> > >> Is there any other use case than sharing watermarks between
> >> > >> pipelines that you're trying to solve?
> >> > >>
> >> > >> Arvid:
> >> > >>
> >> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I
> >> > >> > have a hard time imagining that it's useful to use a different
> >> > >> > stream processor downstream. So for now, I'm assuming that both
> >> > >> > upstream and downstream are Flink applications. In that case, we
> >> > >> > probably define both parts of the pipeline in the same Flink job
> >> > >> > similar to KafkaStream's #through.
> >> > >> >
> >> > >>
> >> > >> I'd slightly disagree here. For example, we're "materializing"
> >> > >> change-logs produced by a Flink pipeline into a serving layer
> >> > >> (random access db / in-memory view / ..) and we need to know whether
> >> > >> the responses we serve meet the "freshness" requirements (e.g. you
> >> > >> may want to respond differently when the watermark is lagging too
> >> > >> far behind processing time). Also, not every stream processor in
> >> > >> the pipeline needs to be Flink. It can just as well be a simple
> >> > >> element-wise transformation that reads from Kafka and writes back
> >> > >> into a separate topic (that's what we do, for example, with ML
> >> > >> models that have special hardware requirements).
> >> > >>
> >> > >> Best,
> >> > >> D.
> >> > >>
> >> > >>
> >> > >> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org>
> >> wrote:
> >> > >>
> >> > >> > Hi Eron,
> >> > >> >
> >> > >> > I think this is a useful addition for storage systems that act as
> >> > >> > pass-through for Flink to reduce recovery time. It is only useful
> >> > >> > if you combine it with regional fail-over, as only a small part of
> >> > >> > the pipeline is restarted.
> >> > >> >
> >> > >> > A couple of thoughts on the implications:
> >> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I
> >> > >> > have a hard time imagining that it's useful to use a different
> >> > >> > stream processor downstream. So for now, I'm assuming that both
> >> > >> > upstream and downstream are Flink applications. In that case, we
> >> > >> > probably define both parts of the pipeline in the same Flink job
> >> > >> > similar to KafkaStream's #through.
> >> > >> > 2. The schema of the respective intermediate stream/topic would
> >> > >> > need to be managed by Flink to encode both records and watermarks.
> >> > >> > This reduces the usability quite a bit and needs to be carefully
> >> > >> > crafted.
> >> > >> > 3. It's not clear to me if constructs like SchemaRegistry can be
> >> > >> > properly supported (and also if they should be supported) in terms
> >> > >> > of schema evolution.
> >> > >> > 4. Potentially, StreamStatus and LatencyMarker would also need to
> >> > >> > be encoded.
> >> > >> > 5. It's important to have some way to transport backpressure from
> >> > >> > the downstream to the upstream. Or else you would have the same
> >> > >> > issue as KafkaStreams where two separate pipelines can drift so
> >> > >> > far away that you experience data loss if the data retention
> >> > >> > period is smaller than the drift.
> >> > >> > 6. It's clear that you trade a huge chunk of throughput for lower
> >> > >> > overall latency in case of failure. So it's an interesting feature
> >> > >> > for use cases with SLAs.
> >> > >> >
> >> > >> > Since we are phasing out SinkFunction, I'd prefer to only support
> >> > >> > SinkWriter. Having a no-op default sounds good to me.
> >> > >> >
> >> > >> > We have some experimental feature for Kafka [1], which pretty
> >> > >> > much reflects your idea. Here we have an ugly workaround to be
> >> > >> > able to process the watermark by using a custom StreamSink task.
> >> > >> > We could also try to create a FLIP that abstracts the actual
> >> > >> > system away and then we could use the approach for both Pulsar
> >> > >> > and Kafka.
> >> > >> >
> >> > >> > [1]
> >> > >> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
> >> > >> >
> >> > >> >
> >> > >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> >> > >> > <ewri...@streamnative.io.invalid> wrote:
> >> > >> >
> >> > >> > > I would like to propose an enhancement to the Sink API, the
> >> > >> > > ability to receive upstream watermarks.   I'm aware that the
> >> > >> > > sink context provides the current watermark for a given record.
> >> > >> > > I'd like to be able to write a sink function that is invoked
> >> > >> > > whenever the watermark changes.  Out of scope would be
> >> > >> > > event-time timers (since sinks aren't keyed).
> >> > >> > >
> >> > >> > > For context, imagine that a stream storage system had the
> >> > >> > > ability to persist watermarks in addition to ordinary elements,
> >> > >> > > e.g. to serve as source watermarks in a downstream processor.
> >> > >> > > Ideally one could compose a multi-stage, event-driven
> >> > >> > > application, with watermarks flowing end-to-end without need for
> >> > >> > > a heuristics-based watermark at each stage.
> >> > >> > >
> >> > >> > > The specific proposal would be a new method on `SinkFunction`
> >> > >> > > and/or on `SinkWriter`, called 'processWatermark' or
> >> > >> > > 'writeWatermark', with a default implementation that does
> >> > >> > > nothing.
> >> > >> > >
> >> > >> > > Thoughts?
> >> > >> > >
> >> > >> > > Thanks!
> >> > >> > > Eron Wright
> >> > >> > > StreamNative
> >> > >> > >
> >> > >> >
> >> > >>
> >> > >
> >> > >
> >> > > --
> >> > >
> >> > > Eron Wright   Cloud Engineering Lead
> >> > >
> >> > > p: +1 425 922 8617 <18163542939>
> >> > >
> >> > > streamnative.io |  Meet with me
> >> > > <https://calendly.com/eronwright/regular-1-hour>
> >> > >
> >> > > <https://github.com/streamnative>
> >> > > <https://www.linkedin.com/company/streamnative/>
> >> > > <https://twitter.com/streamnativeio/>
> >> > >
> >> >
> >> >
> >> >
> >>
> >
> >
> >
>
>
>
