Hey Mika, thanks for the reply, it makes sense.
On Wed, 25 Mar 2026 at 13:07, Mika Naylor <[email protected]> wrote:
> Hey Gustavo,
>
> Sorry for missing your feedback, thank you for taking the time to look at the proposal!
>
> I think your point on throwing errors during harness building for table arguments present in the eval but not configured makes sense! In terms of the processing time, you're completely right - I added a method to handle processing time changes, since it would affect how state TTL is also handled.
>
> In terms of looking at the other harnesses, you're right that I left out parallelism concerns. I think for now it might be out of scope and warrant a follow-up enhancement to the harness. In terms of getting side outputs - would this also be relevant for table functions? I wasn't very familiar with them, but the Flink docs suggest this is more of a datastream thing than a table thing.
>
> Kind regards,
> Mika
>
> On Tue, 17 Mar 2026, at 12:29 PM, Gustavo de Morais wrote:
> > Hey Mika,
> >
> > Thanks for driving this! In general, this is useful for prototyping and for speeding up testing. So +1. Some feedback from my side:
> >
> > > For a PTF that reads from a table argument that hasn't been configured, I think it would return null, yes.
> > I think it'd be a better dev UX if the harness throws during build: "table argument 'purchases' is declared in eval() but was never configured".
> >
> > > I'm not entirely sure on the real/processing time considerations - my aim here was mostly around letting users validate timer behaviour, and timer registration/firing in PTFs is based on watermarks
> > You've addressed event time, and we need to detail in the FLIP what the approach for processing time would be.
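For illustration, the fail-fast behaviour suggested above could look roughly like the following self-contained sketch. `PtfHarnessBuilder` and its methods are hypothetical names for this example, not API from the FLIP:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of build-time validation for table arguments.
// "PtfHarnessBuilder" and its methods are illustrative names only.
class PtfHarnessBuilder {
    private final Set<String> declaredTableArgs;                 // from the eval() signature
    private final Set<String> configuredTableArgs = new HashSet<>();

    PtfHarnessBuilder(Set<String> declaredTableArgs) {
        this.declaredTableArgs = declaredTableArgs;
    }

    PtfHarnessBuilder withTableArgument(String name, List<?> rows) {
        configuredTableArgs.add(name);
        return this;
    }

    void build() {
        // Fail fast: a declared-but-unconfigured table argument is a test
        // setup error, surfaced at build() rather than as a runtime null.
        for (String arg : declaredTableArgs) {
            if (!configuredTableArgs.contains(arg)) {
                throw new IllegalStateException("table argument '" + arg
                        + "' is declared in eval() but was never configured");
            }
        }
    }
}
```

The point of the sketch is only that the check happens once, at build time, so a misconfigured test fails with a descriptive message instead of a null somewhere inside eval().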
> > The API would ideally support similar methods to what we have here: https://github.com/confluentinc/flink/blob/a7a8dba2127ad719ca7932969b2934a0955e1bba/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L790-L804. If you want this to be out of scope, you can mention that explicitly in the FLIP.
> >
> > Also, comparing this to the current harness support for operators, here are some other relevant things that devs might need out of harness tests, which you might want to include or mention as out of scope:
> >
> > - Side outputs: AbstractStreamOperatorTestHarness has getSideOutput(OutputTag<X>).
> > - No parallelism config, snapshot is opaque: The existing harness takes maxParallelism, parallelism, and subtaskIndex. Key group assignment (and therefore state distribution) depends on maxParallelism. AbstractStreamOperatorTestHarness.snapshot() returns OperatorSubtaskState, which you can pass to repartitionOperatorState() to simulate parallelism changes.
> >
> > Kind regards,
> > Gustavo
> >
> > On Thu, 12 Mar 2026 at 14:38, Mika Naylor <[email protected]> wrote:
> > > Hey Martijn,
> > >
> > > Thank you for the detailed feedback!
> > >
> > > > 1. The FLIP has good examples of harness construction via the builder, but doesn't address lifecycle management. For comparison, the existing DataStream test harnesses (documented at [1]) have explicit open() and implement AutoCloseable. Since PTFs can acquire resources in open(), the harness needs to manage this lifecycle. Could you clarify how open()/close() on the underlying PTF is handled? Is close() called automatically, or does the user need to trigger it? An end-to-end example showing cleanup would help.
> > >
> > > This is a good point!
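For context, the kind of control the linked methods give can be modelled with a toy stand-in like the one below. This is not the harness API, just an illustration of the semantics under discussion: advancing processing time fires every timer registered at or before the new time, in timestamp order.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of a setProcessingTime-style control. Names are
// illustrative, not taken from the FLIP or from Flink itself.
class TestProcessingTimeService {
    private long currentTime = 0L;
    private final TreeMap<Long, Runnable> timers = new TreeMap<>();

    void registerTimer(long timestamp, Runnable callback) {
        timers.put(timestamp, callback);
    }

    void setProcessingTime(long newTime) {
        // Fire due timers in timestamp order before settling on the new time.
        while (!timers.isEmpty() && timers.firstKey() <= newTime) {
            Map.Entry<Long, Runnable> timer = timers.pollFirstEntry();
            timer.getValue().run();
        }
        currentTime = newTime;
    }

    long getCurrentProcessingTime() {
        return currentTime;
    }
}
```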
> > > I must have missed the AutoCloseable implementation in the public interfaces section - but I did intend to add it! So it would be closed automatically (but I suppose a user could also call close() itself). For open(), I was planning on calling it when the harness is built using `Builder.build()`.
> > >
> > > > 2. The existing operator test harnesses support snapshot() and initializeState(OperatorSubtaskState) to simulate checkpoint/restore cycles. This is important for catching state serialization bugs, which are a common source of production issues. The FLIP provides withInitialState() for setup, but there's no way to take a snapshot mid-test and restore into a fresh harness. Are we deliberately excluding this, or should we consider adding it?
> > >
> > > I would absolutely consider adding this, thank you for pointing it out. I think being able to take a `.snapshot()` from a harness and then initialise a new harness via a `restore()` on the builder would make sense, as well as maybe supporting `.restore()` on the harness itself after it has been built.
> > >
> > > > 3. Related to the above: PTFs with complex state (Map, List, POJO) can behave differently with heap vs. RocksDB backends due to serialization differences. The existing harnesses support setStateBackend(). Should the PTF test harness support this as well? At minimum, it would be good to document which backend is used by default.
> > >
> > > I had just intended to support the heap backend and document this, but this is a good point - supporting `setStateBackend()` makes sense here, similar to the existing harnesses. I'll add this to the spec and document the default.
> > >
> > > > 4. withTableArgument(String tableName, List<Row> rows) is useful for testing join-like PTFs.
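To make the lifecycle being discussed concrete, here is a minimal sketch of the behaviour described above: open() invoked automatically by build(), close() driven by try-with-resources via AutoCloseable. All names are hypothetical placeholders, not the FLIP's API:

```java
// Minimal lifecycle sketch: open() is called automatically when the
// harness is built; close() runs via AutoCloseable. Hypothetical names.
class PtfTestHarness implements AutoCloseable {
    boolean opened;
    boolean closed;

    static PtfTestHarness build() {
        PtfTestHarness harness = new PtfTestHarness();
        harness.open();   // Builder.build() opens the underlying PTF
        return harness;
    }

    void open() {
        opened = true;    // the PTF would acquire resources here
    }

    @Override
    public void close() {
        closed = true;    // released automatically by try-with-resources
    }
}
```

Usage would then be `try (PtfTestHarness h = PtfTestHarness.build()) { ... }`, with close() still callable manually if a test wants to exercise shutdown explicitly.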
> > > > The builder Javadoc describes when static rows are passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect delivery, but a few things remain unclear: How is the schema for these rows determined: is it inferred from the Row structure, or does it need to match the eval() signature's type hints? And what happens if a PTF reads from a table argument that hasn't been configured via the builder: does it receive null, or does the harness throw at build time?
> > >
> > > I wasn't quite sure what the right approach is here. I thought that inferring it from the Row structure would work, but it feels odd to ignore the eval() type hints. Perhaps I can try the Row structure approach, and if it feels unergonomic, explore the second approach.
> > >
> > > For a PTF that reads from a table argument that hasn't been configured, I think it would return null, yes.
> > >
> > > > - Timer.hasFired() is typed as boolean (primitive) but annotated @Nullable. This looks like a bug -> should it be Boolean (boxed)?
> > >
> > > Oops, good catch. I'm not sure why I marked this as nullable; either a timer has fired or it hasn't, and I'm not sure returning null makes sense. Maybe returning a non-nullable primitive is fine here.
> > >
> > > > - getOutputByKind(RowKind) implies that output preserves RowKind metadata. Could you confirm that getOutput() also retains this? The generic <OUT> type parameter could use more specification on what's guaranteed.
> > >
> > > I would like getOutput() to somehow retain this, but I'm not quite sure what the return type could look like in this case. Perhaps `RowData`? I'm not entirely sure if we have an interface that would cleanly capture this.
> > > > - Have you considered optional changelog consistency validation (e.g., verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? Could be a useful debugging aid.
> > >
> > > I hadn't, no, but this is a useful idea. It could be togglable on the builder with a `.withChangelogValidation` method.
> > >
> > > > - What's the error model when eval() or a timer callback throws? Propagated directly, or wrapped?
> > >
> > > I would say propagated directly, unless you think wrapping them could be useful here.
> > >
> > > > - The test plan mentions leveraging ProcessTableFunctionTestPrograms. Could you clarify whether the harness will be validated against those scenarios, or whether it's intended to replace them for certain use cases?
> > >
> > > I think just validated against them, as a way of making sure that the harness covers the right set of features we want to capture. I don't think it would replace them in this case.
> > >
> > > Thank you a ton for the feedback and ideas! I will update the FLIP documentation based on them, it's very much appreciated.
> > >
> > > Kind regards,
> > > Mika
> > >
> > > On Wed, 11 Mar 2026, at 6:02 PM, Martijn Visser wrote:
> > > > Hey Mika,
> > > >
> > > > Thanks for putting this FLIP together. A dedicated test harness for PTFs is a welcome addition. The builder-pattern API and the state/timer introspection features are well thought out.
> > > >
> > > > I have a few questions and suggestions after reviewing the FLIP:
> > > >
> > > > 1. The FLIP has good examples of harness construction via the builder, but doesn't address lifecycle management. For comparison, the existing DataStream test harnesses (documented at [1]) have explicit open() and implement AutoCloseable. Since PTFs can acquire resources in open(), the harness needs to manage this lifecycle.
> > > > Could you clarify how open()/close() on the underlying PTF is handled? Is close() called automatically, or does the user need to trigger it? An end-to-end example showing cleanup would help.
> > > >
> > > > 2. The existing operator test harnesses support snapshot() and initializeState(OperatorSubtaskState) to simulate checkpoint/restore cycles. This is important for catching state serialization bugs, which are a common source of production issues. The FLIP provides withInitialState() for setup, but there's no way to take a snapshot mid-test and restore into a fresh harness. Are we deliberately excluding this, or should we consider adding it?
> > > >
> > > > 3. Related to the above: PTFs with complex state (Map, List, POJO) can behave differently with heap vs. RocksDB backends due to serialization differences. The existing harnesses support setStateBackend(). Should the PTF test harness support this as well? At minimum, it would be good to document which backend is used by default.
> > > >
> > > > 4. withTableArgument(String tableName, List<Row> rows) is useful for testing join-like PTFs. The builder Javadoc describes when static rows are passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect delivery, but a few things remain unclear: How is the schema for these rows determined: is it inferred from the Row structure, or does it need to match the eval() signature's type hints? And what happens if a PTF reads from a table argument that hasn't been configured via the builder: does it receive null, or does the harness throw at build time?
> > > >
> > > > A few smaller points:
> > > >
> > > > - Timer.hasFired() is typed as boolean (primitive) but annotated @Nullable. This looks like a bug -> should it be Boolean (boxed)?
> > > > - getOutputByKind(RowKind) implies that output preserves RowKind metadata. Could you confirm that getOutput() also retains this? The generic <OUT> type parameter could use more specification on what's guaranteed.
> > > > - Have you considered optional changelog consistency validation (e.g., verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? Could be a useful debugging aid.
> > > > - What's the error model when eval() or a timer callback throws? Propagated directly, or wrapped?
> > > > - The test plan mentions leveraging ProcessTableFunctionTestPrograms. Could you clarify whether the harness will be validated against those scenarios, or whether it's intended to replace them for certain use cases?
> > > >
> > > > Overall I'm +1 on the direction. The core API design is clean and covers the main testing needs well. Addressing the lifecycle and checkpoint/restore gaps would bring it in line with what Flink users already have for DataStream UDF testing.
> > > >
> > > > Thanks,
> > > >
> > > > Martijn
> > > >
> > > > [1]
> > > > https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
> > > >
> > > > On Fri, Mar 6, 2026 at 9:30 AM Mika Naylor <[email protected]> wrote:
> > > > > Hey David,
> > > > >
> > > > > Yeah, I think in terms of scope I aim more for providing a framework for unit testing the behavior of custom PTFs. I'd like to include as much validation as possible, but there might be validation steps that aren't possible without dipping into the engine side of things.
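The changelog consistency validation floated in the points above could be prototyped as a standalone check like the one below. The enum here is local to the example (the real RowKind lives in org.apache.flink.types), and the rule checked is deliberately simple: every UPDATE_AFTER for a key must have a prior, still-unmatched UPDATE_BEFORE for the same key, and no UPDATE_BEFORE may be left dangling.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy changelog consistency check; not Flink API.
class ChangelogValidator {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static boolean isConsistent(List<? extends Map.Entry<Kind, String>> changelog) {
        Set<String> pendingBefore = new HashSet<>();
        for (Map.Entry<Kind, String> entry : changelog) {
            String key = entry.getValue();
            switch (entry.getKey()) {
                case UPDATE_BEFORE:
                    pendingBefore.add(key);
                    break;
                case UPDATE_AFTER:
                    if (!pendingBefore.remove(key)) {
                        return false;   // +U with no prior -U for this key
                    }
                    break;
                default:
                    break;              // INSERT/DELETE not checked in this sketch
            }
        }
        return pendingBefore.isEmpty(); // every -U must be matched by a +U
    }
}
```

A `.withChangelogValidation` toggle on the builder could run a check along these lines over the collected output and fail the test with the offending key.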
> > > > > I'm not entirely sure on the real/processing time considerations - my aim here was mostly around letting users validate timer behaviour, and timer registration/firing in PTFs is based on watermarks, if I read the doc correctly.
> > > > >
> > > > > Kind regards,
> > > > > Mika
> > > > >
> > > > > On Wed, 4 Mar 2026, at 10:38 AM, David Radley wrote:
> > > > > > Hi Mika,
> > > > > > This sounds like a good idea. In terms of scope, is the idea that this is purely for unit tests, or is this additionally proposed as a validation/test harness for use when developing custom PTFs? I guess this allows us to create a common set of tests that all PTFs need to pass using this harness.
> > > > > >
> > > > > > I would assume there are real (not event) time considerations for some PTFs; it would be worth mentioning how we should handle that.
> > > > > >
> > > > > > Kind regards, David.
> > > > > >
> > > > > > From: Mika Naylor <[email protected]>
> > > > > > Date: Tuesday, 3 March 2026 at 16:46
> > > > > > To: [email protected] <[email protected]>
> > > > > > Subject: [EXTERNAL] [DISCUSS] FLIP-567: Introduce a ProcessTableFunction Test Harness
> > > > > >
> > > > > > Hey everyone!
> > > > > >
> > > > > > I would like to kick off a discussion on FLIP-567: Introduce a ProcessTableFunction Test Harness[1].
> > > > > >
> > > > > > Currently, testing PTFs requires full integration tests against a running Flink cluster. This FLIP would introduce a developer-friendly test harness for unit testing PTFs and would provide introspection into output, state, timers, and watermarks for assertions and behaviour validation. This would let developers iterate and test their PTFs without needing to run a full-scale integration test against a live Flink cluster.
> > > > > > Would love any thoughts and feedback the community might have on this proposal.
> > > > > >
> > > > > > Kind regards,
> > > > > > Mika Naylor
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-567%3A+Introduce+a+ProcessTableFunction+Test+Harness
> > > > > >
> > > > > > Unless otherwise stated above:
> > > > > >
> > > > > > IBM United Kingdom Limited
> > > > > > Registered in England and Wales with number 741598
> > > > > > Registered office: Building C, IBM Hursley Office, Hursley Park Road, Winchester, Hampshire SO21 2JN
