Hey Samrat, Thank you for the great questions!
> 1. In large-scale production deployments, one of the most frequent causes > of immediate job failure upon deployment is a NotSerializableException. > In-memory test harnesses often mask this by keeping objects in the same JVM > heap without passing them through Flink's internal serializers. Will the > harness API include explicit methods (e.g., harness.snapshotState() and > harness.restoreState()) that force the state through the physical > serialization boundaries defined by the @StateHint annotations? This is an interesting point that I hadn't considered, but you're right, it would be great to validate state serialization to catch serialization errors before you throw your PTF at a live deployment. Bundling it in with the snapshot stuff does make sense, if it feels clunky to do (since snapshotting will also capture other facets of the PTF test) maybe we could consider adding some dedicated functions to force serialization (maybe even on every state change as an explicit configuration on the harness builder?). > 2. In the distributed runtime, registering an event-time timer for the same > key and timestamp multiple times only results in a single onTimer > execution. Will the test harness natively replicate this exact > deduplication logic, or will developers need to account for duplicate > callbacks in their test assertions? Yeah, the harness should handle this how we would expect - timer registration for the same key and ts will only result in a single onTimer callback. > > 3. Complex event processing inevitably deals with out-of-order data and > watermarking. Does the harness API plan to expose mechanisms for asserting > how the PTF handles late records (i.e., events older than the currently > simulated watermark), particularly regarding routing these records to side > outputs for historical reconciliation? At the moment no, it felt a little out of scope but could be nice to have as a stretch goal or follow up. But given this, I'm not quite sure how to handle late events in the current harness, perhaps drop them for now and consider adding some mechanism for channeling them to a side output. > 4. Since PTFs frequently interact with dynamic table changelogs, will the > test harness natively support injecting specific RowKind enumerations > (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) into the eval() method to > validate state retraction and update logic? Yes, it will properly handle RowKinds when you pass row's with a specific kind into processElement. > 5. PTFs often initialize custom metrics (counters, gauges) within the > open(FunctionContext) method. Will the harness automatically inject a > MetricGroup into the context to prevent NullPointerException failures > during local testing, or will developers be responsible for providing a > MetricGroup in the test? This is something I had been thinking about as well, specifically whether to also support metric introspection similar to state and timer introspection. By default, the FunctionContext uses an UnregisteredMetricsGroup which doesnt register any metrics with the registry, like a no-op sink. In this case the function will still get a MetricGroup so there shouldn't be any NPEs, but we could consider adding a MetricGroup that does track the metrics so that users can introspect them, if we feel it would be useful at some point. > 6. Given that the primary user base for PTFs resides in the SQL/Table API > ecosystem, forcing users to write imperative Java `Row.of(...)` > instantiations with deeply nested schemas can be cumbersome. Is there any > consideration for allowing the harness to ingest mock data declaratively, > perhaps via JSON payloads or DDL strings that automatically resolve to the > PTF's input types? This is a great point! And you're right that the current imperative flow is a little clunky. I think for the sake of keeping things scoped I would still stick to this current FLIP supporting just Rows for now but supporting a more ergonomic batch ingestion would be a great thing to add, especially for complex/nested schemas like you mentioned. Thank you for the thoughtful feedback! Warm regards, Mika On Thu, 26 Mar 2026, at 7:56 AM, Samrat Deb wrote: > Hi Mika, > > +1 for the feature > > I'd love to get your thoughts on the following areas: > > 1. In large-scale production deployments, one of the most frequent causes > of immediate job failure upon deployment is a NotSerializableException. > In-memory test harnesses often mask this by keeping objects in the same JVM > heap without passing them through Flink's internal serializers. Will the > harness API include explicit methods (e.g., harness.snapshotState() and > harness.restoreState()) that force the state through the physical > serialization boundaries defined by the @StateHint annotations? > > 2. In the distributed runtime, registering an event-time timer for the same > key and timestamp multiple times only results in a single onTimer > execution. Will the test harness natively replicate this exact > deduplication logic, or will developers need to account for duplicate > callbacks in their test assertions? > > 3. Complex event processing inevitably deals with out-of-order data and > watermarking. Does the harness API plan to expose mechanisms for asserting > how the PTF handles late records (i.e., events older than the currently > simulated watermark), particularly regarding routing these records to side > outputs for historical reconciliation? > > 4. Since PTFs frequently interact with dynamic table changelogs, will the > test harness natively support injecting specific RowKind enumerations > (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) into the eval() method to > validate state retraction and update logic? > > 5. PTFs often initialize custom metrics (counters, gauges) within the > open(FunctionContext) method. Will the harness automatically inject a > MetricGroup into the context to prevent NullPointerException failures > during local testing, or will developers be responsible for providing a > MetricGroup in the test? > > 6. Given that the primary user base for PTFs resides in the SQL/Table API > ecosystem, forcing users to write imperative Java `Row.of(...)` > instantiations with deeply nested schemas can be cumbersome. Is there any > consideration for allowing the harness to ingest mock data declaratively, > perhaps via JSON payloads or DDL strings that automatically resolve to the > PTF's input types? > > Thank you for driving this crucial initiative forward. Our team is really > looking forward to this feature. > > Bests, > Samrat > > On Wed, Mar 25, 2026 at 9:37 PM Gustavo de Morais <[email protected]> > wrote: > > > Hey Mika, thanks for the reply, it makes sense. > > > > > > On Wed, 25 Mar 2026 at 13:07, Mika Naylor <[email protected]> wrote: > > > > > Hey Gustavo, > > > > > > Sorry for missing your feedback, thank you for taking the time to look at > > > the proposal! > > > > > > I think your point on throwing errors during harness building for table > > > arguments present in the eval but not configured makes sense! In terms of > > > the processing time, you're completely right - I added a method to handle > > > processing time changes, since it would affect how state TTL is also > > > handled. > > > > > > In terms of looking at the other harnesses, you're right that I left out > > > parallelism concerns. I think for now it might be out of scope and > > warrant > > > a follow up enhancement to the harness. In terms of getting side outputs > > - > > > would this also be relevant for table functions? I wasn't very familiar > > > with them, but the Flink docs suggests this is more of a datastream thing > > > than a table thing. > > > > > > Kind regards, > > > Mika > > > > > > On Tue, 17 Mar 2026, at 12:29 PM, Gustavo de Morais wrote: > > > > Hey Mika, > > > > > > > > Thanks for driving this! In general, this is useful for prototyping and > > > in > > > > general to speed up testing. So +1. Some feedback from my side: > > > > > > > > >For a PTF that reads from a table argument that hasn't been > > configured, > > > I > > > > think it would return null, yes. > > > > I think it'd be a better dev UX if the harness throws during build > > "table > > > > argument 'purchases' is declared in eval() but was never configured. > > > > > > > > >I'm not entirely sure on the real/processing time considerations - my > > > aim > > > > here was mostly around letting users validate timer behaviour, and > > timer > > > > registration/firing in PTFs is based on watermarks > > > > You've addressed event time and we need to detail in the flip what > > would > > > be > > > > the approach for processing time. The API would ideally support similar > > > > methods as we have here > > > > > > > > > https://github.com/confluentinc/flink/blob/a7a8dba2127ad719ca7932969b2934a0955e1bba/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L790-L804 > > > . > > > > If you want this to be out of scope, you can mention that explicitly in > > > the > > > > FLIP. > > > > > > > > Also, comparing this to the current harness support for operators, here > > > are > > > > some other relevant things that devs might need out of harness test you > > > > might want to include or mention that they are out of scope: > > > > > > > > - Side outputs: AbstractStreamOperatorTestHarness has > > > > getSideOutput(OutputTag<X>). > > > > - No parallelism config, Snapshot is opaque: The existing harness takes > > > > maxParallelism, parallelism, and subtaskIndex. Key group assignment > > (and > > > > therefore state distribution) depends on maxParallelism. > > > > AbstractStreamOperatorTestHarness.snapshot() returns > > > OperatorSubtaskState, > > > > which you can pass to repartitionOperatorState() to simulate > > parallelism > > > > changes > > > > > > > > > > > > Kind regards, > > > > Gustavo > > > > > > > > On Thu, 12 Mar 2026 at 14:38, Mika Naylor <[email protected]> wrote: > > > > > > > > > Hey Martijn, > > > > > > > > > > Thank you for the detailed feedback! > > > > > > > > > > > 1. The FLIP has good examples of harness construction via the > > > builder, > > > > > but > > > > > > doesn't address lifecycle management. For comparison, the existing > > > > > > DataStream test harnesses (documented at [1]) have explicit open() > > > and > > > > > > implement AutoCloseable. Since PTFs can acquire resources in > > open(), > > > the > > > > > > harness needs to manage this lifecycle. Could you clarify how > > > > > > open()/close() on the underlying PTF is handled? Is close() called > > > > > > automatically, or does the user need to trigger it? An end-to-end > > > example > > > > > > showing cleanup would help. > > > > > > > > > > This is a good point! I must have missed the AutoClosable > > > implementation > > > > > in the public interfaces section - but I did intend to add it! So it > > > would > > > > > be closed automatically (but I suppose a user could also call close() > > > > > itself). For the open, I was planning on calling it during when the > > > harness > > > > > is built using `Builder.build()`. > > > > > > > > > > > 2. The existing operator test harnesses support snapshot() and > > > > > > initializeState(OperatorSubtaskState) to simulate > > checkpoint/restore > > > > > > cycles. This is important for catching state serialization bugs, > > > which > > > > > are > > > > > > a common source of production issues. The FLIP provides > > > > > withInitialState() > > > > > > for setup, but there's no way to take a snapshot mid-test and > > restore > > > > > into > > > > > > a fresh harness. Are we deliberately excluding this, or should we > > > > > consider > > > > > > adding it? > > > > > > > > > > I would absolutely consider adding this, thank you for pointing it > > > out. I > > > > > think being able to take a `.snapshot()` from a harness and then > > > initialise > > > > > a new harness via a `restore()` on the builder would make sense, as > > > well as > > > > > maybe supporting `.restore()` on the harness itself after it has been > > > built. > > > > > > > > > > > 3. Related to the above: PTFs with complex state (Map, List, POJO) > > > can > > > > > > behave differently with heap vs. RocksDB backends due to > > > serialization > > > > > > differences. The existing harnesses support setStateBackend(). > > > Should the > > > > > > PTF test harness support this as well? At minimum, it would be good > > > to > > > > > > document which backend is used by default. > > > > > > > > > > I had just intended to support heap backend and document this, but > > > this is > > > > > a good point - supporting `setStateBackend()` makes sense here, > > > similar to > > > > > the existing harnesses. I'll add this to the spec and document the > > > default. > > > > > > > > > > > 4. withTableArgument(String tableName, List<Row> rows) is useful > > for > > > > > > testing join-like PTFs. The builder Javadoc describes when static > > > rows > > > > > are > > > > > > passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) > > affect > > > > > > delivery, but a few things remain unclear: How is the schema for > > > these > > > > > rows > > > > > > determined: is it inferred from the Row structure, or does it need > > to > > > > > match > > > > > > the eval() signature's type hints? And what happens if a PTF reads > > > from a > > > > > > table argument that hasn't been configured via the builder: does it > > > > > receive > > > > > > null, or does the harness throw at build time? > > > > > > > > > > I wasn't quite sure what the right approach is here, I thought that > > > > > inferring it from the Row structure would work but it feels odd to > > > ignore > > > > > the eval type hints. Perhaps I can try the Row structure approach, > > and > > > it > > > > > feels unergonomic explore the second approach. > > > > > > > > > > For a PTF that reads from a table argument that hasnt been > > configured, > > > I > > > > > think it would return null, yes. > > > > > > > > > > > > > > > > > - Timer.hasFired() is typed as boolean (primitive) but annotated > > > > > @Nullable. > > > > > > This looks like a bug -> should it be Boolean (boxed)? > > > > > > > > > > Oops, good catch. I'm not sure why I marked this as nullable, either > > a > > > > > timer has fired or it hasn't, im not sure returning null makes sense. > > > Maybe > > > > > returning a non-nullable primitive is fine here. > > > > > > > > > > > - getOutputByKind(RowKind) implies that output preserves RowKind > > > > > metadata. > > > > > > Could you confirm that getOutput() also retains this? The generic > > > <OUT> > > > > > > type parameter could use more specification on what's guaranteed. > > > > > > > > > > I would like getOutput to somehow retain this, but I'm not quite sure > > > how > > > > > the return type could look like in this case. Perhaps `RowData`? I'm > > > not > > > > > entirely sure if we have an interface that would cleanly capture > > this. > > > > > > > > > > > - Have you considered optional changelog consistency validation > > > (e.g., > > > > > > verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? > > > Could > > > > > be a > > > > > > useful debugging aid. > > > > > > > > > > I hadn't, no, but this is a useful idea. Could be togglable on the > > > builder > > > > > with a `.withChangelogValidation` method. > > > > > > > > > > > - What's the error model when eval() or a timer callback throws? > > > > > Propagated > > > > > > directly, or wrapped? > > > > > > > > > > I would say propagated directly, unless you think wrapping them could > > > be > > > > > useful here. > > > > > > > > > > > - The test plan mentions leveraging > > ProcessTableFunctionTestPrograms. > > > > > Could > > > > > > you clarify whether the harness will be validated against those > > > > > scenarios, > > > > > > or whether it's intended to replace them for certain use cases? > > > > > > > > > > I think just validated against them, as a way of making sure that the > > > > > harness covers the right set of features we want to capture. I don't > > > think > > > > > it would replace them in this case. > > > > > > > > > > Thank you a ton for the feedback and ideas! I will update the FLIP > > > > > documentation based on them, it's very much appreciated. > > > > > > > > > > Kind regards, > > > > > Mika > > > > > > > > > > On Wed, 11 Mar 2026, at 6:02 PM, Martijn Visser wrote: > > > > > > Hey Mika, > > > > > > > > > > > > Thanks for putting this FLIP together. A dedicated test harness for > > > PTFs > > > > > is > > > > > > a welcome addition. The builder-pattern API and the state/timer > > > > > > introspection features are well thought out. > > > > > > > > > > > > I have a few questions and suggestions after reviewing the FLIP: > > > > > > > > > > > > 1. The FLIP has good examples of harness construction via the > > > builder, > > > > > but > > > > > > doesn't address lifecycle management. For comparison, the existing > > > > > > DataStream test harnesses (documented at [1]) have explicit open() > > > and > > > > > > implement AutoCloseable. Since PTFs can acquire resources in > > open(), > > > the > > > > > > harness needs to manage this lifecycle. Could you clarify how > > > > > > open()/close() on the underlying PTF is handled? Is close() called > > > > > > automatically, or does the user need to trigger it? An end-to-end > > > example > > > > > > showing cleanup would help. > > > > > > > > > > > > 2. The existing operator test harnesses support snapshot() and > > > > > > initializeState(OperatorSubtaskState) to simulate > > checkpoint/restore > > > > > > cycles. This is important for catching state serialization bugs, > > > which > > > > > are > > > > > > a common source of production issues. The FLIP provides > > > > > withInitialState() > > > > > > for setup, but there's no way to take a snapshot mid-test and > > restore > > > > > into > > > > > > a fresh harness. Are we deliberately excluding this, or should we > > > > > consider > > > > > > adding it? > > > > > > > > > > > > 3. Related to the above: PTFs with complex state (Map, List, POJO) > > > can > > > > > > behave differently with heap vs. RocksDB backends due to > > > serialization > > > > > > differences. The existing harnesses support setStateBackend(). > > > Should the > > > > > > PTF test harness support this as well? At minimum, it would be good > > > to > > > > > > document which backend is used by default. > > > > > > > > > > > > 4. withTableArgument(String tableName, List<Row> rows) is useful > > for > > > > > > testing join-like PTFs. The builder Javadoc describes when static > > > rows > > > > > are > > > > > > passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) > > affect > > > > > > delivery, but a few things remain unclear: How is the schema for > > > these > > > > > rows > > > > > > determined: is it inferred from the Row structure, or does it need > > to > > > > > match > > > > > > the eval() signature's type hints? And what happens if a PTF reads > > > from a > > > > > > table argument that hasn't been configured via the builder: does it > > > > > receive > > > > > > null, or does the harness throw at build time? > > > > > > > > > > > > A few smaller points: > > > > > > > > > > > > - Timer.hasFired() is typed as boolean (primitive) but annotated > > > > > @Nullable. > > > > > > This looks like a bug -> should it be Boolean (boxed)? > > > > > > - getOutputByKind(RowKind) implies that output preserves RowKind > > > > > metadata. > > > > > > Could you confirm that getOutput() also retains this? The generic > > > <OUT> > > > > > > type parameter could use more specification on what's guaranteed. > > > > > > - Have you considered optional changelog consistency validation > > > (e.g., > > > > > > verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? > > > Could > > > > > be a > > > > > > useful debugging aid. > > > > > > - What's the error model when eval() or a timer callback throws? > > > > > Propagated > > > > > > directly, or wrapped? > > > > > > - The test plan mentions leveraging > > ProcessTableFunctionTestPrograms. > > > > > Could > > > > > > you clarify whether the harness will be validated against those > > > > > scenarios, > > > > > > or whether it's intended to replace them for certain use cases? > > > > > > > > > > > > Overall I'm +1 on the direction. The core API design is clean and > > > covers > > > > > > the main testing needs well. Addressing the lifecycle and > > > > > > checkpoint/restore gaps would bring it in line with what Flink > > users > > > > > > already have for DataStream UDF testing. > > > > > > > > > > > > Thanks, > > > > > > > > > > > > Martijn > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators > > > > > > > > > > > > On Fri, Mar 6, 2026 at 9:30 AM Mika Naylor <[email protected]> > > > wrote: > > > > > > > > > > > > > Hey David, > > > > > > > > > > > > > > Yeah, I think in terms of scope I aim for more providing a > > > framework > > > > > for > > > > > > > unit testing the behavior of custom PTFs. I'd like to include as > > > much > > > > > > > validation as possible but there might be validation steps that > > > aren't > > > > > > > possible to do without dipping into the engine side of things. > > > > > > > > > > > > > > I'm not entirely sure on the real/processing time considerations > > - > > > my > > > > > aim > > > > > > > here was mostly around letting users validate timer behaviour, > > and > > > > > timer > > > > > > > registration/firing in PTFs is based on watermarks, if I read the > > > doc > > > > > > > correctly. > > > > > > > > > > > > > > Kind regards, > > > > > > > Mika > > > > > > > > > > > > > > On Wed, 4 Mar 2026, at 10:38 AM, David Radley wrote: > > > > > > > > Hi Mika, > > > > > > > > This sounds like a good idea, in terms of scope, Is the idea > > that > > > > > this > > > > > > > is purely for unit tests or is this additionally proposed as > > > > > validation / > > > > > > > test harness for use when developing custom PTFs. > > > > > > > > I guess this allows us to create a common set of tests that all > > > PTFs > > > > > > > need to pass using this harness. > > > > > > > > > > > > > > > > I would assume there are real (not event) time considerations > > > for > > > > > some > > > > > > > PTFs, it would be worth mentioning how we should handle that. > > > > > > > > > > > > > > > > Kind regards, David. > > > > > > > > > > > > > > > > From: Mika Naylor <[email protected]> > > > > > > > > Date: Tuesday, 3 March 2026 at 16:46 > > > > > > > > To: [email protected] <[email protected]> > > > > > > > > Subject: [EXTERNAL] [DISCUSS] FLIP-567: Introduce a > > > > > ProcessTableFunction > > > > > > > Test Harness > > > > > > > > > > > > > > > > Hey everyone! > > > > > > > > > > > > > > > > I would like to kick off a discussion on FLIP-567: Introduce a > > > > > > > ProcessTableFunction Test Harness[1]. > > > > > > > > > > > > > > > > Currently, testing PTFs require full integration tests against > > a > > > > > running > > > > > > > Flink cluster. This FLIP would introduce a developer-friendly > > test > > > > > harness > > > > > > > for unit testing PTFs and would provide introspection to output, > > > state, > > > > > > > timers, and watermarks for assertions and behaviour validation. > > > This > > > > > would > > > > > > > let developers iterate and test their PTFs without needing to > > run a > > > > > > > fullscale integration test against a live Flink cluster. > > > > > > > > > > > > > > > > Would love any thoughts and feedback the community might have > > on > > > this > > > > > > > proposal. > > > > > > > > > > > > > > > > Kind regards, > > > > > > > > Mika Naylor > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-567%3A+Introduce+a+ProcessTableFunction+Test+Harness > > > > > > > > > > > > > > > > Unless otherwise stated above: > > > > > > > > > > > > > > > > IBM United Kingdom Limited > > > > > > > > Registered in England and Wales with number 741598 > > > > > > > > Registered office: Building C, IBM Hursley Office, Hursley Park > > > Road, > > > > > > > Winchester, Hampshire SO21 2JN > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
