Hey Mika, thanks for the reply, it makes sense.

On Wed, 25 Mar 2026 at 13:07, Mika Naylor <[email protected]> wrote:

> Hey Gustavo,
>
> Sorry for missing your feedback, thank you for taking the time to look at
> the proposal!
>
> I think your point on throwing errors during harness building for table
> arguments present in the eval but not configured makes sense! In terms of
> the processing time, you're completely right - I added a method to handle
> processing time changes, since it would affect how state TTL is also
> handled.
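
To make the build-time check concrete, here is a rough sketch of the behaviour under discussion. It is not the FLIP's API: `HarnessBuilderSketch`, its constructor, and the `Object[]` row representation are all hypothetical stand-ins, and the set of declared table arguments is passed in by hand rather than read from the eval() signature.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the harness builder; not the FLIP's actual API.
final class HarnessBuilderSketch {
    // Table argument names that eval() declares (assumed to be discovered elsewhere).
    private final Set<String> declaredTableArgs;
    private final Map<String, List<Object[]>> configured = new HashMap<>();

    HarnessBuilderSketch(Set<String> declaredTableArgs) {
        this.declaredTableArgs = declaredTableArgs;
    }

    HarnessBuilderSketch withTableArgument(String name, List<Object[]> rows) {
        configured.put(name, rows);
        return this;
    }

    // Fails fast at build time instead of handing the PTF a null table argument later.
    Map<String, List<Object[]>> build() {
        for (String name : declaredTableArgs) {
            if (!configured.containsKey(name)) {
                throw new IllegalStateException(
                        "table argument '" + name
                                + "' is declared in eval() but was never configured");
            }
        }
        return Map.copyOf(configured);
    }
}
```

With this shape, a test that forgets to configure 'purchases' fails at build() with a message naming the argument, rather than at the first eval() call.
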
>
> In terms of looking at the other harnesses, you're right that I left out
> parallelism concerns. I think for now it might be out of scope and warrant
> a follow up enhancement to the harness. In terms of getting side outputs -
> would this also be relevant for table functions? I wasn't very familiar
> with them, but the Flink docs suggest this is more of a DataStream thing
> than a table thing.
>
> Kind regards,
> Mika
>
> On Tue, 17 Mar 2026, at 12:29 PM, Gustavo de Morais wrote:
> > Hey Mika,
> >
> > Thanks for driving this! This is useful for prototyping and, more
> > generally, for speeding up testing. So +1. Some feedback from my side:
> >
> > >For a PTF that reads from a table argument that hasn't been configured,
> I
> > think it would return null, yes.
> > I think it'd be a better dev UX if the harness throws during build: "table
> > argument 'purchases' is declared in eval() but was never configured".
> >
> > >I'm not entirely sure on the real/processing time considerations - my
> aim
> > here was mostly around letting users validate timer behaviour, and timer
> > registration/firing in PTFs is based on watermarks
> > You've addressed event time, and we need to detail in the FLIP what the
> > approach would be for processing time. The API would ideally support
> > methods similar to the ones we have here:
> >
> https://github.com/confluentinc/flink/blob/a7a8dba2127ad719ca7932969b2934a0955e1bba/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L790-L804
> .
> > If you want this to be out of scope, you can mention that explicitly in
> the
> > FLIP.
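
As an illustration of what manually driven processing time could look like in a harness, here is a minimal standalone sketch. `ManualProcessingTime` and its methods are hypothetical names chosen for this example, not part of the FLIP or of the existing operator harness; the idea is only that the test advances the clock explicitly and due timers fire in timestamp order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical manually driven processing-time service; names are illustrative only.
final class ManualProcessingTime {
    private long now;
    // Timers sorted by timestamp so they fire in order.
    private final TreeMap<Long, List<Runnable>> timers = new TreeMap<>();

    long currentTime() {
        return now;
    }

    void registerTimer(long timestamp, Runnable callback) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(callback);
    }

    // Advances the clock and fires every timer due at or before the new time.
    void setProcessingTime(long newTime) {
        if (newTime < now) {
            throw new IllegalArgumentException("processing time cannot go backwards");
        }
        while (!timers.isEmpty() && timers.firstKey() <= newTime) {
            Map.Entry<Long, List<Runnable>> due = timers.pollFirstEntry();
            now = due.getKey(); // callbacks observe the time their timer was due
            due.getValue().forEach(Runnable::run);
        }
        now = newTime;
    }
}
```

A test would register timers, call setProcessingTime(...) with the desired wall-clock value, and then assert on which callbacks ran.
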
> >
> > Also, comparing this to the current harness support for operators, here are
> > some other relevant things that devs might need from a harness test. You
> > might want to include them or mention that they are out of scope:
> >
> > - Side outputs: AbstractStreamOperatorTestHarness has
> > getSideOutput(OutputTag<X>).
> > - No parallelism config, Snapshot is opaque: The existing harness takes
> > maxParallelism, parallelism, and subtaskIndex. Key group assignment (and
> > therefore state distribution) depends on maxParallelism.
> > AbstractStreamOperatorTestHarness.snapshot() returns
> OperatorSubtaskState,
> > which you can pass to repartitionOperatorState() to simulate parallelism
> > changes
> >
> >
> > Kind regards,
> > Gustavo
> >
> > On Thu, 12 Mar 2026 at 14:38, Mika Naylor <[email protected]> wrote:
> >
> > > Hey Martijn,
> > >
> > > Thank you for the detailed feedback!
> > >
> > > > 1. The FLIP has good examples of harness construction via the
> builder,
> > > but
> > > > doesn't address lifecycle management. For comparison, the existing
> > > > DataStream test harnesses (documented at [1]) have explicit open()
> and
> > > > implement AutoCloseable. Since PTFs can acquire resources in open(),
> the
> > > > harness needs to manage this lifecycle. Could you clarify how
> > > > open()/close() on the underlying PTF is handled? Is close() called
> > > > automatically, or does the user need to trigger it? An end-to-end
> example
> > > > showing cleanup would help.
> > >
> > > This is a good point! I must have missed the AutoCloseable implementation
> > > in the public interfaces section - but I did intend to add it! So it would
> > > be closed automatically (but I suppose a user could also call close()
> > > themselves). For open(), I was planning on calling it when the harness
> > > is built using `Builder.build()`.
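
The lifecycle described here (open() on build, close() via AutoCloseable) can be sketched as follows. This is only an illustration under assumptions: `LifecycleFunction` and `LifecycleHarnessSketch` are hypothetical stand-ins, not the PTF base class or the proposed harness.

```java
// Hypothetical function with lifecycle hooks, standing in for a PTF.
interface LifecycleFunction {
    void open() throws Exception;

    void close() throws Exception;
}

// Hypothetical harness: opens the function when built, closes it at most once.
final class LifecycleHarnessSketch implements AutoCloseable {
    private final LifecycleFunction function;
    private boolean closed;

    private LifecycleHarnessSketch(LifecycleFunction function) {
        this.function = function;
    }

    // Mirrors the idea of calling open() from Builder.build().
    static LifecycleHarnessSketch build(LifecycleFunction function) throws Exception {
        function.open();
        return new LifecycleHarnessSketch(function);
    }

    @Override
    public void close() throws Exception {
        if (!closed) { // a manual close() followed by try-with-resources stays safe
            closed = true;
            function.close();
        }
    }
}
```

Used in a try-with-resources block, the function is opened before the test body runs and closed afterwards, even if an assertion fails.
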
> > >
> > > > 2. The existing operator test harnesses support snapshot() and
> > > > initializeState(OperatorSubtaskState) to simulate checkpoint/restore
> > > > cycles. This is important for catching state serialization bugs,
> which
> > > are
> > > > a common source of production issues. The FLIP provides
> > > withInitialState()
> > > > for setup, but there's no way to take a snapshot mid-test and restore
> > > into
> > > > a fresh harness. Are we deliberately excluding this, or should we
> > > consider
> > > > adding it?
> > >
> > > I would absolutely consider adding this, thank you for pointing it
> out. I
> > > think being able to take a `.snapshot()` from a harness and then
> initialise
> > > a new harness via a `restore()` on the builder would make sense, as
> well as
> > > maybe supporting `.restore()` on the harness itself after it has been
> built.
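
A snapshot/restore round trip could be shaped roughly like the sketch below. Note the substitution: it uses plain Java serialization to a byte array purely for illustration, whereas a real harness would go through Flink's state backend and serializers. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

// Hypothetical harness state holder; Java serialization stands in for a real state backend.
final class SnapshotRoundTripSketch {
    private HashMap<String, Long> state = new HashMap<>();

    void put(String key, long value) {
        state.put(key, value);
    }

    Long get(String key) {
        return state.get(key);
    }

    // Serializes the current state, forcing it through a byte representation.
    byte[] snapshot() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        return bytes.toByteArray();
    }

    // Builds a fresh instance from a snapshot, as a restore() on the builder might.
    @SuppressWarnings("unchecked")
    static SnapshotRoundTripSketch restore(byte[] snapshot)
            throws IOException, ClassNotFoundException {
        SnapshotRoundTripSketch fresh = new SnapshotRoundTripSketch();
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            fresh.state = (HashMap<String, Long>) in.readObject();
        }
        return fresh;
    }
}
```

Because the restored instance is rebuilt only from bytes, any state that does not survive serialization shows up as a test failure instead of a production issue.
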
> > >
> > > > 3. Related to the above: PTFs with complex state (Map, List, POJO)
> can
> > > > behave differently with heap vs. RocksDB backends due to
> serialization
> > > > differences. The existing harnesses support setStateBackend().
> Should the
> > > > PTF test harness support this as well? At minimum, it would be good
> to
> > > > document which backend is used by default.
> > >
> > > I had just intended to support heap backend and document this, but
> this is
> > > a good point - supporting `setStateBackend()` makes sense here,
> similar to
> > > the existing harnesses. I'll add this to the spec and document the
> default.
> > >
> > > > 4. withTableArgument(String tableName, List<Row> rows) is useful for
> > > > testing join-like PTFs. The builder Javadoc describes when static
> rows
> > > are
> > > > passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect
> > > > delivery, but a few things remain unclear: How is the schema for
> these
> > > rows
> > > > determined: is it inferred from the Row structure, or does it need to
> > > match
> > > > the eval() signature's type hints? And what happens if a PTF reads
> from a
> > > > table argument that hasn't been configured via the builder: does it
> > > receive
> > > > null, or does the harness throw at build time?
> > >
> > > I wasn't quite sure what the right approach is here. I thought that
> > > inferring it from the Row structure would work, but it feels odd to ignore
> > > the eval type hints. Perhaps I can try the Row structure approach and, if it
> > > feels unergonomic, explore the second approach.
> > >
> > > For a PTF that reads from a table argument that hasn't been configured, I
> > > think it would return null, yes.
> > >
> > > >
> > > > - Timer.hasFired() is typed as boolean (primitive) but annotated
> > > @Nullable.
> > > > This looks like a bug -> should it be Boolean (boxed)?
> > >
> > > Oops, good catch. I'm not sure why I marked this as nullable: either a
> > > timer has fired or it hasn't, and I'm not sure returning null makes sense.
> > > Maybe returning a non-nullable primitive is fine here.
> > >
> > > > - getOutputByKind(RowKind) implies that output preserves RowKind
> > > metadata.
> > > > Could you confirm that getOutput() also retains this? The generic
> <OUT>
> > > > type parameter could use more specification on what's guaranteed.
> > >
> > > I would like getOutput to somehow retain this, but I'm not quite sure what
> > > the return type could look like in this case. Perhaps `RowData`? I'm not
> > > entirely sure if we have an interface that would cleanly capture this.
> > >
> > > > - Have you considered optional changelog consistency validation
> (e.g.,
> > > > verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)?
> Could
> > > be a
> > > > useful debugging aid.
> > >
> > > I hadn't, no, but this is a useful idea. Could be togglable on the
> builder
> > > with a `.withChangelogValidation` method.
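
One possible shape for the changelog consistency check, as a standalone sketch. The `Kind` enum and `Change` class are local stand-ins so the example compiles without Flink, and the rule assumed here is the retract-style one mentioned above: every UPDATE_AFTER must be preceded by an unmatched UPDATE_BEFORE for the same key. An upsert-style changelog would need a different rule.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical validator; not part of the FLIP's API.
final class ChangelogValidatorSketch {
    // Local stand-in for the row kinds discussed above.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final String key;

        Change(Kind kind, String key) {
            this.kind = kind;
            this.key = key;
        }
    }

    // Throws if an UPDATE_AFTER arrives for a key with no pending UPDATE_BEFORE.
    static void validate(List<Change> output) {
        Set<String> pendingBefore = new HashSet<>();
        for (int i = 0; i < output.size(); i++) {
            Change change = output.get(i);
            if (change.kind == Kind.UPDATE_BEFORE) {
                pendingBefore.add(change.key);
            } else if (change.kind == Kind.UPDATE_AFTER
                    && !pendingBefore.remove(change.key)) {
                throw new IllegalStateException(
                        "UPDATE_AFTER for key '" + change.key + "' at position " + i
                                + " has no preceding UPDATE_BEFORE");
            }
        }
    }
}
```

Reporting the key and the position in the output makes the failure usable as a debugging aid rather than just a pass/fail signal.
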
> > >
> > > > - What's the error model when eval() or a timer callback throws?
> > > Propagated
> > > > directly, or wrapped?
> > >
> > > I would say propagated directly, unless you think wrapping them could
> be
> > > useful here.
> > >
> > > > - The test plan mentions leveraging ProcessTableFunctionTestPrograms.
> > > Could
> > > > you clarify whether the harness will be validated against those
> > > scenarios,
> > > > or whether it's intended to replace them for certain use cases?
> > >
> > > I think just validated against them, as a way of making sure that the
> > > harness covers the right set of features we want to capture. I don't
> think
> > > it would replace them in this case.
> > >
> > > Thank you a ton for the feedback and ideas! I will update the FLIP
> > > documentation based on them, it's very much appreciated.
> > >
> > > Kind regards,
> > > Mika
> > >
> > > On Wed, 11 Mar 2026, at 6:02 PM, Martijn Visser wrote:
> > > > Hey Mika,
> > > >
> > > > Thanks for putting this FLIP together. A dedicated test harness for
> PTFs
> > > is
> > > > a welcome addition. The builder-pattern API and the state/timer
> > > > introspection features are well thought out.
> > > >
> > > > I have a few questions and suggestions after reviewing the FLIP:
> > > >
> > > > 1. The FLIP has good examples of harness construction via the
> builder,
> > > but
> > > > doesn't address lifecycle management. For comparison, the existing
> > > > DataStream test harnesses (documented at [1]) have explicit open()
> and
> > > > implement AutoCloseable. Since PTFs can acquire resources in open(),
> the
> > > > harness needs to manage this lifecycle. Could you clarify how
> > > > open()/close() on the underlying PTF is handled? Is close() called
> > > > automatically, or does the user need to trigger it? An end-to-end
> example
> > > > showing cleanup would help.
> > > >
> > > > 2. The existing operator test harnesses support snapshot() and
> > > > initializeState(OperatorSubtaskState) to simulate checkpoint/restore
> > > > cycles. This is important for catching state serialization bugs,
> which
> > > are
> > > > a common source of production issues. The FLIP provides
> > > withInitialState()
> > > > for setup, but there's no way to take a snapshot mid-test and restore
> > > into
> > > > a fresh harness. Are we deliberately excluding this, or should we
> > > consider
> > > > adding it?
> > > >
> > > > 3. Related to the above: PTFs with complex state (Map, List, POJO)
> can
> > > > behave differently with heap vs. RocksDB backends due to
> serialization
> > > > differences. The existing harnesses support setStateBackend().
> Should the
> > > > PTF test harness support this as well? At minimum, it would be good
> to
> > > > document which backend is used by default.
> > > >
> > > > 4. withTableArgument(String tableName, List<Row> rows) is useful for
> > > > testing join-like PTFs. The builder Javadoc describes when static
> rows
> > > are
> > > > passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect
> > > > delivery, but a few things remain unclear: How is the schema for
> these
> > > rows
> > > > determined: is it inferred from the Row structure, or does it need to
> > > match
> > > > the eval() signature's type hints? And what happens if a PTF reads
> from a
> > > > table argument that hasn't been configured via the builder: does it
> > > receive
> > > > null, or does the harness throw at build time?
> > > >
> > > > A few smaller points:
> > > >
> > > > - Timer.hasFired() is typed as boolean (primitive) but annotated
> > > @Nullable.
> > > > This looks like a bug -> should it be Boolean (boxed)?
> > > > - getOutputByKind(RowKind) implies that output preserves RowKind
> > > metadata.
> > > > Could you confirm that getOutput() also retains this? The generic
> <OUT>
> > > > type parameter could use more specification on what's guaranteed.
> > > > - Have you considered optional changelog consistency validation
> (e.g.,
> > > > verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)?
> Could
> > > be a
> > > > useful debugging aid.
> > > > - What's the error model when eval() or a timer callback throws?
> > > Propagated
> > > > directly, or wrapped?
> > > > - The test plan mentions leveraging ProcessTableFunctionTestPrograms.
> > > Could
> > > > you clarify whether the harness will be validated against those
> > > scenarios,
> > > > or whether it's intended to replace them for certain use cases?
> > > >
> > > > Overall I'm +1 on the direction. The core API design is clean and
> covers
> > > > the main testing needs well. Addressing the lifecycle and
> > > > checkpoint/restore gaps would bring it in line with what Flink users
> > > > already have for DataStream UDF testing.
> > > >
> > > > Thanks,
> > > >
> > > > Martijn
> > > >
> > > > [1]
> > > >
> > >
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
> > > >
> > > > On Fri, Mar 6, 2026 at 9:30 AM Mika Naylor <[email protected]>
> wrote:
> > > >
> > > > > Hey David,
> > > > >
> > > > > > Yeah, in terms of scope I'm aiming more at providing a framework for
> > > > > > unit testing the behavior of custom PTFs. I'd like to include as much
> > > > > > validation as possible, but there might be validation steps that aren't
> > > > > > possible to do without dipping into the engine side of things.
> > > > >
> > > > > I'm not entirely sure on the real/processing time considerations -
> my
> > > aim
> > > > > here was mostly around letting users validate timer behaviour, and
> > > timer
> > > > > registration/firing in PTFs is based on watermarks, if I read the
> doc
> > > > > correctly.
> > > > >
> > > > > Kind regards,
> > > > > Mika
> > > > >
> > > > > On Wed, 4 Mar 2026, at 10:38 AM, David Radley wrote:
> > > > > > Hi Mika,
> > > > > > > This sounds like a good idea. In terms of scope, is the idea that this
> > > > > > > is purely for unit tests, or is it additionally proposed as a validation /
> > > > > > > test harness for use when developing custom PTFs?
> > > > > > I guess this allows us to create a common set of tests that all
> PTFs
> > > > > need to pass using this harness.
> > > > > >
> > > > > > > I would assume there are real (not event) time considerations
> for
> > > some
> > > > > PTFs, it would be worth mentioning how we should handle that.
> > > > > >
> > > > > >    Kind regards,  David.
> > > > > >
> > > > > > From: Mika Naylor <[email protected]>
> > > > > > Date: Tuesday, 3 March 2026 at 16:46
> > > > > > To: [email protected] <[email protected]>
> > > > > > Subject: [EXTERNAL] [DISCUSS] FLIP-567: Introduce a
> > > ProcessTableFunction
> > > > > Test Harness
> > > > > >
> > > > > > Hey everyone!
> > > > > >
> > > > > > I would like to kick off a discussion on FLIP-567: Introduce a
> > > > > ProcessTableFunction Test Harness[1].
> > > > > >
> > > > > > > Currently, testing PTFs requires full integration tests against a
> > > running
> > > > > Flink cluster. This FLIP would introduce a developer-friendly test
> > > harness
> > > > > for unit testing PTFs and would provide introspection to output,
> state,
> > > > > timers, and watermarks for assertions and behaviour validation.
> This
> > > would
> > > > > let developers iterate and test their PTFs without needing to run a
> > > > > > full-scale integration test against a live Flink cluster.
> > > > > >
> > > > > > Would love any thoughts and feedback the community might have on
> this
> > > > > proposal.
> > > > > >
> > > > > > Kind regards,
> > > > > > Mika Naylor
> > > > > >
> > > > > > [1]
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-567%3A+Introduce+a+ProcessTableFunction+Test+Harness
> > > > > >
> > > > >
> > > >
> > >
> >
>
