Hey Poorvank,

Thanks for the proposal. It's nice that it's compatible with both Table &
DataStream API’s. Could you add a basic example with TableAPI or just
extend the current one showing how it'd look like?

Kind regards,
Gustavo



On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot <[email protected]> wrote:

> Hi Poorvank,
>
> Thanks for this proposition. I agree with the need and the general
> architecture.
>
> In Apache Beam project there was a similar test framework that allowed
> to run a complete pipeline in test with control on watermark (set the
> watermark of the source), processing time (change clock time), wait for
> pipeline finish and then do some assertions on the output stream.
>
> Here are the pointers to the different classes to take some inspiration
> from:
>
> -
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
>
> -
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
>
> -
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>
> Best
>
> Etienne
>
> Le 07/10/2025 à 05:57, Poorvank Bhatia a écrit :
> > Hi Samrat,
> >
> > Thanks for the review. Please find the responses inline below.
> >
> > 1. For complex stateful applications, verifying only the final output at
> >> the sink may not be sufficient. Sometimes, there is a need to validate
> the
> >> intermediate state being built up within operators to ensure
> correctness.
> >> Is there a mechanism to inspect and verify the contents of an operator's
> >> state?
> >
> >   IMO For fine-grained, deterministic validation of
> Value/List/Map/Broadcast
> > state and processing-time timers, the existing Operator/Function test
> > harnesses are the right tool
> >
> > A common pattern in many of the Flink jobs is the use of side outputs to
> >> route different types of data to different downstream processes. The
> >> singular *assertSink() *method appears to target only the main data
> stream.
> >> Can you consider extending the interface to allow tests to capture and
> >> assert data emitted to named side outputs?
> >> This could take the form of an assertion method like
> >> *assertSideOutput(OutputTag<T>
> >> tag, T... expected)*, enabling comprehensive testing of pipelines with
> side
> >> output logic. WDUT?
> >
> > Agreed. Can add first-class support for side outputs by
> > registering OutputTag<T>s and collecting them via a named buffer, e.g.:
> >
> >     -
> >
> >     Runner: withSideOutput(OutputTag<T> tag[, String name])
> >     -
> >
> >     Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
> >     Implementation reuses the same shared sink buffer under a namespaced
> key
> >     for each tag.
> >
> >
> >    a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
> >> and then blocks until the framework can determine that all in-flight
> >> records and timers triggered by that watermark have been fully
> processed.
> >> This would help users to add tests for windowed operations by removing
> the
> >> need for manual waits or sleeps.
> >
> > Not sure what would be the right way to do that. The runtime doesn’t
> expose
> > a reliable, global “quiescent after W” signal. Watermark firings can
> > cascade, and in-flight work sits in mailboxes, and async callbacks—so
> > "fully processed" is unverifiable end-to-end.
> >
> >
> >>           b.* advanceProcessingTime(Duration duration)*: Method to
> advance
> >> the notion of processing time within the test environment. This will
> help
> >> add deterministic testing logic that depends on processing-time timers
> and
> >> windows.
> >>
> >> AFAIK, same as above i think the the Operator/Function harnesses already
> > support deterministic processing-time control.
> >
> > Thanks,
> > Poorvank Bhatia
> >
> > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb <[email protected]>
> wrote:
> >
> >> Thank you, Poorvank, for the proposal. This would be a valuable addition
> >> toward building reliable and well-tested Flink applications. The
> proposal
> >> is already in excellent shape.
> >>
> >> Below are my take and suggestions on the proposal :
> >>
> >> 1. For complex stateful applications, verifying only the final output at
> >> the sink may not be sufficient. Sometimes, there is a need to validate
> the
> >> intermediate state being built up within operators to ensure
> correctness.
> >> Is there a mechanism to inspect and verify the contents of an operator's
> >> state?
> >>
> >> 2. A common pattern in many of the Flink jobs is the use of side
> outputs to
> >> route different types of data to different downstream processes. The
> >> singular *assertSink() *method appears to target only the main data
> stream.
> >> Can you consider extending the interface to allow tests to capture and
> >> assert data emitted to named side outputs?
> >> This could take the form of an assertion method like
> >> *assertSideOutput(OutputTag<T>
> >> tag, T... expected)*, enabling comprehensive testing of pipelines with
> side
> >> output logic. WDUT?
> >>
> >> 3. The *sendWatermark()* method is a solid foundation for event-time
> >> testing. However, more complex scenarios involving windows and timers
> would
> >> benefit from finer-grained control.
> >> My suggestion would be to consider adding the following 2 methods :
> >>
> >>           a. *sendWatermarkAndAwaitIdle()*: A method that sends a
> watermark
> >> and then blocks until the framework can determine that all in-flight
> >> records and timers triggered by that watermark have been fully
> processed.
> >> This would help users to add tests for windowed operations by removing
> the
> >> need for manual waits or sleeps.
> >>
> >>           b.* advanceProcessingTime(Duration duration)*: Method to
> advance
> >> the notion of processing time within the test environment. This will
> help
> >> add deterministic testing logic that depends on processing-time timers
> and
> >> windows.
> >>
> >>
> >> Bests,
> >> Samrat
> >>
> >> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <[email protected]>
> >> wrote:
> >>
> >>> Thanks Poorvank,
> >>>
> >>> The proposal looks great.
> >>>
> >>> Thanks,
> >>> Matyas
> >>>
> >>> On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <
> [email protected]>
> >>> wrote:
> >>>
> >>>> Hi all,
> >>>> I’d like to propose a Pipeline Testing Framework (on Source/Sink V2)
> >> as a
> >>>> part of flink's testing framework, built on concepts presented by the
> >>> talk
> >>>> *“*Testing Production Streaming Applications
> >>>> <https://www.youtube.com/watch?v=lXHleussX9Q>” (Gyula Fóra & Mátyás
> >>>> Orhidi).
> >>>>
> >>>> The aim is to let users write end-to-end pipeline flow tests —
> >> replacing
> >>>> real connectors with lightweight TestSources/TestSinks — so they can
> >>>> deterministically inject records/watermarks and assert outputs under
> >>>> parallelism.
> >>>>
> >>>> Unlike MiniCluster, local env, or operator harnesses, this provides a
> >>> small
> >>>> Source/Sink V2-based *testkit* (send(), sendWatermark(), finish(),
> >>>> assertSink()), backed by isolated in-JVM buffers for reproducible
> >>> control.
> >>>> The focus is on testing *pipeline logic and wiring*, not connector
> >>>> implementations themselves.
> >>>>
> >>>> For more details refer to this doc
> >>>> <
> >>>>
> >>
> https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0
> >>>> .
> >>>> Looking forward to your feedback.
> >>>> Thanks,
> >>>> Poorvank Bhatia
> >>>>
>

Reply via email to