Hi Gustavo,

Thanks for the review.
I have modified the doc and added a minimal SQL/Table example as well.

On Tue, Oct 14, 2025 at 6:03 PM Poorvank Bhatia <[email protected]>
wrote:

> Thanks Etienne,
>
> The pointers are super helpful. The proposal is very much in the same
> spirit:
>
>    1. Beam TestPipeline ⇄ PipelineTestRunner (own the job lifecycle, wire
>    sources/sinks, await completion).
>    2. Beam TestStream ⇄ ManualPipelineSource/Sink (inject elements +
>    watermarks deterministically; finish stream).
>    3. Beam PAssert ⇄ SinkAssertions (eventually/any-order/exact-order
>    checks; we’re also adding side-output assertions).
>
> The key difference is that this is driven by Flink’s runtime.
>
>
> On Mon, Oct 13, 2025 at 8:04 PM Gustavo de Morais <[email protected]>
> wrote:
>
>> Hey Poorvank,
>>
>> Thanks for the proposal. It's nice that it's compatible with both the
>> Table and DataStream APIs. Could you add a basic example with the Table
>> API, or just extend the current one to show what it would look like?
>>
>> Kind regards,
>> Gustavo
>>
>>
>>
>> On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot <[email protected]>
>> wrote:
>>
>> > Hi Poorvank,
>> >
>> > Thanks for this proposal. I agree with the need and the general
>> > architecture.
>> >
>> > The Apache Beam project has a similar test framework that lets you run
>> > a complete pipeline in a test with control over the watermark (set the
>> > watermark of the source) and processing time (change the clock time),
>> > wait for the pipeline to finish, and then run assertions on the output
>> > stream.
>> >
>> > Here are pointers to the classes to take some inspiration from:
>> >
>> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
>> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
>> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>> >
>> > Best
>> >
>> > Etienne
>> >
>> > On 07/10/2025 at 05:57, Poorvank Bhatia wrote:
>> > > Hi Samrat,
>> > >
>> > > Thanks for the review. Please find the responses inline below.
>> > >
>> > >> 1. For complex stateful applications, verifying only the final output
>> > >> at the sink may not be sufficient. Sometimes, there is a need to
>> > >> validate the intermediate state being built up within operators to
>> > >> ensure correctness. Is there a mechanism to inspect and verify the
>> > >> contents of an operator's state?
>> > >
>> > > IMO, for fine-grained, deterministic validation of
>> > > Value/List/Map/Broadcast state and processing-time timers, the
>> > > existing Operator/Function test harnesses are the right tool.
>> > >
>> > >> 2. A common pattern in many Flink jobs is the use of side outputs to
>> > >> route different types of data to different downstream processes. The
>> > >> singular *assertSink()* method appears to target only the main data
>> > >> stream. Can you consider extending the interface to allow tests to
>> > >> capture and assert data emitted to named side outputs?
>> > >> This could take the form of an assertion method like
>> > >> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
>> > >> comprehensive testing of pipelines with side output logic. WDYT?
>> > >
>> > > Agreed. We can add first-class support for side outputs by
>> > > registering OutputTag<T>s and collecting them via a named buffer, e.g.:
>> > >
>> > >     - Runner: withSideOutput(OutputTag<T> tag[, String name])
>> > >     - Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
>> > >
>> > > The implementation reuses the same shared sink buffer under a
>> > > namespaced key for each tag.
>> > >
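
A minimal sketch of the namespaced-buffer idea above, in plain Java with no Flink dependency. SharedSinkBuffer, collectMain, collectSide and sameElementsInAnyOrder are hypothetical names chosen for illustration, and the String tag id stands in for the id of an OutputTag<T>. This is only one way the idea could look, not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for a shared in-JVM sink buffer; not Flink API. */
final class SharedSinkBuffer {
    private static final String MAIN_KEY = "main";

    // One list per namespaced key: "main" for the main output,
    // "side:<tagId>" for each registered side output.
    private final Map<String, List<Object>> records = new ConcurrentHashMap<>();

    private static String sideKey(String tagId) {
        return "side:" + tagId;
    }

    void collectMain(Object record) {
        add(MAIN_KEY, record);
    }

    void collectSide(String tagId, Object record) {
        add(sideKey(tagId), record);
    }

    private void add(String key, Object record) {
        records.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(record);
    }

    /** Snapshot of everything collected on the main output so far. */
    List<Object> main() {
        return snapshot(MAIN_KEY);
    }

    /** Snapshot of everything collected for one side-output tag so far. */
    List<Object> side(String tagId) {
        return snapshot(sideKey(tagId));
    }

    private List<Object> snapshot(String key) {
        return new ArrayList<>(records.getOrDefault(key, List.of()));
    }

    /** True when both sides hold the same elements with the same multiplicities. */
    static boolean sameElementsInAnyOrder(List<Object> actual, Object... expected) {
        List<Object> remaining = new ArrayList<>(actual);
        for (Object e : expected) {
            if (!remaining.remove(e)) {
                return false; // expected element missing (or seen too few times)
            }
        }
        return remaining.isEmpty(); // no unexpected extras
    }
}
```

Keeping side outputs in the same map as the main output means one buffer instance can serve every assertion of a test, and the key prefix keeps a tag id from colliding with the main stream.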
>> > >
>> > >>    a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
>> > >> and then blocks until the framework can determine that all in-flight
>> > >> records and timers triggered by that watermark have been fully
>> > >> processed. This would help users to add tests for windowed operations
>> > >> by removing the need for manual waits or sleeps.
>> > >
>> > > I'm not sure what the right way to do that would be. The runtime
>> > > doesn’t expose a reliable, global “quiescent after W” signal. Watermark
>> > > firings can cascade, and in-flight work sits in mailboxes and async
>> > > callbacks, so "fully processed" is unverifiable end-to-end.
>> > >
>> > >
>> > >>    b. *advanceProcessingTime(Duration duration)*: Method to advance
>> > >> the notion of processing time within the test environment. This will
>> > >> help add deterministic testing logic that depends on processing-time
>> > >> timers and windows.
>> > >
>> > > AFAIK, same as above: I think the Operator/Function harnesses already
>> > > support deterministic processing-time control.
>> > >
>> > > Thanks,
>> > > Poorvank Bhatia
>> > >
>> > > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb <[email protected]>
>> > > wrote:
>> > >
>> > >> Thank you, Poorvank, for the proposal. This would be a valuable
>> > >> addition toward building reliable and well-tested Flink applications.
>> > >> The proposal is already in excellent shape.
>> > >>
>> > >> Below are my thoughts and suggestions on the proposal:
>> > >>
>> > >> 1. For complex stateful applications, verifying only the final output
>> > >> at the sink may not be sufficient. Sometimes, there is a need to
>> > >> validate the intermediate state being built up within operators to
>> > >> ensure correctness. Is there a mechanism to inspect and verify the
>> > >> contents of an operator's state?
>> > >>
>> > >> 2. A common pattern in many Flink jobs is the use of side outputs to
>> > >> route different types of data to different downstream processes. The
>> > >> singular *assertSink()* method appears to target only the main data
>> > >> stream. Can you consider extending the interface to allow tests to
>> > >> capture and assert data emitted to named side outputs?
>> > >> This could take the form of an assertion method like
>> > >> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
>> > >> comprehensive testing of pipelines with side output logic. WDYT?
>> > >>
>> > >> 3. The *sendWatermark()* method is a solid foundation for event-time
>> > >> testing. However, more complex scenarios involving windows and timers
>> > >> would benefit from finer-grained control.
>> > >> My suggestion would be to consider adding the following 2 methods:
>> > >>
>> > >>    a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
>> > >> and then blocks until the framework can determine that all in-flight
>> > >> records and timers triggered by that watermark have been fully
>> > >> processed. This would help users to add tests for windowed operations
>> > >> by removing the need for manual waits or sleeps.
>> > >>
>> > >>    b. *advanceProcessingTime(Duration duration)*: Method to advance
>> > >> the notion of processing time within the test environment. This will
>> > >> help add deterministic testing logic that depends on processing-time
>> > >> timers and windows.
>> > >>
>> > >>
>> > >> Bests,
>> > >> Samrat
>> > >>
>> > >> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <[email protected]>
>> > >> wrote:
>> > >>
>> > >>> Thanks Poorvank,
>> > >>>
>> > >>> The proposal looks great.
>> > >>>
>> > >>> Thanks,
>> > >>> Matyas
>> > >>>
>> > >>> On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <[email protected]>
>> > >>> wrote:
>> > >>>
>> > >>>> Hi all,
>> > >>>> I’d like to propose a Pipeline Testing Framework (on Source/Sink V2)
>> > >>>> as a part of Flink's testing framework, built on concepts presented
>> > >>>> in the talk “Testing Production Streaming Applications
>> > >>>> <https://www.youtube.com/watch?v=lXHleussX9Q>” (Gyula Fóra & Mátyás
>> > >>>> Őrhidi).
>> > >>>>
>> > >>>> The aim is to let users write end-to-end pipeline flow tests,
>> > >>>> replacing real connectors with lightweight TestSources/TestSinks, so
>> > >>>> they can deterministically inject records/watermarks and assert
>> > >>>> outputs under parallelism.
>> > >>>>
>> > >>>> Unlike MiniCluster, local env, or operator harnesses, this provides a
>> > >>>> small Source/Sink V2-based *testkit* (send(), sendWatermark(),
>> > >>>> finish(), assertSink()), backed by isolated in-JVM buffers for
>> > >>>> reproducible control. The focus is on testing *pipeline logic and
>> > >>>> wiring*, not connector implementations themselves.
>> > >>>>
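
To make the send() / sendWatermark() / finish() / assert flow above concrete, here is a toy, single-threaded model in plain Java. It does not use Flink, and ToyWindowPipeline, sinkOutput() and the output string format are hypothetical names invented for illustration. It assumes a tumbling event-time window that counts records and fires once the watermark reaches the window's last timestamp (end - 1). Late records are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a deterministic test flow; all names are hypothetical, not Flink API. */
final class ToyWindowPipeline {
    private final long windowSize;
    // Open windows keyed by window start, each holding a record count.
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();
    // In-JVM buffer standing in for the test sink.
    private final List<String> sink = new ArrayList<>();

    ToyWindowPipeline(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Injects one record carrying the given event timestamp. */
    void send(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        openWindows.merge(start, 1, Integer::sum);
    }

    /** Advances event time and fires every window whose last timestamp is covered. */
    void sendWatermark(long watermark) {
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + windowSize - 1 <= watermark) {
            Map.Entry<Long, Integer> w = openWindows.pollFirstEntry();
            long end = w.getKey() + windowSize;
            sink.add("[" + w.getKey() + "," + end + ")=" + w.getValue());
        }
    }

    /** Ends the stream by emitting a final watermark that flushes all open windows. */
    void finish() {
        sendWatermark(Long.MAX_VALUE);
    }

    /** Snapshot of what has reached the sink, for assertions. */
    List<String> sinkOutput() {
        return new ArrayList<>(sink);
    }
}
```

In this model a test sends records at timestamps 1, 5 and 12 with a window size of 10, sees nothing in the sink after sendWatermark(8), sees the first window's count after sendWatermark(9), and sees the second window only after finish(). A real pipeline runs asynchronously, so assertions there would need to wait for output rather than read the buffer immediately.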
>> > >>>> For more details refer to this doc
>> > >>>> <https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0>.
>> > >>>> Looking forward to your feedback.
>> > >>>> Thanks,
>> > >>>> Poorvank Bhatia
>> > >>>>
>> >
>>
>
