Hi Till,

This morning I also realized that what you call an 'effective watermark' is
indeed what is needed.
I'm going to read up on what Pulsar has planned.

What I realized is that the consuming application must be aware of the
parallelism of the producing application, which is independent of the
number of partitions in the intermediate transport.

Assume I produce with parallelism 2 into 5 Kafka partitions, which I then
read with parallelism 3; then in the consuming (parallelism 3) application I
must wait for a watermark from each of the original inputs (i.e. 2 of them)
before I can continue.
We must also assume that those watermarks are created at different
timestamps.
So my current assessment is that the watermark records must include at
least the timestamp, the index of the producing thread for this watermark
and the total number of producing threads.
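
To make that concrete, here is a minimal sketch in plain Java of what such a
watermark record and the consumer-side merging could look like. All names here
are hypothetical (not part of any existing Flink or Kafka API): the consumer
keeps the latest watermark per producing subtask and only advances once it has
heard from all of them, taking the minimum.

import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

public class WatermarkMerger {

    /** What a watermark record shipped through the intermediate transport might carry. */
    public static final class WatermarkRecord {
        final long timestamp;    // watermark timestamp from the producing job
        final int subtaskIndex;  // which producing thread emitted it (0..totalSubtasks-1)
        final int totalSubtasks; // parallelism of the producing job, NOT the partition count

        WatermarkRecord(long timestamp, int subtaskIndex, int totalSubtasks) {
            this.timestamp = timestamp;
            this.subtaskIndex = subtaskIndex;
            this.totalSubtasks = totalSubtasks;
        }
    }

    // Latest watermark seen per producing subtask.
    private final Map<Integer, Long> latestPerSubtask = new HashMap<>();

    /**
     * Feed in a watermark record read from the intermediate transport; returns the
     * effective watermark once a watermark from every producing subtask has been
     * seen, i.e. the minimum over the latest watermark of each original input.
     */
    public OptionalLong onWatermark(WatermarkRecord record) {
        latestPerSubtask.merge(record.subtaskIndex, record.timestamp, Long::max);
        if (latestPerSubtask.size() < record.totalSubtasks) {
            return OptionalLong.empty(); // still waiting for at least one producer thread
        }
        return OptionalLong.of(
                latestPerSubtask.values().stream().mapToLong(Long::longValue).min().getAsLong());
    }
}

In the 2-producer / 5-partition / 3-consumer example above, each of the 3
consumers would only advance its watermark after it has seen a watermark from
both producing threads, and would advance it to the smaller of the two.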

Niels


On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Niels,
>
> if you have multiple inputs going into a single Kafka partition then you
> have to calculate the effective watermark by looking at the min watermark
> from all inputs. You could insert a Flink operator that takes care of it
> and then writes to a set of partitions in 1:n relationship. Alternatively,
> you could take a look at Pulsar that wants to support this functionality
> out of the box [1].
>
> [1] https://github.com/apache/pulsar/issues/12267
>
> Cheers,
> Till
>
> On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes <ni...@basjes.nl> wrote:
>
> > Hi,
> >
> > About a year ago I spoke at the Flink Forward conference (
> > https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development
> > problems regarding streaming applications and handling the lack of events
> > in a stream.
> > Something I spoke about towards the end of this talk was the idea to ship
> > the watermarks of a Flink topology into the intermediate transport
> > between applications so you wouldn't need to recreate them.
> >
> > At that time it was just an idea, today I'm actually trying to build that
> > and see if this idea is actually possible.
> >
> > So the class of applications I work on usually do a keyBy on something
> > like a SessionId, SensorId or IP address.
> > In low traffic scenarios this means that in Kafka some partitions are
> > completely idle which makes Windows/GroupBy type operations impossible
> > (in my talk I explain it a lot better).
> >
> > I have a test setup right now to play around with this and I'm running
> > into a bit of a conceptual hurdle for which I'm looking for help.
> >
> > My goal is to ship the watermarks from within a topology into Kafka and
> > then let a follow up application extract those watermarks again and
> > simply continue.
> > The new SinkWriter interface has a void writeWatermark(Watermark
> > watermark) method
> > that seems intended for this kind of thing.
> > The basic operations like writing a watermark into Kafka, reading it
> > again and then recreating the watermark again works in my test setup
> > (very messy code but it works).
> >
> > My hurdle has to do with the combination of
> > - different parallelism numbers between Flink and Kafka (how do I ship 2
> > watermarks into 3 partitions)
> > - the fact that if you do a keyBy (both in Flink and Kafka) there is a
> > likely mismatch between the Flink 'partition' and the Kafka `partition`.
> > - processing speed differences between various threads (like session "A"
> > needs more CPU cycles/time/processing than session "B") will lead to
> > skewing of the progression between them.
> > - watermarks in separate threads in a single Flink topology are not
> > synchronized (they cannot and should not be).
> >
> > Has anyone any pointers on possible ways to handle this?
> >
> > Right now my only idea is to ship the watermark into all partitions (as
> > they do not have a key!) and let the consuming application determine the
> > "real watermark" based on the mix of watermarks coming in from the
> > upstream threads.
> >
> > All suggestions and ideas are appreciated.
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
> >
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes