Hi Niels,

If you have multiple inputs going into a single Kafka partition, then you
have to calculate the effective watermark by taking the minimum watermark
across all inputs. You could insert a Flink operator that takes care of this
and then writes to a set of partitions in a 1:n relationship. Alternatively,
you could take a look at Pulsar, which aims to support this functionality
out of the box [1].

[1] https://github.com/apache/pulsar/issues/12267
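
To illustrate the min-watermark idea: a minimal sketch (plain Java, no
Kafka/Flink dependencies; the class and method names are made up for this
example) of the bookkeeping the consuming side would need. It tracks the
latest watermark seen per upstream input and exposes the effective
watermark as the minimum across all of them:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical bookkeeping for the consuming side: tracks the latest
 * watermark per upstream input (e.g. per upstream writer thread) and
 * exposes the effective watermark as the minimum across all of them.
 */
public class EffectiveWatermarkTracker {
    private final Map<Integer, Long> watermarkPerInput = new HashMap<>();
    private final int expectedInputs;

    public EffectiveWatermarkTracker(int expectedInputs) {
        this.expectedInputs = expectedInputs;
    }

    /** Records a watermark from one input; returns the new effective watermark. */
    public long onWatermark(int inputId, long watermark) {
        // Watermarks are monotone per input, so keep the max seen per input.
        watermarkPerInput.merge(inputId, watermark, Math::max);
        return effectiveWatermark();
    }

    /** Minimum over all inputs; Long.MIN_VALUE until every input has reported. */
    public long effectiveWatermark() {
        if (watermarkPerInput.size() < expectedInputs) {
            return Long.MIN_VALUE;
        }
        return watermarkPerInput.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

The effective watermark only advances once the slowest input catches up,
which is exactly why every upstream writer has to emit its watermark to
every partition it might share with others.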

Cheers,
Till

On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> About a year ago I spoke at the Flink Forward conference (
> https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development
> problems regarding streaming applications and handling the lack of events
> in a stream.
> Something I spoke about towards the end of this talk was the idea to ship
> the watermarks of a Flink topology into the intermediate transport between
> applications so you wouldn't need to recreate them.
>
> At that time it was just an idea, today I'm actually trying to build that
> and see if this idea is actually possible.
>
> The class of applications I work on usually does a keyBy on something like
> a SessionId, SensorId, or IP address.
> In low-traffic scenarios this means that some Kafka partitions are
> completely idle, which makes Window/GroupBy-type operations impossible (in
> my talk I explain this in more detail).
>
> I have a test setup right now to play around with this and I'm running into
> a bit of a conceptual hurdle for which I'm looking for help.
>
> My goal is to ship the watermarks from within a topology into Kafka and
> then let a follow-up application extract those watermarks again and simply
> continue.
> The new SinkWriter interface has a void writeWatermark(Watermark watermark)
> method that seems intended for exactly this kind of thing.
> The basic operations (writing a watermark into Kafka, reading it back, and
> recreating the watermark) work in my test setup (very messy code, but it
> works).
>
> My hurdle has to do with the combination of
> - different parallelism numbers between Flink and Kafka (how do I ship 2
> watermarks into 3 partitions)
> - the fact that if you do a keyBy (in both Flink and Kafka) there is likely
> a mismatch between the Flink 'partition' and the Kafka 'partition'.
> - processing-speed differences between threads (e.g. session "A" needs
> more CPU cycles/time/processing than session "B") will skew the
> progression between them.
> - watermarks in separate threads in a single Flink topology are not
> synchronized (they cannot and should not be).
>
> Has anyone any pointers on possible ways to handle this?
>
> Right now my only idea is to ship the watermark into all partitions (as
> they do not have a key!) and let the consuming application determine the
> "real watermark" based on the mix of watermarks coming in from the upstream
> threads.
>
> All suggestions and ideas are appreciated.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
