Hey Ferenc,

Thanks for the feedback, great catch. The intent was a fluent API that returns the same instance (“returns this”), but I agree a Builder makes the design clearer and keeps the runner immutable. I have switched the doc to a builder-style API and updated the examples.

Thanks!
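For reference, a rough sketch of the builder shape I have in mind. The `PipelineTestRunner`/`with...`/`build()` names follow the doc, but the fields shown (parallelism, source names) are placeholders for illustration, not the final API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch only: the runner is immutable, all with...() methods live on the
// Builder and mutate/return the same Builder instance, and build() produces
// the finished runner. Field names here are illustrative placeholders.
final class PipelineTestRunner {
    private final int parallelism;
    private final List<String> sources;

    private PipelineTestRunner(Builder b) {
        this.parallelism = b.parallelism;
        this.sources = Collections.unmodifiableList(new ArrayList<>(b.sources));
    }

    static Builder builder() { return new Builder(); }

    int parallelism() { return parallelism; }
    List<String> sources() { return sources; }

    static final class Builder {
        private int parallelism = 1;
        private final List<String> sources = new ArrayList<>();

        Builder withParallelism(int p) { this.parallelism = p; return this; }
        Builder withSource(String name) { this.sources.add(name); return this; }

        PipelineTestRunner build() { return new PipelineTestRunner(this); }
    }
}
```

Usage would read as a single fluent chain, e.g. `PipelineTestRunner.builder().withParallelism(4).withSource("orders").build()`, and the built runner exposes no mutators.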
On Fri, Oct 17, 2025 at 4:40 PM Ferenc Csaky <[email protected]> wrote:

> Hi Poorvank,
>
> Thanks for starting this discussion, I also agree that such functionality
> would be very useful, +1 for the idea.
>
> One question/suggestion. Based on the current class design it seems every
> `with...` method of `PipelineTestRunner` produces a new instance, which
> does not seem reasonable. I believe that the current design should utilize
> the builder pattern instead of a static factory method, and all `with...`
> should be moved into its builder. WDYT?
>
> Best,
> Ferenc
>
>
> On Thursday, October 16th, 2025 at 06:48, Gyula Fora <[email protected]> wrote:
>
> > Hey Poorvank!
> >
> > +1 from my side. I think this would be a nice addition to Flink. The
> > built in testing utilities are more targeted towards single operator
> > tests/completely black box testing.
> >
> > In our experience we have found pipeline testing utils exceedingly
> > useful.
> >
> > Cheers
> > Gyula
> >
> > On 2025/10/14 12:33:21 Poorvank Bhatia wrote:
> >
> > > Thanks Gustavo,
> > >
> > > Thanks for the review.
> > > I have modified the doc and added a minimal SQL/Table example as well.
> > >
> > > On Tue, Oct 14, 2025 at 6:03 PM Poorvank Bhatia [email protected] wrote:
> > >
> > > > Thanks Etienne,
> > > >
> > > > The pointers are super helpful. The proposal is very much in the
> > > > same spirit:
> > > >
> > > > 1. Beam TestPipeline ⇄ PipelineTestRunner (own the job lifecycle,
> > > > wire sources/sinks, await completion).
> > > > 2. Beam TestStream ⇄ ManualPipelineSource/Sink (inject elements +
> > > > watermarks deterministically; finish stream).
> > > > 3. Beam PAssert ⇄ SinkAssertions (eventually/any-order/exact-order
> > > > checks; we’re also adding side-output assertions).
> > > >
> > > > The key difference is that this is driven by Flink’s runtime.
> > > >
> > > > On Mon, Oct 13, 2025 at 8:04 PM Gustavo de Morais [email protected] wrote:
> > > >
> > > > > Hey Poorvank,
> > > > >
> > > > > Thanks for the proposal. It's nice that it's compatible with both
> > > > > Table & DataStream API’s. Could you add a basic example with
> > > > > TableAPI or just extend the current one showing how it'd look like?
> > > > >
> > > > > Kind regards,
> > > > > Gustavo
> > > > >
> > > > > On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot [email protected] wrote:
> > > > >
> > > > > > Hi Poorvank,
> > > > > >
> > > > > > Thanks for this proposition. I agree with the need and the
> > > > > > general architecture.
> > > > > >
> > > > > > In Apache Beam project there was a similar test framework that
> > > > > > allowed to run a complete pipeline in test with control on
> > > > > > watermark (set the watermark of the source), processing time
> > > > > > (change clock time), wait for pipeline finish and then do some
> > > > > > assertions on the output stream.
> > > > > >
> > > > > > Here are the pointers to the different classes to take some
> > > > > > inspiration from:
> > > > > >
> > > > > > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
> > > > > > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
> > > > > > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
> > > > > >
> > > > > > Best
> > > > > >
> > > > > > Etienne
> > > > > >
> > > > > > Le 07/10/2025 à 05:57, Poorvank Bhatia a écrit :
> > > > > >
> > > > > > > Hi Samrat,
> > > > > > >
> > > > > > > Thanks for the review. Please find the responses inline below.
> > > > > > >
> > > > > > > 1. For complex stateful applications, verifying only the final
> > > > > > > > output at the sink may not be sufficient. Sometimes, there is
> > > > > > > > a need to validate the intermediate state being built up
> > > > > > > > within operators to ensure correctness. Is there a mechanism
> > > > > > > > to inspect and verify the contents of an operator's state?
> > > > > > >
> > > > > > > IMO For fine-grained, deterministic validation of
> > > > > > > Value/List/Map/Broadcast state and processing-time timers, the
> > > > > > > existing Operator/Function test harnesses are the right tool
> > > > > > >
> > > > > > > A common pattern in many of the Flink jobs is the use of side
> > > > > > > > outputs to route different types of data to different
> > > > > > > > downstream processes. The singular *assertSink() *method
> > > > > > > > appears to target only the main data stream. Can you consider
> > > > > > > > extending the interface to allow tests to capture and assert
> > > > > > > > data emitted to named side outputs?
> > > > > > > > This could take the form of an assertion method like
> > > > > > > > assertSideOutput(OutputTag<T> tag, T... expected), enabling
> > > > > > > > comprehensive testing of pipelines with side output logic.
> > > > > > > > WDUT?
> > > > > > >
> > > > > > > Agreed. Can add first-class support for side outputs by
> > > > > > > registering OutputTag<T>s and collecting them via a named
> > > > > > > buffer, e.g.:
> > > > > > >
> > > > > > > - Runner: withSideOutput(OutputTag<T> tag[, String name])
> > > > > > > - Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
> > > > > > > Implementation reuses the same shared sink buffer under a
> > > > > > > namespaced key for each tag.
> > > > > > >
> > > > > > > a. sendWatermarkAndAwaitIdle(): A method that sends a watermark
> > > > > > > > and then blocks until the framework can determine that all
> > > > > > > > in-flight records and timers triggered by that watermark have
> > > > > > > > been fully processed. This would help users to add tests for
> > > > > > > > windowed operations by removing the need for manual waits or
> > > > > > > > sleeps.
> > > > > > >
> > > > > > > Not sure what would be the right way to do that. The runtime
> > > > > > > doesn’t expose a reliable, global “quiescent after W” signal.
> > > > > > > Watermark firings can cascade, and in-flight work sits in
> > > > > > > mailboxes and async callbacks, so "fully processed" is
> > > > > > > unverifiable end-to-end.
> > > > > > >
> > > > > > > > b.* advanceProcessingTime(Duration duration)*: Method to
> > > > > > > > advance the notion of processing time within the test
> > > > > > > > environment. This will help add deterministic testing logic
> > > > > > > > that depends on processing-time timers and windows.
> > > > > > >
> > > > > > > AFAIK, same as above, I think the Operator/Function harnesses
> > > > > > > already support deterministic processing-time control.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Poorvank Bhatia
> > > > > > >
> > > > > > > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb [email protected] wrote:
> > > > > > >
> > > > > > > > Thank you, Poorvank, for the proposal. This would be a
> > > > > > > > valuable addition toward building reliable and well-tested
> > > > > > > > Flink applications. The proposal is already in excellent
> > > > > > > > shape.
> > > > > > > >
> > > > > > > > Below are my take and suggestions on the proposal:
> > > > > > > >
> > > > > > > > 1. For complex stateful applications, verifying only the
> > > > > > > > final output at the sink may not be sufficient. Sometimes,
> > > > > > > > there is a need to validate the intermediate state being
> > > > > > > > built up within operators to ensure correctness. Is there a
> > > > > > > > mechanism to inspect and verify the contents of an operator's
> > > > > > > > state?
> > > > > > > >
> > > > > > > > 2. A common pattern in many of the Flink jobs is the use of
> > > > > > > > side outputs to route different types of data to different
> > > > > > > > downstream processes. The singular *assertSink() *method
> > > > > > > > appears to target only the main data stream. Can you consider
> > > > > > > > extending the interface to allow tests to capture and assert
> > > > > > > > data emitted to named side outputs?
> > > > > > > > This could take the form of an assertion method like
> > > > > > > > assertSideOutput(OutputTag<T> tag, T... expected), enabling
> > > > > > > > comprehensive testing of pipelines with side output logic.
> > > > > > > > WDUT?
> > > > > > > >
> > > > > > > > 3. The sendWatermark() method is a solid foundation for
> > > > > > > > event-time testing. However, more complex scenarios involving
> > > > > > > > windows and timers would benefit from finer-grained control.
> > > > > > > > My suggestion would be to consider adding the following 2
> > > > > > > > methods:
> > > > > > > >
> > > > > > > > a. sendWatermarkAndAwaitIdle(): A method that sends a
> > > > > > > > watermark and then blocks until the framework can determine
> > > > > > > > that all in-flight records and timers triggered by that
> > > > > > > > watermark have been fully processed. This would help users to
> > > > > > > > add tests for windowed operations by removing the need for
> > > > > > > > manual waits or sleeps.
> > > > > > > >
> > > > > > > > b.* advanceProcessingTime(Duration duration)*: Method to
> > > > > > > > advance the notion of processing time within the test
> > > > > > > > environment. This will help add deterministic testing logic
> > > > > > > > that depends on processing-time timers and windows.
> > > > > > > >
> > > > > > > > Bests,
> > > > > > > > Samrat
> > > > > > > >
> > > > > > > > On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > Thanks Poorvank,
> > > > > > > > >
> > > > > > > > > The proposal looks great.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Matyas
> > > > > > > > >
> > > > > > > > > On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > > I’d like to propose a Pipeline Testing Framework (on
> > > > > > > > > > Source/Sink V2) as a part of Flink's testing framework,
> > > > > > > > > > built on concepts presented by the talk “Testing
> > > > > > > > > > Production Streaming Applications” (Gyula Fóra & Mátyás
> > > > > > > > > > Orhidi), https://www.youtube.com/watch?v=lXHleussX9Q.
> > > > > > > > > >
> > > > > > > > > > The aim is to let users write end-to-end pipeline flow
> > > > > > > > > > tests, replacing real connectors with lightweight
> > > > > > > > > > TestSources/TestSinks, so they can deterministically
> > > > > > > > > > inject records/watermarks and assert outputs under
> > > > > > > > > > parallelism.
> > > > > > > > > >
> > > > > > > > > > Unlike MiniCluster, local env, or operator harnesses,
> > > > > > > > > > this provides a small Source/Sink V2-based testkit
> > > > > > > > > > (send(), sendWatermark(), finish(), assertSink()), backed
> > > > > > > > > > by isolated in-JVM buffers for reproducible control. The
> > > > > > > > > > focus is on testing pipeline logic and wiring, not
> > > > > > > > > > connector implementations themselves.
> > > > > > > > > >
> > > > > > > > > > For more details refer to this doc:
> > > > > > > > > > https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0
> > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > Thanks,
> > > > > > > > > > Poorvank Bhatia
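To make the intended test flow concrete for anyone skimming the thread, here is an illustrative-only mock of the testkit surface quoted above (send(), sendWatermark(), finish(), then an any-order sink assertion). Nothing here is real Flink API: the class name, the in-JVM buffer, and the stand-in "pipeline" (a simple upper-casing step) are all hand-rolled for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

// Hand-rolled sketch of the proposed flow: inject records, advance the
// watermark, finish the stream, then assert on the sink buffer. The real
// framework would run an actual Flink job; here the "pipeline" is inlined
// as String.toUpperCase() so the example is self-contained.
class ManualSourceSketch {
    private final List<String> sinkBuffer = new ArrayList<>(); // in-JVM sink buffer
    private boolean finished = false;
    private long watermark = Long.MIN_VALUE;

    void send(String record) {
        if (finished) throw new IllegalStateException("stream already finished");
        sinkBuffer.add(record.toUpperCase()); // stand-in for the pipeline under test
    }

    void sendWatermark(long ts) { watermark = Math.max(watermark, ts); }

    void finish() { finished = true; }

    void assertSinkReceivedExactlyInAnyOrder(String... expected) {
        if (!finished) throw new IllegalStateException("call finish() first");
        boolean sameElements = new HashSet<>(List.of(expected)).equals(new HashSet<>(sinkBuffer));
        if (!sameElements || expected.length != sinkBuffer.size()) {
            throw new AssertionError("sink mismatch: " + sinkBuffer);
        }
    }
}
```

A test would then read linearly: send a few records, sendWatermark to fire event-time logic, finish() to end the bounded stream, and a single assertion on the sink, with no sleeps or polling.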

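The "shared sink buffer under a namespaced key for each tag" idea for side outputs could be sketched as below. withSideOutput/assertSideOutput come from the thread; the SideOutputBuffers class and the "side-output/" key prefix are hypothetical illustrations, not proposed names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one shared buffer map, where each registered
// OutputTag gets its own namespaced key, so side-output assertions can
// reuse the same collection machinery as the main sink.
class SideOutputBuffers {
    // key = namespaced tag name, value = records collected for that tag
    private final Map<String, List<Object>> buffers = new ConcurrentHashMap<>();

    void collect(String tagName, Object record) {
        buffers.computeIfAbsent("side-output/" + tagName,
                k -> Collections.synchronizedList(new ArrayList<>())).add(record);
    }

    List<Object> forTag(String tagName) {
        return buffers.getOrDefault("side-output/" + tagName, List.of());
    }
}
```

An assertSideOutput(tag) helper would then just read forTag(tag.getId()) and run the same any-order/exact-order checks used for the main sink.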