The stream API was looked at way back when we were designing the API; one
of the primary reasons it was not pursued further at the time was the
demand for Java 7 compatibility. It is also much more natural with lambdas,
but unfortunately the Java compiler discards the type information in that
case, making coder inference impossible. It's still interesting to explore,
and I've been toying with using this wrapping method for other applications
(specifically, giving a Pandas DataFrame API to PCollections in Python).
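To make the coder-inference point concrete, here is a minimal sketch (plain Java, no Beam dependency; the class and method names are invented for illustration) of why lambdas lose the type information coder inference needs: an anonymous class records its parameterized supertype in the class file, while a lambda implements only the raw interface.

```java
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {
    // An anonymous class keeps its parameterized supertype at runtime.
    public static final Function<String, Integer> anon =
        new Function<String, Integer>() {
            @Override public Integer apply(String s) { return s.length(); }
        };

    // A lambda compiles to a synthetic class implementing only the raw
    // interface, so String/Integer are unrecoverable by reflection.
    public static final Function<String, Integer> lambda = s -> s.length();

    // Returns the generic supertype a coder-inference mechanism would inspect.
    public static String genericSupertype(Object fn) {
        Type[] ifaces = fn.getClass().getGenericInterfaces();
        return ifaces.length > 0 ? ifaces[0].toString() : "(none)";
    }

    public static void main(String[] args) {
        System.out.println(genericSupertype(anon));   // names String and Integer
        System.out.println(genericSupertype(lambda)); // raw Function only
    }
}
```

This is exactly the wall a stream-style API hits: given only the lambda, there is nothing left to infer an output coder from.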

There's a higher-level question lingering here about making things more
fluent by putting methods on PCollections in our primary API. It was
somewhat of an experiment to take the very pure approach of *everything*
being expressed as a PTransform; this is not without its disadvantages,
and (gasp) may be worth revisiting. In particular, some things that have
changed in the meantime are

* The Java SDK is no longer *the* definition of the model. The model has
been (mostly) formalized in the portability work, and the general Beam
concepts and notion of PTransform are much more widely fleshed out.
* Java 8's lambdas, etc. allow for a much more succinct representation of
operations, which makes the relative ratio of boilerplate from using apply
that much higher. This is one of the struggles we had with the Python API:
pcoll.apply(Map(lambda ...)) made the "apply" feel *very* redundant. pcoll
| Map(...) is at least closer to a fluent style.
* With over two years of experience with the 100% pure approach, we still
haven't "gotten used to it" enough that adding such methods isn't
appealing. (Note that by design adding such methods later is always easier
than taking them away, which was one justification for starting at the
extreme point).

Even if we go this route, there's no need to remove apply; mixing the two
flows fairly well (with map/flatMap being syntactic sugar for apply).
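As a sketch of how that sugar could work, here is a toy model (invented names, plain Java, no Beam types, so not a proposal for the actual signatures): map is literally defined as an apply of an element-wise transform, so apply() itself never has to go away.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Toy stand-in for PCollection; "Coll" and its methods are invented.
public class FluentSketch {
    public static final class Coll<T> {
        public final List<T> elements;
        public Coll(List<T> elements) { this.elements = elements; }

        // The one primitive: every operation funnels through apply.
        public <O> Coll<O> apply(Function<Coll<T>, Coll<O>> transform) {
            return transform.apply(this);
        }

        // Sugar: map is just an apply of an element-wise transform.
        public <O> Coll<O> map(Function<T, O> fn) {
            return apply(in -> new Coll<>(
                in.elements.stream().map(fn).collect(Collectors.toList())));
        }
    }

    public static void main(String[] args) {
        Coll<Integer> lengths =
            new Coll<>(Arrays.asList("hi", "there")).map(String::length);
        System.out.println(lengths.elements); // [2, 5]
    }
}
```

Chains like coll.map(...).apply(...).map(...) then read fluently while still bottoming out in the single primitive.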

I think we would also still have to use apply for parameterless operations
like gbk that place constraints on the element types. I don't see how to do
combinePerKey either (though, asymmetrically, a global combine is fine).
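The typing problem is worth spelling out. Here is a toy illustration (invented names; Map.Entry standing in for Beam's KV): Java has no way to declare an instance method on Coll&lt;T&gt; that exists only when T is key-value shaped, but a free-standing transform applied via apply() can state that constraint in its own signature.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GbkSketch {
    // Toy stand-in for PCollection, with apply as the only primitive.
    public static final class Coll<T> {
        public final List<T> elements;
        public Coll(List<T> elements) { this.elements = elements; }
        public <O> Coll<O> apply(Function<Coll<T>, Coll<O>> t) { return t.apply(this); }
    }

    // The "elements must be key-value pairs" constraint lives in the
    // transform's signature, not on Coll<T> itself.
    public static <K, V>
            Function<Coll<Map.Entry<K, V>>, Coll<Map.Entry<K, List<V>>>> groupByKey() {
        return in -> {
            Map<K, List<V>> grouped = new LinkedHashMap<>();
            for (Map.Entry<K, V> e : in.elements) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                       .add(e.getValue());
            }
            return new Coll<>(new ArrayList<>(grouped.entrySet()));
        };
    }

    public static void main(String[] args) {
        Coll<Map.Entry<String, Integer>> kvs = new Coll<>(Arrays.asList(
            new AbstractMap.SimpleEntry<>("hi", 1),
            new AbstractMap.SimpleEntry<>("hi", 2),
            new AbstractMap.SimpleEntry<>("sue", 3)));
        Coll<Map.Entry<String, List<Integer>>> grouped = kvs.apply(groupByKey());
        System.out.println(grouped.elements); // [hi=[1, 2], sue=[3]]
    }
}
```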

The largest fear I have is feature creep. There would have to be a very
clear line between what's in and what's not, likely with what's in being a
very short list (which is probably OK and would give the biggest gain, but
not much discoverability). The criterion can't be "is a primitive" (gbk is
problematic, and the most natural map isn't really the full ParDo
primitive--in fact the full ParDo might be "advanced" enough to merit
requiring apply).
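The map-vs-ParDo gap can be seen from the shapes alone. A toy contrast (invented names, not Beam code): a natural map takes a pure one-to-one function, while a ParDo-style processElement receives an emitter and may output zero or more values per element, so map can't stand in for the full primitive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public class MapVsParDo {
    // The "natural" map: exactly one output per input.
    public static <T, O> List<O> map(List<T> in, Function<T, O> fn) {
        List<O> out = new ArrayList<>();
        for (T t : in) out.add(fn.apply(t));
        return out;
    }

    // A ParDo-like shape: fn gets an emitter and decides how many
    // outputs to produce per element (zero, one, or many).
    public static <T, O> List<O> parDo(List<T> in, BiConsumer<T, Consumer<O>> fn) {
        List<O> out = new ArrayList<>();
        for (T t : in) fn.accept(t, out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(map(Arrays.asList("a", "bb"), String::length));
        // parDo can drop elements, which map cannot express.
        System.out.println(parDo(Arrays.asList("a", "", "bb"),
            (String s, Consumer<Integer> emit) -> {
                if (!s.isEmpty()) emit.accept(s.length());
            }));
    }
}
```

And the real ParDo adds side inputs, multiple tagged outputs, state, and timers on top of this, which is why it may well sit on the "advanced, use apply" side of the line.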

Who knows; though I still think we made the right decision to attempt
apply-only at the time, maybe I'll have to flesh this out into a new blog
post as a rebuttal to my original one :).

- Robert

On Wed, Mar 14, 2018 at 1:28 AM Romain Manni-Bucau wrote:

> Hi Jan,
> The wrapping is almost exactly what I had in mind (I would pass the
> expected Class to support a bit more, like in most JRE or javax APIs, but
> that's a detail), but I would really try to align it on the Java Stream
> API, just to keep devs comfortable.
> Romain Manni-Bucau
> 2018-03-14 9:03 GMT+01:00 Jan Lukavský:
>> Hi all,
>> there are actually some steps being taken in this direction - a few emails
>> have already gone to this channel about the donation of the Euphoria API
>> to Apache Beam. The SGA has already been
>> signed, and work is currently in progress on porting all of Euphoria's
>> features to Beam. The idea is that Euphoria could be this "user friendly"
>> layer on top of Beam. In our proof-of-concept this works like this:
>>    // create input
>>    String raw = "hi there hi hi sue bob hi sue ZOW bob";
>>    List<String> words = Arrays.asList(raw.split(" "));
>>    Pipeline pipeline = Pipeline.create(options());
>>    // create input PCollection
>>    PCollection<String> input = pipeline
>>        .apply(Create.of(words))
>>        .setTypeDescriptor(TypeDescriptor.of(String.class));
>>    // holder of mapping between Euphoria and Beam
>>    BeamFlow flow = BeamFlow.create(pipeline);
>>    // lift this PCollection to Euphoria API
>>    Dataset<String> dataset = flow.wrapped(input);
>>    // do something with the data
>>    Dataset<Pair<String, Long>> output = CountByKey.of(dataset)
>>        .keyBy(e -> e)
>>        .output();
>>    // convert Euphoria API back to Beam
>>    PCollection<Pair<String, Long>> beamOut = flow.unwrapped(output);
>>    // do whatever with the resulting PCollection
>>    PAssert.that(beamOut)
>>        .containsInAnyOrder(
>>            Pair.of("hi", 4L),
>>            Pair.of("there", 1L),
>>            Pair.of("sue", 2L),
>>            Pair.of("ZOW", 1L),
>>            Pair.of("bob", 2L));
>>    // run, forrest, run
>>    pipeline.run();
>> I'm aware that this is not the "stream" API this thread was about, but
>> Euphoria also has a "fluent" package. This is
>> by no means a complete or production-ready API, but it could help resolve
>> this dichotomy between keeping the Beam API as is and introducing some
>> more user-friendly API. As I said, this work is in progress, so if
>> anyone could spare some time and give us a helping hand with the porting,
>> it would be just awesome. :-)
>> Jan
>> On 03/13/2018 07:06 PM, Romain Manni-Bucau wrote:
>> Yep
>> I know the rationale and it makes sense, but it also raises the barrier
>> to entry for users and is not that smooth in IDEs, in particular for
>> custom code.
>> So I really think it makes sense to build a user-friendly API on top of
>> the Beam core dev one.
>> Le 13 mars 2018 18:35, "Aljoscha Krettek" <> a écrit :
>>> On 11. Mar 2018, at 22:21, Romain Manni-Bucau wrote:
>>> On 12 Mar 2018 at 00:16, "Reuven Lax" wrote:
>>> I think it would be interesting to see what a Java stream-based API
>>> would look like. As I mentioned elsewhere, we are not limited to having
>>> only one API for Beam.
>>> If I remember correctly, a Java stream API was considered for Dataflow
>>> back at the very beginning. I don't completely remember why it was
>>> rejected, but I suspect at least part of the reason might have been that
>>> Java streams were considered too new and untested back then.
>>> Coders are broken (type variables don't have bounds except Object), and
>>> reducers are not trivial to implement generally, I guess.
>>> However, being close to this API can help a lot, so +1 to trying to have
>>> a Java DSL on top of the current API. It would also be neat to integrate
>>> it with CompletionStage :).
>>> Reuven
>>>> On Sun, Mar 11, 2018 at 2:29 PM Romain Manni-Bucau wrote:
>>>> On 11 Mar 2018 at 21:18, "Jean-Baptiste Onofré" wrote:
>>>> Hi Romain,
>>>> I remember we discussed ways to express pipelines a while
>>>> ago.
>>>> I was a fan of a "DSL" like the one we have in Camel: instead of
>>>> using apply(), use a dedicated form (like .map(), .reduce(), etc.; AFAIR
>>>> it's the approach in Flume).
>>>> However, we agreed that the apply() syntax gives a more flexible
>>>> approach.
>>>> Using Java Stream is interesting, but I'm afraid we would have the same
>>>> issue as the one we identified discussing a "fluent Java SDK". However,
>>>> we could have a Stream API DSL on top of the SDK IMHO.
>>>> Agreed, and a Beam stream interface (copying the JDK API but making the
>>>> lambdas serializable to avoid the need for casts).
>>>> On my side, I think it enables users to discover the API. If you check
>>>> my PoC impl you quickly see the steps needed to do simple things like a
>>>> map, which is a first-class citizen.
>>>> Also curious whether we could implement reduce with the pipeline result,
>>>> i.e. get an output of a batch from the runner (client) JVM. I see how to
>>>> do it for longs - with metrics - but not for collect().
>>>> Regards
>>>> JB
>>>> On 11/03/2018 19:46, Romain Manni-Bucau wrote:
>>>>> Hi guys,
>>>>> I don't know if you have already experimented with the Java Stream API
>>>>> as a replacement for the pipeline API, but I did some tests.
>>>>> It is far from complete but already shows where it fails (Beam
>>>>> doesn't have a way to reduce on the caller machine for instance, coder
>>>>> handling is not that trivial, lambdas don't work well with the default
>>>>> Stream API, etc.).
>>>>> However, it is interesting to see that having such an API is pretty
>>>>> natural compared to the pipeline API,
>>>>> so I wonder if Beam should work on its own Stream API (with surely
>>>>> another name, for obvious reasons ;)).
>>>>> Romain Manni-Bucau
