Thanks for the update, Lukasz.

How would you implement a "transform" from an unbounded PCollection to a bounded PCollection?


Even if I use a GroupByKey producing something like KV<K, Iterable<V>>, it doesn't change the boundedness of the PCollection.
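For illustration, a quick sketch of that point (GenerateSequence is a stand-in for a real unbounded source; the single key and window size are arbitrary): even after the GBK, the PCollection still reports itself as unbounded:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class BoundednessCheck {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    PCollection<KV<String, Iterable<Long>>> grouped = p
        .apply(GenerateSequence.from(0))          // unbounded stand-in
        .apply(WithKeys.of("all"))                // single key
        .apply(Window.<KV<String, Long>>into(
            FixedWindows.of(Duration.standardMinutes(1))))
        .apply(GroupByKey.create());

    // The window chunks the stream, but boundedness is unchanged.
    System.out.println(grouped.isBounded());      // prints UNBOUNDED
  }
}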

You are right about the State API. My proposal is more a way to use State implicitly in a DoFn.

Regards
JB

On 10/14/2016 04:51 PM, Lukasz Cwik wrote:
SplittableDoFn is about taking a single element and turning it into
potentially many in a parallel way by allowing an element to be split
across bundles.

I believe a user could do what you describe by using a GBK to group their
data however they want. In your example it would be a single key; they would
then have KV<K, Iterable<V>> for all the values when reading from that GBK.
The proposed State API also seems to overlap with what you're trying to
achieve.
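As a sketch of that GBK approach (assuming the single-key pattern and a
`grouped` PCollection<KV<String, Iterable<Long>>> produced by such a
pipeline), a plain DoFn then sees all values of a key and window in one call:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Consumes the per-key, per-window Iterable produced by the GBK.
class CountPerWindowFn extends DoFn<KV<String, Iterable<Long>>, Long> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    long count = 0;
    for (Long ignored : c.element().getValue()) {
      count++;  // every value of this key and window is available here
    }
    c.output(count);
  }
}

// Usage: grouped.apply(ParDo.of(new CountPerWindowFn()));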



On Fri, Oct 14, 2016 at 5:12 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi guys,

When testing the different IOs, we want to have the best possible coverage
and be able to test with different use cases.

We create integration test pipelines, and one "classic" use case is to
implement a pipeline starting from an unbounded source (providing an
unbounded PCollection, like Kafka, JMS, MQTT, ...) and sending data to a
bounded sink (TextIO for instance) that expects a bounded PCollection.

This use case is not currently possible. Even when using a Window, it only
creates chunks of the unbounded PCollection; the PCollection itself stays
unbounded.
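For concreteness, the shape of the pipeline being discussed, sketched with
KafkaIO (connection settings are placeholders, and exact method names vary
across Beam versions); the final apply is the one that is rejected, because
the windowed PCollection is still unbounded:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class UnboundedToBoundedSink {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")          // placeholder
            .withTopic("events")                             // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())                              // KV<String, String>
        .apply(Values.<String>create())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
        // Still unbounded: a bounded sink like TextIO cannot accept it.
        .apply(TextIO.write().to("/tmp/events"));

    p.run();
  }
}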

That's why I created: https://issues.apache.org/jira/browse/BEAM-638.

However, I don't think a WindowFn/Trigger is the best approach.

A possible solution would be to create a specific IO
(BoundedWriteFromUnboundedSourceIO, similar to the one we have for Read ;))
to do that, but I think we should provide a more general way, as this use
case is not specific to IO. For instance, a sorting PTransform will work
only on a bounded PCollection (not an unbounded one).

I wonder if we could provide a DoFnWithStore. The purpose would be to store
the elements of an unbounded PCollection (scoped by a Window, for instance)
in a pluggable store, and to read them back from that store to provide a
bounded PCollection. The store/read trigger could be the finish of the
bundle.
We could provide "store services", for instance based on GCS, HDFS, or any
other storage (Elasticsearch, Cassandra, ...).

Spark users might be "confused", as in Spark this behavior is "native"
thanks to micro-batches: in Spark Streaming, a DStream is basically a
bounded sequence of RDDs.
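For comparison, a minimal Spark Streaming sketch of that "native" behavior
(the socket source and batch interval are arbitrary): each micro-batch is
handed to user code as a plain, bounded RDD:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class MicroBatchExample {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo");
    JavaStreamingContext ssc =
        new JavaStreamingContext(conf, Durations.seconds(5));

    JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);

    // Every 5 seconds, the accumulated elements arrive as a bounded RDD.
    lines.foreachRDD(rdd -> System.out.println("batch size: " + rdd.count()));

    ssc.start();
    ssc.awaitTermination();
  }
}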

Basically, the DoFnWithStore would look like a DoFn that implicitly stores
to and reads from the backing store. Something like:

public abstract class DoFnWithStore<InputT, OutputT>
    extends DoFn<InputT, OutputT> {

  @ProcessElement
  @Store(Window.class)  // proposed annotation: persist elements per window
  public abstract void processElement(ProcessContext context);

}

Generally, SDF sounds like a native way to let users implement this
behavior explicitly.

My proposal is to do it implicitly and transparently for the end users
(they just have to provide the Window definition and the store service to
use).
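To make the intent concrete, here is a purely hypothetical sketch of what
that could look like for the end user (DoFnWithStore, @Store, and
HdfsStoreService do not exist; they are the proposed API):

// Hypothetical API, for illustration only.
pipeline
    .apply(KafkaIO.<String, String>read()                 // unbounded input
        .withBootstrapServers("localhost:9092")
        .withTopic("events")                              // (deserializers omitted)
        .withoutMetadata())
    .apply(Window.<KV<String, String>>into(
        FixedWindows.of(Duration.standardMinutes(5))))
    .apply(ParDo.of(
        // Hypothetical: the store service spills each window to HDFS and
        // replays it as bounded input once the window is complete.
        new DoFnWithStore<KV<String, String>, String>(
            HdfsStoreService.create("/tmp/staging")) {    // hypothetical
          @ProcessElement
          @Store(Window.class)                            // hypothetical
          public void processElement(ProcessContext c) {
            c.output(c.element().getValue());
          }
        }))
    // The output would then be bounded and writable by TextIO.
    .apply(TextIO.write().to("/tmp/out"));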

Thoughts?

Regards
JB
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
