When testing the different IOs, we want to have the best possible coverage and be able to test with different use cases.

We create integration test pipelines, and one "classic" use case is a pipeline that starts from an unbounded source (providing an unbounded PCollection, like Kafka, JMS, MQTT, ...) and sends data to a bounded sink (TextIO, for instance) that expects a bounded PCollection.

This use case is not currently possible. Even when applying a Window, the elements of the unbounded PCollection are only split into chunks (windows); the resulting PCollection is still unbounded.
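To illustrate, here is a minimal plain-Java sketch, with no Beam dependency, of what fixed windowing does to event timestamps. `FixedWindowSketch`, `windowStart`, `windowEnd` and `windowStarts` are hypothetical names, and the sketch only mirrors the idea of a fixed WindowFn, not Beam's implementation. Assigning a window just tags each element, so an infinite input stays infinite after windowing.

```java
import java.util.stream.LongStream;

// Hypothetical, Beam-free model of fixed (tumbling) windows over event
// timestamps in milliseconds. It mirrors the idea of a fixed WindowFn only.
final class FixedWindowSketch {

  private FixedWindowSketch() {}

  // Start of the fixed window [start, start + size) containing timestampMillis.
  // floorMod keeps the result correct for negative timestamps as well.
  static long windowStart(long timestampMillis, long sizeMillis) {
    if (sizeMillis <= 0) {
      throw new IllegalArgumentException("sizeMillis must be > 0");
    }
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  // Exclusive end of the window containing timestampMillis.
  static long windowEnd(long timestampMillis, long sizeMillis) {
    return windowStart(timestampMillis, sizeMillis) + sizeMillis;
  }

  // Models an "unbounded source" emitting one timestamp every stepMillis,
  // mapped to window starts. The result is still an infinite stream:
  // windowing tags each element but never signals the end of the input.
  static LongStream windowStarts(long sizeMillis, long stepMillis) {
    return LongStream.iterate(0L, t -> t + stepMillis)
        .map(t -> windowStart(t, sizeMillis));
  }
}
```

Only an explicit bound, such as `windowStarts(1_000L, 250L).limit(6)`, makes the result finite; windowing alone never does.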

That's why I created https://issues.apache.org/jira/browse/BEAM-638.

However, I don't think a WindowFn/Trigger is the best approach.

A possible solution would be to create a specific IO (a BoundedWriteFromUnboundedSourceIO, similar to the one we have for Read ;)) to do that, but I think we should provide a more general mechanism, as this use case is not specific to IOs. For instance, a sorting PTransform only works on a bounded PCollection, not on an unbounded one.
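For comparison, on the Read side Beam offers BoundedReadFromUnboundedSource, which bounds an unbounded read by a maximum number of records or a maximum read time. Below is a minimal plain-Java sketch of that idea, assuming the unbounded source can be modeled as an `Iterator` whose `hasNext()` returns promptly (a real reader may block). `BoundedDrain.drain` is a hypothetical name, not Beam API.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: drain an unbounded source until a record-count or a
// read-time limit is reached, returning a bounded (finite) list. It mirrors
// the idea behind BoundedReadFromUnboundedSource; it is not Beam code.
final class BoundedDrain {

  private BoundedDrain() {}

  static <T> List<T> drain(Iterator<T> unboundedSource, long maxNumRecords,
                           Duration maxReadTime, Clock clock) {
    if (maxNumRecords < 0) {
      throw new IllegalArgumentException("maxNumRecords must be >= 0");
    }
    Instant deadline = clock.instant().plus(maxReadTime);
    List<T> bounded = new ArrayList<>();
    // Stop at whichever limit is hit first: record count or read time.
    while (bounded.size() < maxNumRecords
        && clock.instant().isBefore(deadline)
        && unboundedSource.hasNext()) {
      bounded.add(unboundedSource.next());
    }
    return bounded;
  }
}
```

Passing the `Clock` explicitly keeps the time bound testable; a fixed clock with a zero read time yields an empty result.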

I wonder if we could provide a DoFnWithStore. The purpose is to store the elements of an unbounded PCollection (delimited by a Window, for instance) into a pluggable store, and to read from the store to provide a bounded PCollection. The store/read could be triggered on finish bundle. We could provide a "store service", for instance based on GS, HDFS, or any other storage (Elasticsearch, Cassandra, ...).
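A minimal plain-Java sketch of what a pluggable "store service" contract could look like. `StoreService`, `InMemoryStoreService` and `LocalFileStoreService` are hypothetical names; the local-file implementation only stands in for a filesystem-backed store such as HDFS or GS, and an Elasticsearch or Cassandra backend would implement the same two methods.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical "store service" contract: elements are appended under a window
// key, then read back (and removed) as a finite list.
interface StoreService {
  void append(String windowKey, String element);

  List<String> readAndClear(String windowKey);
}

// In-memory store, e.g. for tests.
final class InMemoryStoreService implements StoreService {
  private final Map<String, List<String>> data = new HashMap<>();

  @Override
  public void append(String windowKey, String element) {
    data.computeIfAbsent(windowKey, k -> new ArrayList<>()).add(element);
  }

  @Override
  public List<String> readAndClear(String windowKey) {
    List<String> stored = data.remove(windowKey);
    return stored == null ? Collections.emptyList() : stored;
  }
}

// File-based store standing in for a filesystem-backed service (HDFS, GS, ...):
// one file per window, one line per element. Assumes window keys are valid
// file names and elements contain no line breaks.
final class LocalFileStoreService implements StoreService {
  private final Path baseDir;

  LocalFileStoreService(Path baseDir) {
    this.baseDir = baseDir;
  }

  private Path fileFor(String windowKey) {
    return baseDir.resolve(windowKey + ".txt");
  }

  @Override
  public void append(String windowKey, String element) {
    try {
      Files.write(fileFor(windowKey), List.of(element), StandardCharsets.UTF_8,
          StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public List<String> readAndClear(String windowKey) {
    Path file = fileFor(windowKey);
    try {
      if (!Files.exists(file)) {
        return Collections.emptyList();
      }
      List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
      Files.delete(file);
      return lines;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Keeping the contract to append plus read-and-clear makes each backend easy to swap without touching the code that uses it.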

Spark users might be "confused", as in Spark this behavior is "native" thanks to micro-batches. In Spark Streaming, a DStream is basically a sequence of RDDs, each of which is bounded.
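The micro-batch model can be sketched in plain Java: an unbounded iterator is cut into a never-ending sequence of finite batches, and bounded-only operations such as sorting apply per batch. `MicroBatcher` is a hypothetical name; Spark Streaming cuts batches by batch interval (time), whereas this sketch cuts by element count so that it stays deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch of micro-batching: an unbounded iterator becomes an
// endless sequence of finite batches (a count-based stand-in for Spark's
// time-based batch interval).
final class MicroBatcher<T> implements Iterator<List<T>> {
  private final Iterator<T> source;
  private final int batchSize;

  MicroBatcher(Iterator<T> source, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be > 0");
    }
    this.source = source;
    this.batchSize = batchSize;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  // Each batch is a bounded List, like a single RDD in a DStream.
  @Override
  public List<T> next() {
    if (!source.hasNext()) {
      throw new NoSuchElementException();
    }
    List<T> batch = new ArrayList<>(batchSize);
    while (batch.size() < batchSize && source.hasNext()) {
      batch.add(source.next());
    }
    return batch;
  }

  // Sorting is possible here only because a single batch is finite.
  static <E extends Comparable<? super E>> List<E> sortedBatch(List<E> batch) {
    List<E> copy = new ArrayList<>(batch);
    Collections.sort(copy);
    return copy;
  }
}
```

The sequence of batches itself never ends (`hasNext()` stays true for an infinite source); only each individual batch is bounded.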

Basically, the DoFnWithStore would look like a DoFn that implicitly stores elements in, and reads them back from, the store. Something like:

public abstract class DoFnWithStore<InputT, OutputT> extends DoFn<InputT, OutputT> {
  // ...
}

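To make the intended lifecycle more concrete, here is a Beam-free sketch assuming a String window key and an in-memory map standing in for the pluggable store. `DoFnWithStoreSketch` and `processBounded` are hypothetical; `processElement` and `finishBundle` only mirror the names of the DoFn lifecycle hooks. Elements are stored as they arrive, and on finish bundle each window's elements are read back and handed to user code as a finite list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Beam-free sketch of the DoFnWithStore lifecycle.
abstract class DoFnWithStoreSketch<InputT, OutputT> {

  // Stand-in for the pluggable store service: window key -> stored elements.
  private final Map<String, List<InputT>> store = new LinkedHashMap<>();

  // Per element: the element is only stored, not processed yet.
  final void processElement(String windowKey, InputT element) {
    store.computeIfAbsent(windowKey, k -> new ArrayList<>()).add(element);
  }

  // End of bundle: read each window back as a finite list, let user code
  // process it as a bounded collection, then clear the store.
  final List<OutputT> finishBundle() {
    List<OutputT> emitted = new ArrayList<>();
    for (Map.Entry<String, List<InputT>> entry : store.entrySet()) {
      emitted.addAll(processBounded(entry.getKey(), entry.getValue()));
    }
    store.clear();
    return emitted;
  }

  // User code: sees the stored elements of one window as a bounded list.
  protected abstract List<OutputT> processBounded(String windowKey, List<InputT> elements);
}
```

Firing on finish bundle follows the proposal: user code sees whatever the store holds for each window at that point.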

Generally, SDF (Splittable DoFn) sounds like a native way to let users implement this behavior explicitly.

My proposal is to do it implicitly and transparently for the end users (they just have to provide the Window definition and the store service to use).

Thoughts?

