Hi Mika,

+1 for the feature

I'd love to get your thoughts on the following areas:

1. In large-scale production deployments, one of the most frequent causes
of immediate job failure upon deployment is a NotSerializableException.
In-memory test harnesses often mask this by keeping objects in the same JVM
heap without passing them through Flink's internal serializers. Will the
harness API include explicit methods (e.g., harness.snapshotState() and
harness.restoreState()) that force the state through the physical
serialization boundaries defined by the @StateHint annotations?
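
To make the failure mode concrete: a harness snapshot/restore pair should push state through real byte boundaries rather than keep it on the heap. The sketch below is only an illustration — the method names are hypothetical, and plain java.io serialization stands in for Flink's internal serializer stack, but the class of bug it surfaces (a non-serializable field blowing up at snapshot time rather than at deploy time) is the same:

```java
import java.io.*;

public class SnapshotRoundTrip {
    // Serializable state survives the round trip; a field holding a
    // non-serializable object (e.g. a Thread) would instead throw
    // NotSerializableException here, in the unit test, not on the cluster.
    static class CountState implements Serializable {
        long count;
        CountState(long count) { this.count = count; }
    }

    // Force the state through a physical byte boundary, as a
    // snapshotState()/restoreState() pair on the harness would.
    static CountState roundTrip(CountState state) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CountState) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        CountState restored = roundTrip(new CountState(42));
        if (restored.count != 42) throw new AssertionError("state lost in round trip");
        System.out.println("restored count = " + restored.count);
    }
}
```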

2. In the distributed runtime, registering an event-time timer for the same
key and timestamp multiple times only results in a single onTimer
execution. Will the test harness natively replicate this exact
deduplication logic, or will developers need to account for duplicate
callbacks in their test assertions?
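
For reference, the runtime contract in question can be modelled in plain Java with a per-key set of timestamps (no Flink types; the method names merely mirror the real API). A harness that replicates this would fire the timer below exactly once despite two registrations:

```java
import java.util.*;

public class TimerDedup {
    // Mimics the distributed runtime contract: per key, at most one
    // pending timer per timestamp.
    private final Map<String, NavigableSet<Long>> timers = new HashMap<>();
    private final List<String> fired = new ArrayList<>();

    void registerEventTimeTimer(String key, long timestamp) {
        // Re-registering the same (key, timestamp) is a no-op because
        // the set already contains the timestamp.
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    void advanceWatermark(long watermark) {
        // Fire every pending timer at or below the watermark, once each.
        timers.forEach((key, ts) -> {
            for (Iterator<Long> it = ts.headSet(watermark, true).iterator(); it.hasNext(); ) {
                fired.add(key + "@" + it.next());
                it.remove();
            }
        });
    }

    List<String> firedTimers() { return fired; }

    public static void main(String[] args) {
        TimerDedup h = new TimerDedup();
        h.registerEventTimeTimer("user-1", 1000L);
        h.registerEventTimeTimer("user-1", 1000L); // duplicate registration
        h.advanceWatermark(2000L);
        System.out.println(h.firedTimers()); // [user-1@1000]
    }
}
```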

3. Complex event processing inevitably deals with out-of-order data and
watermarking. Does the harness API plan to expose mechanisms for asserting
how the PTF handles late records (i.e., events older than the currently
simulated watermark), particularly regarding routing these records to side
outputs for historical reconciliation?
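
The assertion surface being asked for could look roughly like the plain-Java model below — a record with a timestamp older than the current watermark is routed to a side output the test can inspect. Everything here is a sketch of hypothetical harness semantics, not a proposed API:

```java
import java.util.*;

public class LateRecordRouting {
    static final class Event {
        final String payload;
        final long timestamp;
        Event(String payload, long timestamp) { this.payload = payload; this.timestamp = timestamp; }
    }

    private long currentWatermark = Long.MIN_VALUE;
    private final List<Event> mainOutput = new ArrayList<>();
    private final List<Event> sideOutput = new ArrayList<>();

    void advanceWatermark(long wm) { currentWatermark = Math.max(currentWatermark, wm); }

    // A record older than the current watermark is late: route it to the
    // side output (e.g. for historical reconciliation) instead of main.
    void process(Event e) {
        if (e.timestamp < currentWatermark) sideOutput.add(e);
        else mainOutput.add(e);
    }

    List<Event> lateRecords() { return sideOutput; }
    List<Event> output() { return mainOutput; }

    public static void main(String[] args) {
        LateRecordRouting harness = new LateRecordRouting();
        harness.advanceWatermark(1000L);
        harness.process(new Event("stale-update", 500L));  // older than watermark -> late
        harness.process(new Event("fresh-update", 1500L)); // ahead of watermark -> main
        System.out.println(harness.lateRecords().size()); // 1
    }
}
```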

4. Since PTFs frequently interact with dynamic table changelogs, will the
test harness natively support injecting specific RowKind values
(INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) into the eval() method to
validate state retraction and update logic?
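
To spell out the retraction semantics a test would need to exercise: UPDATE_BEFORE and DELETE retract a previously emitted value, while INSERT and UPDATE_AFTER accumulate. The enum below mirrors org.apache.flink.types.RowKind, but the running-sum replay is only an illustration of the logic a harness-injected changelog should drive:

```java
import java.util.*;

public class ChangelogReplay {
    // Mirrors the four kinds in org.apache.flink.types.RowKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Running sum that honours retractions: additions for INSERT and
    // UPDATE_AFTER, subtractions for UPDATE_BEFORE and DELETE.
    static long replay(List<Map.Entry<RowKind, Long>> changelog) {
        long sum = 0;
        for (Map.Entry<RowKind, Long> change : changelog) {
            switch (change.getKey()) {
                case INSERT:
                case UPDATE_AFTER:
                    sum += change.getValue();
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    sum -= change.getValue();
                    break;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // An update from 10 to 15 arrives as UPDATE_BEFORE(10), UPDATE_AFTER(15).
        long sum = replay(List.of(
                Map.entry(RowKind.INSERT, 10L),
                Map.entry(RowKind.UPDATE_BEFORE, 10L),
                Map.entry(RowKind.UPDATE_AFTER, 15L)));
        System.out.println(sum); // 15
    }
}
```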

5. PTFs often initialize custom metrics (counters, gauges) within the
open(FunctionContext) method. Will the harness automatically inject a
MetricGroup into the context to prevent NullPointerException failures
during local testing, or will developers be responsible for providing a
MetricGroup in the test?
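
If the answer were "developers provide one", the minimum viable thing a test could inject is an in-memory group whose counters remain assertable afterwards. This sketch uses hypothetical names, not the real org.apache.flink.metrics interfaces:

```java
import java.util.*;

public class TestMetricGroup {
    public static final class Counter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    private final Map<String, Counter> counters = new HashMap<>();

    // Returns an in-memory counter so a PTF's open() does not NPE, and so
    // the test can later assert on the recorded value.
    public Counter counter(String name) {
        return counters.computeIfAbsent(name, n -> new Counter());
    }

    public static void main(String[] args) {
        TestMetricGroup metrics = new TestMetricGroup();
        // What a PTF's open(FunctionContext) might do with the injected group:
        Counter lateRecords = metrics.counter("lateRecords");
        lateRecords.inc();
        lateRecords.inc();
        System.out.println(metrics.counter("lateRecords").getCount()); // 2
    }
}
```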

6. Given that the primary user base for PTFs resides in the SQL/Table API
ecosystem, forcing users to write imperative Java `Row.of(...)`
instantiations with deeply nested schemas can be cumbersome. Is there any
consideration for allowing the harness to ingest mock data declaratively,
perhaps via JSON payloads or DDL strings that automatically resolve to the
PTF's input types?
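
As a sketch of what "declarative ingestion" could mean in practice — an entirely hypothetical schema-string-plus-payload format, not a proposed API — the harness could resolve something DDL-like into typed rows instead of requiring hand-written Row.of(...) calls:

```java
import java.util.*;

public class DeclarativeRows {
    // Hypothetical: resolve a tiny DDL-like schema string and a CSV payload
    // into typed rows. Only STRING and BIGINT are handled in this sketch.
    static List<Object[]> parse(String schema, String csv) {
        String[] columns = schema.split(",\\s*");
        List<Object[]> rows = new ArrayList<>();
        for (String line : csv.strip().split("\\R")) {
            String[] cells = line.split(",");
            Object[] row = new Object[columns.length];
            for (int i = 0; i < columns.length; i++) {
                String type = columns[i].trim().split("\\s+")[1];
                row[i] = type.equals("BIGINT")
                        ? (Object) Long.parseLong(cells[i].trim())
                        : cells[i].trim();
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Object[]> rows = parse("name STRING, amount BIGINT", "alice,3\nbob,5");
        System.out.println(rows.get(1)[1]); // 5
    }
}
```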

Thank you for driving this initiative forward. Our team is really looking
forward to this feature.

Best,
Samrat

On Wed, Mar 25, 2026 at 9:37 PM Gustavo de Morais <[email protected]>
wrote:

> Hey Mika, thanks for the reply, it makes sense.
>
>
> On Wed, 25 Mar 2026 at 13:07, Mika Naylor <[email protected]> wrote:
>
> > Hey Gustavo,
> >
> > Sorry for missing your feedback, thank you for taking the time to look at
> > the proposal!
> >
> > I think your point on throwing errors during harness building for table
> > arguments present in the eval but not configured makes sense! In terms of
> > the processing time, you're completely right - I added a method to handle
> > processing time changes, since it would affect how state TTL is also
> > handled.
> >
> > In terms of looking at the other harnesses, you're right that I left out
> > parallelism concerns. I think for now it might be out of scope and
> warrant
> > a follow up enhancement to the harness. In terms of getting side outputs
> -
> > would this also be relevant for table functions? I wasn't very familiar
> > with them, but the Flink docs suggests this is more of a datastream thing
> > than a table thing.
> >
> > Kind regards,
> > Mika
> >
> > On Tue, 17 Mar 2026, at 12:29 PM, Gustavo de Morais wrote:
> > > Hey Mika,
> > >
> > > Thanks for driving this! In general, this is useful for prototyping and
> > in
> > > general to speed up testing. So +1. Some feedback from my side:
> > >
> > > >For a PTF that reads from a table argument that hasn't been
> configured,
> > I
> > > think it would return null, yes.
> > > I think it'd be a better dev UX if the harness throws during build
> "table
> > > argument 'purchases' is declared in eval() but was never configured.
> > >
> > > >I'm not entirely sure on the real/processing time considerations - my
> > aim
> > > here was mostly around letting users validate timer behaviour, and
> timer
> > > registration/firing in PTFs is based on watermarks
> > > You've addressed event time and we need to detail in the flip what
> would
> > be
> > > the approach for processing time. The API would ideally support similar
> > > methods as we have here
> > >
> >
> https://github.com/confluentinc/flink/blob/a7a8dba2127ad719ca7932969b2934a0955e1bba/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L790-L804
> > .
> > > If you want this to be out of scope, you can mention that explicitly in
> > the
> > > FLIP.
> > >
> > > Also, comparing this to the current harness support for operators, here
> > are
> > > some other relevant things that devs might need out of harness test you
> > > might want to include or mention that they are out of scope:
> > >
> > > - Side outputs: AbstractStreamOperatorTestHarness has
> > > getSideOutput(OutputTag<X>).
> > > - No parallelism config, Snapshot is opaque: The existing harness takes
> > > maxParallelism, parallelism, and subtaskIndex. Key group assignment
> (and
> > > therefore state distribution) depends on maxParallelism.
> > > AbstractStreamOperatorTestHarness.snapshot() returns
> > OperatorSubtaskState,
> > > which you can pass to repartitionOperatorState() to simulate
> parallelism
> > > changes
> > >
> > >
> > > Kind regards,
> > > Gustavo
> > >
> > > On Thu, 12 Mar 2026 at 14:38, Mika Naylor <[email protected]> wrote:
> > >
> > > > Hey Martijn,
> > > >
> > > > Thank you for the detailed feedback!
> > > >
> > > > > 1. The FLIP has good examples of harness construction via the
> > builder,
> > > > but
> > > > > doesn't address lifecycle management. For comparison, the existing
> > > > > DataStream test harnesses (documented at [1]) have explicit open()
> > and
> > > > > implement AutoCloseable. Since PTFs can acquire resources in
> open(),
> > the
> > > > > harness needs to manage this lifecycle. Could you clarify how
> > > > > open()/close() on the underlying PTF is handled? Is close() called
> > > > > automatically, or does the user need to trigger it? An end-to-end
> > example
> > > > > showing cleanup would help.
> > > >
> > > > This is a good point! I must have missed the AutoCloseable
> > implementation
> > > > in the public interfaces section - but I did intend to add it! So it
> > would
> > > > be closed automatically (but I suppose a user could also call close()
> > > > itself). For open(), I was planning on calling it when the
> > harness
> > > > is built using `Builder.build()`.
> > > >
> > > > > 2. The existing operator test harnesses support snapshot() and
> > > > > initializeState(OperatorSubtaskState) to simulate
> checkpoint/restore
> > > > > cycles. This is important for catching state serialization bugs,
> > which
> > > > are
> > > > > a common source of production issues. The FLIP provides
> > > > withInitialState()
> > > > > for setup, but there's no way to take a snapshot mid-test and
> restore
> > > > into
> > > > > a fresh harness. Are we deliberately excluding this, or should we
> > > > consider
> > > > > adding it?
> > > >
> > > > I would absolutely consider adding this, thank you for pointing it
> > out. I
> > > > think being able to take a `.snapshot()` from a harness and then
> > initialise
> > > > a new harness via a `restore()` on the builder would make sense, as
> > well as
> > > > maybe supporting `.restore()` on the harness itself after it has been
> > built.
> > > >
> > > > > 3. Related to the above: PTFs with complex state (Map, List, POJO)
> > can
> > > > > behave differently with heap vs. RocksDB backends due to
> > serialization
> > > > > differences. The existing harnesses support setStateBackend().
> > Should the
> > > > > PTF test harness support this as well? At minimum, it would be good
> > to
> > > > > document which backend is used by default.
> > > >
> > > > I had just intended to support heap backend and document this, but
> > this is
> > > > a good point - supporting `setStateBackend()` makes sense here,
> > similar to
> > > > the existing harnesses. I'll add this to the spec and document the
> > default.
> > > >
> > > > > 4. withTableArgument(String tableName, List<Row> rows) is useful
> for
> > > > > testing join-like PTFs. The builder Javadoc describes when static
> > rows
> > > > are
> > > > > passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC)
> affect
> > > > > delivery, but a few things remain unclear: How is the schema for
> > these
> > > > rows
> > > > > determined: is it inferred from the Row structure, or does it need
> to
> > > > match
> > > > > the eval() signature's type hints? And what happens if a PTF reads
> > from a
> > > > > table argument that hasn't been configured via the builder: does it
> > > > receive
> > > > > null, or does the harness throw at build time?
> > > >
> > > > I wasn't quite sure what the right approach is here. I thought that
> > > > inferring it from the Row structure would work, but it feels odd to
> > > > ignore the eval type hints. Perhaps I can try the Row structure
> > > > approach and, if it feels unergonomic, explore the second approach.
> > > >
> > > > For a PTF that reads from a table argument that hasn't been
> > > > configured, I think it would return null, yes.
> > > >
> > > > >
> > > > > - Timer.hasFired() is typed as boolean (primitive) but annotated
> > > > @Nullable.
> > > > > This looks like a bug -> should it be Boolean (boxed)?
> > > >
> > > > Oops, good catch. I'm not sure why I marked this as nullable; either
> > > > a timer has fired or it hasn't, so I'm not sure returning null makes
> > > > sense. Maybe returning a non-nullable primitive is fine here.
> > > >
> > > > > - getOutputByKind(RowKind) implies that output preserves RowKind
> > > > metadata.
> > > > > Could you confirm that getOutput() also retains this? The generic
> > <OUT>
> > > > > type parameter could use more specification on what's guaranteed.
> > > >
> > > > I would like getOutput to somehow retain this, but I'm not quite sure
> > how
> > > > the return type could look like in this case. Perhaps `RowData`? I'm
> > not
> > > > entirely sure if we have an interface that would cleanly capture
> this.
> > > >
> > > > > - Have you considered optional changelog consistency validation
> > (e.g.,
> > > > > verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)?
> > Could
> > > > be a
> > > > > useful debugging aid.
> > > >
> > > > I hadn't, no, but this is a useful idea. Could be togglable on the
> > builder
> > > > with a `.withChangelogValidation` method.
> > > >
> > > > > - What's the error model when eval() or a timer callback throws?
> > > > Propagated
> > > > > directly, or wrapped?
> > > >
> > > > I would say propagated directly, unless you think wrapping them could
> > be
> > > > useful here.
> > > >
> > > > > - The test plan mentions leveraging
> ProcessTableFunctionTestPrograms.
> > > > Could
> > > > > you clarify whether the harness will be validated against those
> > > > scenarios,
> > > > > or whether it's intended to replace them for certain use cases?
> > > >
> > > > I think just validated against them, as a way of making sure that the
> > > > harness covers the right set of features we want to capture. I don't
> > think
> > > > it would replace them in this case.
> > > >
> > > > Thank you a ton for the feedback and ideas! I will update the FLIP
> > > > documentation based on them, it's very much appreciated.
> > > >
> > > > Kind regards,
> > > > Mika
> > > >
> > > > On Wed, 11 Mar 2026, at 6:02 PM, Martijn Visser wrote:
> > > > > Hey Mika,
> > > > >
> > > > > Thanks for putting this FLIP together. A dedicated test harness for
> > PTFs
> > > > is
> > > > > a welcome addition. The builder-pattern API and the state/timer
> > > > > introspection features are well thought out.
> > > > >
> > > > > I have a few questions and suggestions after reviewing the FLIP:
> > > > >
> > > > > 1. The FLIP has good examples of harness construction via the
> > builder,
> > > > but
> > > > > doesn't address lifecycle management. For comparison, the existing
> > > > > DataStream test harnesses (documented at [1]) have explicit open()
> > and
> > > > > implement AutoCloseable. Since PTFs can acquire resources in
> open(),
> > the
> > > > > harness needs to manage this lifecycle. Could you clarify how
> > > > > open()/close() on the underlying PTF is handled? Is close() called
> > > > > automatically, or does the user need to trigger it? An end-to-end
> > example
> > > > > showing cleanup would help.
> > > > >
> > > > > 2. The existing operator test harnesses support snapshot() and
> > > > > initializeState(OperatorSubtaskState) to simulate
> checkpoint/restore
> > > > > cycles. This is important for catching state serialization bugs,
> > which
> > > > are
> > > > > a common source of production issues. The FLIP provides
> > > > withInitialState()
> > > > > for setup, but there's no way to take a snapshot mid-test and
> restore
> > > > into
> > > > > a fresh harness. Are we deliberately excluding this, or should we
> > > > consider
> > > > > adding it?
> > > > >
> > > > > 3. Related to the above: PTFs with complex state (Map, List, POJO)
> > can
> > > > > behave differently with heap vs. RocksDB backends due to
> > serialization
> > > > > differences. The existing harnesses support setStateBackend().
> > Should the
> > > > > PTF test harness support this as well? At minimum, it would be good
> > to
> > > > > document which backend is used by default.
> > > > >
> > > > > 4. withTableArgument(String tableName, List<Row> rows) is useful
> for
> > > > > testing join-like PTFs. The builder Javadoc describes when static
> > rows
> > > > are
> > > > > passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC)
> affect
> > > > > delivery, but a few things remain unclear: How is the schema for
> > these
> > > > rows
> > > > > determined: is it inferred from the Row structure, or does it need
> to
> > > > match
> > > > > the eval() signature's type hints? And what happens if a PTF reads
> > from a
> > > > > table argument that hasn't been configured via the builder: does it
> > > > receive
> > > > > null, or does the harness throw at build time?
> > > > >
> > > > > A few smaller points:
> > > > >
> > > > > - Timer.hasFired() is typed as boolean (primitive) but annotated
> > > > @Nullable.
> > > > > This looks like a bug -> should it be Boolean (boxed)?
> > > > > - getOutputByKind(RowKind) implies that output preserves RowKind
> > > > metadata.
> > > > > Could you confirm that getOutput() also retains this? The generic
> > <OUT>
> > > > > type parameter could use more specification on what's guaranteed.
> > > > > - Have you considered optional changelog consistency validation
> > (e.g.,
> > > > > verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)?
> > Could
> > > > be a
> > > > > useful debugging aid.
> > > > > - What's the error model when eval() or a timer callback throws?
> > > > Propagated
> > > > > directly, or wrapped?
> > > > > - The test plan mentions leveraging
> ProcessTableFunctionTestPrograms.
> > > > Could
> > > > > you clarify whether the harness will be validated against those
> > > > scenarios,
> > > > > or whether it's intended to replace them for certain use cases?
> > > > >
> > > > > Overall I'm +1 on the direction. The core API design is clean and
> > covers
> > > > > the main testing needs well. Addressing the lifecycle and
> > > > > checkpoint/restore gaps would bring it in line with what Flink
> users
> > > > > already have for DataStream UDF testing.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Martijn
> > > > >
> > > > > [1]
> > > > >
> > > >
> >
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
> > > > >
> > > > > On Fri, Mar 6, 2026 at 9:30 AM Mika Naylor <[email protected]>
> > wrote:
> > > > >
> > > > > > Hey David,
> > > > > >
> > > > > > Yeah, I think in terms of scope I aim for more providing a
> > framework
> > > > for
> > > > > > unit testing the behavior of custom PTFs. I'd like to include as
> > much
> > > > > > validation as possible but there might be validation steps that
> > aren't
> > > > > > possible to do without dipping into the engine side of things.
> > > > > >
> > > > > > I'm not entirely sure on the real/processing time considerations
> -
> > my
> > > > aim
> > > > > > here was mostly around letting users validate timer behaviour,
> and
> > > > timer
> > > > > > registration/firing in PTFs is based on watermarks, if I read the
> > doc
> > > > > > correctly.
> > > > > >
> > > > > > Kind regards,
> > > > > > Mika
> > > > > >
> > > > > > On Wed, 4 Mar 2026, at 10:38 AM, David Radley wrote:
> > > > > > > Hi Mika,
> > > > > > > This sounds like a good idea, in terms of scope, Is the idea
> that
> > > > this
> > > > > > is purely for unit tests or is this additionally proposed as
> > > > validation /
> > > > > > test harness for use when developing custom PTFs.
> > > > > > > I guess this allows us to create a common set of tests that all
> > PTFs
> > > > > > need to pass using this harness.
> > > > > > >
> > > > > > > I would assume there are real (not event)  time considerations
> > for
> > > > some
> > > > > > PTFs, it would be worth mentioning how we should handle that.
> > > > > > >
> > > > > > >    Kind regards,  David.
> > > > > > >
> > > > > > > From: Mika Naylor <[email protected]>
> > > > > > > Date: Tuesday, 3 March 2026 at 16:46
> > > > > > > To: [email protected] <[email protected]>
> > > > > > > Subject: [EXTERNAL] [DISCUSS] FLIP-567: Introduce a
> > > > ProcessTableFunction
> > > > > > Test Harness
> > > > > > >
> > > > > > > Hey everyone!
> > > > > > >
> > > > > > > I would like to kick off a discussion on FLIP-567: Introduce a
> > > > > > ProcessTableFunction Test Harness[1].
> > > > > > >
> > > > > > > Currently, testing PTFs requires full integration tests against
> a
> > > > running
> > > > > > Flink cluster. This FLIP would introduce a developer-friendly
> test
> > > > harness
> > > > > > for unit testing PTFs and would provide introspection to output,
> > state,
> > > > > > timers, and watermarks for assertions and behaviour validation.
> > This
> > > > would
> > > > > > let developers iterate and test their PTFs without needing to
> run a
> > > > > > fullscale integration test against a live Flink cluster.
> > > > > > >
> > > > > > > Would love any thoughts and feedback the community might have
> on
> > this
> > > > > > proposal.
> > > > > > >
> > > > > > > Kind regards,
> > > > > > > Mika Naylor
> > > > > > >
> > > > > > > [1]
> > > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-567%3A+Introduce+a+ProcessTableFunction+Test+Harness
> > > > > > >
> > > > > > > Unless otherwise stated above:
> > > > > > >
> > > > > > > IBM United Kingdom Limited
> > > > > > > Registered in England and Wales with number 741598
> > > > > > > Registered office: Building C, IBM Hursley Office, Hursley Park
> > Road,
> > > > > > Winchester, Hampshire SO21 2JN
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
