Hey Poorvank,

Thanks for the proposal. It's nice that it's compatible with both the Table and DataStream APIs. Could you add a basic Table API example, or extend the current one, to show how it would look?
Kind regards,
Gustavo

On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot wrote:
> Hi Poorvank,
>
> Thanks for this proposal. I agree with the need and the general architecture.
>
> In the Apache Beam project there was a similar test framework that allowed running a complete pipeline in a test with control over the watermark (setting the watermark of the source) and processing time (changing the clock time), waiting for the pipeline to finish, and then making assertions on the output stream.
>
> Here are pointers to the relevant classes to take some inspiration from:
>
> - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
> - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
> - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>
> Best,
>
> Etienne
>
> On 07/10/2025 at 05:57, Poorvank Bhatia wrote:
> > Hi Samrat,
> >
> > Thanks for the review. Please find the responses inline below.
> >
> >> 1. For complex stateful applications, verifying only the final output at the sink may not be sufficient. Sometimes, there is a need to validate the intermediate state being built up within operators to ensure correctness. Is there a mechanism to inspect and verify the contents of an operator's state?
> >
> > IMO, for fine-grained, deterministic validation of Value/List/Map/Broadcast state and processing-time timers, the existing Operator/Function test harnesses are the right tool.
> >
> >> A common pattern in many of the Flink jobs is the use of side outputs to route different types of data to different downstream processes. The singular *assertSink()* method appears to target only the main data stream. Can you consider extending the interface to allow tests to capture and assert data emitted to named side outputs?
> >> This could take the form of an assertion method like *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling comprehensive testing of pipelines with side-output logic. WDYT?
> >
> > Agreed. We can add first-class support for side outputs by registering OutputTag<T>s and collecting them via a named buffer, e.g.:
> >
> > - Runner: withSideOutput(OutputTag<T> tag[, String name])
> > - Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
> >
> > The implementation reuses the same shared sink buffer under a namespaced key for each tag.
> >
> >> a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark and then blocks until the framework can determine that all in-flight records and timers triggered by that watermark have been fully processed. This would help users add tests for windowed operations by removing the need for manual waits or sleeps.
> >
> > Not sure what the right way to do that would be. The runtime doesn't expose a reliable, global "quiescent after W" signal. Watermark firings can cascade, and in-flight work sits in mailboxes and async callbacks, so "fully processed" is unverifiable end-to-end.
> >
> >> b. *advanceProcessingTime(Duration duration)*: A method to advance the notion of processing time within the test environment. This will help add deterministic testing logic that depends on processing-time timers and windows.
> >
> > AFAIK, same as above: I think the Operator/Function harnesses already support deterministic processing-time control.
> >
> > Thanks,
> > Poorvank Bhatia
> >
> > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb wrote:
> >
> >> Thank you, Poorvank, for the proposal. This would be a valuable addition toward building reliable and well-tested Flink applications. The proposal is already in excellent shape.
> >>
> >> Below are my thoughts and suggestions on the proposal:
> >>
> >> 1. For complex stateful applications, verifying only the final output at the sink may not be sufficient. Sometimes, there is a need to validate the intermediate state being built up within operators to ensure correctness. Is there a mechanism to inspect and verify the contents of an operator's state?
> >>
> >> 2. A common pattern in many of the Flink jobs is the use of side outputs to route different types of data to different downstream processes. The singular *assertSink()* method appears to target only the main data stream. Can you consider extending the interface to allow tests to capture and assert data emitted to named side outputs? This could take the form of an assertion method like *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling comprehensive testing of pipelines with side-output logic. WDYT?
> >>
> >> 3. The *sendWatermark()* method is a solid foundation for event-time testing. However, more complex scenarios involving windows and timers would benefit from finer-grained control. My suggestion would be to consider adding the following two methods:
> >>
> >>    a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark and then blocks until the framework can determine that all in-flight records and timers triggered by that watermark have been fully processed. This would help users add tests for windowed operations by removing the need for manual waits or sleeps.
> >>
> >>    b. *advanceProcessingTime(Duration duration)*: A method to advance the notion of processing time within the test environment. This will help add deterministic testing logic that depends on processing-time timers and windows.
> >>
> >> Bests,
> >> Samrat
> >>
> >> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás wrote:
> >>
> >>> Thanks Poorvank,
> >>>
> >>> The proposal looks great.
> >>>
> >>> Thanks,
> >>> Matyas
> >>>
> >>> On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia wrote:
> >>>
> >>>> Hi all,
> >>>> I'd like to propose a Pipeline Testing Framework (on Source/Sink V2) as part of Flink's testing framework, built on concepts presented in the talk "Testing Production Streaming Applications" <https://www.youtube.com/watch?v=lXHleussX9Q> (Gyula Fóra & Mátyás Orhidi).
> >>>>
> >>>> The aim is to let users write end-to-end pipeline flow tests, replacing real connectors with lightweight TestSources/TestSinks, so they can deterministically inject records/watermarks and assert outputs under parallelism.
> >>>>
> >>>> Unlike MiniCluster, the local environment, or operator harnesses, this provides a small Source/Sink V2-based *testkit* (send(), sendWatermark(), finish(), assertSink()), backed by isolated in-JVM buffers for reproducible control. The focus is on testing *pipeline logic and wiring*, not the connector implementations themselves.
> >>>>
> >>>> For more details, refer to this doc: <https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0>.
> >>>> Looking forward to your feedback.
> >>>> Thanks,
> >>>> Poorvank Bhatia
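The side-output design described in the thread can be illustrated with a minimal Java sketch that uses no framework classes. That design registers each OutputTag<T>, collects its records in the shared sink buffer under a namespaced key, and then asserts with receivedExactlyInAnyOrder(...). This sketch is not the proposed testkit. SharedTestBuffer, mainKey(), sideKey(), append(), snapshot(), clear() and assertOutput() are hypothetical names. Plain string ids stand in for OutputTag<T>, two threads stand in for parallel sink subtasks, and no Flink classes are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Demo driver: two threads stand in for parallel sink subtasks (hypothetical setup).
class Main {
    public static void main(String[] args) throws InterruptedException {
        SharedTestBuffer.clear();
        Thread subtask0 = new Thread(() -> {
            SharedTestBuffer.append(SharedTestBuffer.mainKey(), "a");
            SharedTestBuffer.append(SharedTestBuffer.sideKey("late-events"), "x");
        });
        Thread subtask1 = new Thread(() -> SharedTestBuffer.append(SharedTestBuffer.mainKey(), "b"));
        subtask0.start();
        subtask1.start();
        subtask0.join();
        subtask1.join();

        // Arrival order across subtasks is nondeterministic, so outputs are compared as multisets.
        SharedTestBuffer.assertOutput(SharedTestBuffer.mainKey()).receivedExactlyInAnyOrder("b", "a");
        SharedTestBuffer.assertOutput(SharedTestBuffer.sideKey("late-events")).receivedExactlyInAnyOrder("x");
        System.out.println("all assertions passed");
    }
}

/** Hypothetical JVM-wide buffer shared by all test-sink instances (not a Flink API). */
final class SharedTestBuffer {
    private static final Map<String, List<Object>> BUFFERS = new HashMap<>();

    private SharedTestBuffer() {}

    /** Key under which the main (non-side) output is collected. */
    static String mainKey() {
        return "sink/main";
    }

    /** Namespaced key for one side output, so a tag can never collide with the main output. */
    static String sideKey(String tagId) {
        return "side/" + tagId;
    }

    /** Called by every parallel writer; synchronized because subtasks run on different threads. */
    static synchronized void append(String key, Object record) {
        BUFFERS.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
    }

    /** Defensive copy of everything collected under a key so far (empty if nothing arrived). */
    static synchronized List<Object> snapshot(String key) {
        return new ArrayList<>(BUFFERS.getOrDefault(key, Collections.emptyList()));
    }

    /** Resets all buffers; call between tests to keep them isolated. */
    static synchronized void clear() {
        BUFFERS.clear();
    }

    static Assertion assertOutput(String key) {
        return new Assertion(key);
    }

    static final class Assertion {
        private final String key;

        private Assertion(String key) {
            this.key = key;
        }

        /** Passes iff the collected records equal {@code expected} as a multiset (duplicates count). */
        void receivedExactlyInAnyOrder(Object... expected) {
            Map<Object, Integer> actual = counts(snapshot(key));
            Map<Object, Integer> wanted = counts(Arrays.asList(expected));
            if (!actual.equals(wanted)) {
                throw new AssertionError(
                        "Output '" + key + "': expected " + wanted + " but got " + actual);
            }
        }

        private static Map<Object, Integer> counts(List<?> records) {
            Map<Object, Integer> counts = new HashMap<>();
            for (Object record : records) {
                counts.merge(record, 1, Integer::sum);
            }
            return counts;
        }
    }
}
```

The assertion compares multisets rather than ordered lists. As a result, it ignores how parallel subtasks interleave but still catches missing or duplicated records. A static map is simply the easiest way to share one buffer across writer instances in a single JVM, and clearing it between tests is what keeps runs isolated. A real implementation would presumably derive the side-output key from the registered OutputTag rather than from a raw string.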
