Hi Gunnar,

Unfortunately, the people who worked on DS V2 have shifted their focus. I
will try to answer your questions as best I can, but I'm also not familiar
with all the details.

1) In processRecord(), you can access the state
from PartitionedContext#getStateManager(). In onWatermark(), because this
is triggered for the whole operator rather than a specific keyed partition,
you would need to call NonPartitionedContext#applyToAllPartitions(), and
inside the ApplyPartitionFunction#apply() you can access the state from
PartitionedContext#getStateManager().
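
A minimal sketch of that pattern follows. It is only an illustration: the
interfaces are simplified stand-ins that reuse the names above (they are not
the real Flink types, and the real signatures may differ), and
CountPerKeyFunction, ToyRuntime and the getCount()/setCount()/currentKey()
accessors are hypothetical helpers that let the example run on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Stand-in interfaces that only mirror the names used above.
// They are NOT the real Flink DS V2 types; real signatures may differ.
interface StateManager {
    long getCount();        // hypothetical: read the per-key state
    void setCount(long v);  // hypothetical: write the per-key state
    String currentKey();    // hypothetical: key of the current partition
}

interface PartitionedContext {
    StateManager getStateManager();
}

interface ApplyPartitionFunction {
    void apply(Consumer<String> output, PartitionedContext ctx);
}

interface NonPartitionedContext {
    void applyToAllPartitions(ApplyPartitionFunction fn);
}

// User function: counts records per key and emits the counts on a watermark.
class CountPerKeyFunction {
    void processRecord(String record, Consumer<String> output, PartitionedContext ctx) {
        // Keyed call: the state manager is scoped to the current record's key.
        StateManager state = ctx.getStateManager();
        state.setCount(state.getCount() + 1);
    }

    void onWatermark(long watermark, Consumer<String> output, NonPartitionedContext ctx) {
        // Operator-wide call: there is no current key, so visit each partition.
        ctx.applyToAllPartitions((out, partitionCtx) -> {
            StateManager state = partitionCtx.getStateManager();
            out.accept(state.currentKey() + "=" + state.getCount() + "@" + watermark);
        });
    }
}

// Toy in-memory harness standing in for the runtime (hypothetical).
// Each record's value doubles as its key.
class ToyRuntime {
    private final Map<String, Long> counts = new TreeMap<>();
    private final CountPerKeyFunction fn = new CountPerKeyFunction();
    final List<String> emitted = new ArrayList<>();

    private PartitionedContext contextFor(String key) {
        StateManager sm = new StateManager() {
            public long getCount() { return counts.getOrDefault(key, 0L); }
            public void setCount(long v) { counts.put(key, v); }
            public String currentKey() { return key; }
        };
        return () -> sm;
    }

    void record(String key) {
        fn.processRecord(key, emitted::add, contextFor(key));
    }

    void watermark(long wm) {
        // Iterate over a copy of the keys and hand each one a partitioned context.
        fn.onWatermark(wm, emitted::add,
                apply -> new ArrayList<>(counts.keySet())
                        .forEach(k -> apply.apply(emitted::add, contextFor(k))));
    }
}
```

The point of the sketch is the shape of onWatermark(): the state is only
touched inside the function passed to applyToAllPartitions(), where a
partitioned context is available.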

2) Sorry, I don't have the original image either. Can't even recall what
this image was.

3) I think the watermark combiners only work for the multiple instances of
the same process function. For the two-input process functions, IIRC, it
was not supported because we ran into some problems related to the mail-box
thread model when trying to block one input stream while allowing another
stream to proceed. It should be doable, but it would require a significant
refactoring that we could not complete before Flink 2.0. Sorry, I can't
remember more details.

4) It's not yet supported. At the moment, the only way would be to provide
your own TypeDescriptor implementation.
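
Roughly, that could look like the sketch below. It assumes that the
TypeDescriptor interface essentially exposes the described class through
getTypeClass(); the interface declared here is a stand-in for illustration
rather than the real Flink one, and SensorReading and
SensorReadingTypeDescriptor are made-up names.

```java
import java.io.Serializable;

// Stand-in mirroring the assumed shape of Flink's TypeDescriptor.
// NOT the real interface; check the actual one before relying on this.
interface TypeDescriptor<T> extends Serializable {
    Class<T> getTypeClass();
}

// Hypothetical POJO that should be kept in state.
class SensorReading {
    public String sensorId;
    public double value;

    public SensorReading() {}
}

// Hand-written descriptor that simply reports the POJO's class.
class SensorReadingTypeDescriptor implements TypeDescriptor<SensorReading> {
    @Override
    public Class<SensorReading> getTypeClass() {
        return SensorReading.class;
    }
}
```

An instance of such a descriptor would then presumably be passed wherever
the built-in ones from TypeDescriptors are passed today.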

Best,

Xintong



On Wed, Oct 1, 2025 at 10:51 PM Gunnar Morling
<[email protected]> wrote:

> Hey all,
>
> One more question on state management with the DS V2 API:
>
> 4) How can I define and use state of structured types (i.e. POJOs)? I can
> see how to obtain type descriptors for primitive types via TypeDescriptors,
> but I can't seem to find a way for creating a type descriptor for a POJO.
>
> Thanks a lot,
>
> --Gunnar
>
>
> On Wed, 1 Oct 2025 at 10:10, Gunnar Morling <[email protected]> wrote:
>
> > Hey all,
> >
> > I'm exploring the generalized watermark functionality in the DS V2 API.
> > This feature looks very useful to me, I have a few questions around it
> > though:
> >
> > 1) The examples shown in the FLIP [1] use instance variables to
> > exchange state between processRecord() and onWatermark() ("use variable
> > instead of state for simplicity"). How can I work with persistent state
> > instead? In particular, I don't see how I can access the Flink state
> > manager from within onWatermark().
> >
> > 2) The FLIP is missing an image in the "Combine Watermarks" section.
> > Could this be added?
> >
> > 3) Do watermark combiners only work to combine watermarks of multiple
> > instances of one and the same process function, or can they also be used
> > to combine watermarks from two separate streams processed
> > by TwoInputNonBroadcastStreamProcessFunction? For my use case, I'd like
> > to emit the same watermark type within the sources of two streams and
> > then forward only the minimum watermark from a TINBSPF which joins these
> > two streams. Is this possible?
> >
> > Thanks a lot for any insights,
> >
> > --Gunnar
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-467%3A+Introduce+Generalized+Watermarks
> >
> >
>
