Hi Jan,

The wrapping is almost exactly what I had in mind (I would pass the expected Class to support a bit more, as most JRE and javax APIs do, but that's a detail), but I would really try to align it with the Java Stream API, just to keep developers comfortable. For example, Hazelcast Jet's StreamStage: https://github.com/hazelcast/hazelcast-jet/blob/9c4ea86a59ae3b899498f389b5459d67c2b4cdcd/hazelcast-jet-core/src/main/java/com/hazelcast/jet/pipeline/StreamStage.java
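A minimal, JDK-only sketch of why passing the expected Class can help, assuming the wrapper needs the element type at runtime (for example to pick a coder): reflection can still read the type arguments of an anonymous Function subclass, but not those of a lambda, so a lambda on its own does not tell the wrapper what it produces. TypedStream, map(fn, outType) and hasGenericSignature are hypothetical names for illustration, not Euphoria's or Beam's API.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TypeTokenSketch {

    // Hypothetical wrapper: carries its element type explicitly as a Class<T>,
    // the way many JDK and javax APIs take a Class argument.
    public static final class TypedStream<T> {
        public final List<T> elements;
        public final Class<T> type;

        public TypedStream(List<T> elements, Class<T> type) {
            this.elements = elements;
            this.type = type;
        }

        // The output Class is passed in by the caller instead of being inferred from fn.
        public <R> TypedStream<R> map(Function<? super T, ? extends R> fn, Class<R> outType) {
            List<R> out = new ArrayList<>();
            for (T e : elements) {
                out.add(outType.cast(fn.apply(e)));
            }
            return new TypedStream<>(out, outType);
        }
    }

    // True if the function's class still records Function<A, B> type arguments.
    public static boolean hasGenericSignature(Function<?, ?> fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Function<String, Integer> anonymous = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        Function<String, Integer> lambda = s -> s.length();

        System.out.println(hasGenericSignature(anonymous)); // true
        System.out.println(hasGenericSignature(lambda));    // false

        // The caller states the output type, so nothing has to be guessed.
        TypedStream<Integer> lengths =
            new TypedStream<>(Arrays.asList("hi", "there"), String.class).map(lambda, Integer.class);
        System.out.println(lengths.type.getSimpleName() + " " + lengths.elements); // Integer [2, 5]
    }
}
```

The design choice mirrored here is simply to make the type an explicit argument, so the runtime never relies on generic information that lambdas do not keep.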
Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-14 9:03 GMT+01:00 Jan Lukavský <je...@seznam.cz>:

> Hi all,
>
> there are actually some steps taken in this direction: a few emails have
> already gone to this channel about the donation of the Euphoria API
> (https://github.com/seznam/euphoria) to Apache Beam. The SGA (Software Grant
> Agreement) has already been signed, and work is currently in progress on
> porting all of Euphoria's features to Beam. The idea is that Euphoria could
> be this "user-friendly" layer on top of Beam. In our proof of concept it
> works like this:
>
>     // create input
>     String raw = "hi there hi hi sue bob hi sue ZOW bob";
>     List<String> words = Arrays.asList(raw.split(" "));
>
>     Pipeline pipeline = Pipeline.create(options());
>
>     // create input PCollection
>     PCollection<String> input = pipeline.apply(
>         Create.of(words)).setTypeDescriptor(TypeDescriptor.of(String.class));
>
>     // holder of mapping between Euphoria and Beam
>     BeamFlow flow = BeamFlow.create(pipeline);
>
>     // lift this PCollection to Euphoria API
>     Dataset<String> dataset = flow.wrapped(input);
>
>     // do something with the data
>     Dataset<Pair<String, Long>> output = CountByKey.of(dataset)
>         .keyBy(e -> e)
>         .output();
>
>     // convert Euphoria API back to Beam
>     PCollection<Pair<String, Long>> beamOut = flow.unwrapped(output);
>
>     // do whatever with the resulting PCollection
>     PAssert.that(beamOut)
>         .containsInAnyOrder(
>             Pair.of("hi", 4L),
>             Pair.of("there", 1L),
>             Pair.of("sue", 2L),
>             Pair.of("ZOW", 1L),
>             Pair.of("bob", 2L));
>
>     // run, forrest, run
>     pipeline.run();
>
> I'm aware that this is not the "stream" API this thread was about, but
> Euphoria also has a "fluent" package:
> https://github.com/seznam/euphoria/tree/master/euphoria-fluent. This is by
> no means a complete or production-ready API, but it could help resolve the
> dichotomy between keeping the Beam API as is and introducing a more
> user-friendly API. As I said, this is work in progress, so if anyone could
> spare some time and give us a helping hand with the porting, it would be
> just awesome. :-)
>
> Jan
>
> On 03/13/2018 07:06 PM, Romain Manni-Bucau wrote:
>
> Yep
>
> I know the rationale and it makes sense, but it also adds entry steps for
> users and is not that smooth in IDEs, in particular for custom code.
>
> So I really think it makes sense to build a user-friendly API on top of the
> Beam core developer one.
>
> On 13 Mar 2018 18:35, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>
>> https://beam.apache.org/blog/2016/05/27/where-is-my-pcollection-dot-map.html
>>
>> On 11. Mar 2018, at 22:21, Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>> On 12 Mar 2018 00:16, "Reuven Lax" <re...@google.com> wrote:
>>
>> I think it would be interesting to see what a Java stream-based API would
>> look like.
>> As I mentioned elsewhere, we are not limited to having only one
>> API for Beam.
>>
>> If I remember correctly, a Java stream API was considered for Dataflow
>> back at the very beginning. I don't completely remember why it was
>> rejected, but I suspect at least part of the reason might have been that
>> Java streams were considered too new and untested back then.
>>
>> Coders are broken (type variables don't have bounds other than Object) and
>> reducers are not trivial to implement in general, I guess.
>>
>> However, staying close to this API can help a lot, so +1 to trying to have
>> a Java DSL on top of the current API. It would also be neat to integrate it
>> with CompletionStage :).
>>
>> Reuven
>>
>> On Sun, Mar 11, 2018 at 2:29 PM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>> On 11 Mar 2018 21:18, "Jean-Baptiste Onofré" <j...@nanthrax.net> wrote:
>>>
>>> Hi Romain,
>>>
>>> I remember we discussed how to express pipelines a while ago.
>>>
>>> I was a fan of a "DSL" like the one we have in Camel: instead of using
>>> apply(), use dedicated methods (like .map(), .reduce(), etc.; AFAIR,
>>> that's the approach in Flume).
>>> However, we agreed that the apply() syntax gives a more flexible approach.
>>>
>>> Using Java Stream is interesting, but I'm afraid we would have the same
>>> issue as the one we identified when discussing the "fluent Java SDK".
>>> However, we can have a Stream API DSL on top of the SDK IMHO.
>>>
>>> Agreed, with a Beam stream interface (copying the JDK API but making
>>> lambdas serializable to avoid the need for a cast).
>>>
>>> On my side, I think it enables users to discover the API. If you check my
>>> PoC implementation, you quickly see the steps needed to do simple things
>>> like a map, which is a first-class citizen.
>>>
>>> I'm also curious whether we could implement reduce with the pipeline
>>> result, i.e. get the output of a batch back in the runner (client) JVM. I
>>> see how to do it for longs (with metrics) but not for collect().
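The remark about making lambdas serializable "to avoid the cast" can be illustrated with the JDK alone. A minimal sketch, assuming user functions are shipped to workers with plain Java serialization: a lambda whose target type is java.util.function.Function is not Serializable, so it needs an intersection cast such as (Function<A, B> & Serializable), while a functional interface extending both Function and Serializable makes every lambda assigned to it serializable with no cast. SerFunction, LENGTH and roundTrip are hypothetical names for this illustration; Beam's SDK has its own SerializableFunction interface in a similar spirit, which this code does not use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class SerializableLambdaSketch {

    // Hypothetical JDK-shaped interface that is also Serializable, so any
    // lambda assigned to it is serializable without a cast.
    @FunctionalInterface
    public interface SerFunction<A, B> extends Function<A, B>, Serializable {}

    // Hypothetical example function, defined here so it deserializes in this class.
    public static final SerFunction<String, Integer> LENGTH = s -> s.length();

    // Serialize then deserialize a value, as a runner might when shipping user code.
    // Failures are rethrown unchecked with the original exception as the cause.
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // Plain JDK Function: the generated lambda class does not implement Serializable.
        Function<String, Integer> plain = s -> s.length();
        try {
            roundTrip(plain);
        } catch (IllegalStateException e) {
            System.out.println("plain Function failed: " + e.getCause().getClass().getSimpleName());
        }

        // JDK Function plus an intersection cast: works, but is noisy at every call site.
        Function<String, Integer> casted =
            (Function<String, Integer> & Serializable) s -> s.length();
        System.out.println(roundTrip(casted).apply("there")); // 5

        // Serializable functional interface: no cast needed.
        System.out.println(roundTrip(LENGTH).apply("hi")); // 2
    }
}
```

The trade-off is that such an interface is no longer exactly the JDK type, but since it extends Function it can still be passed wherever a Function is expected.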
>>>
>>> Regards
>>> JB
>>>
>>> On 11/03/2018 19:46, Romain Manni-Bucau wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I don't know if you have already experimented with using the Java Stream
>>>> API as a replacement for the pipeline API, but I did some tests:
>>>> https://github.com/rmannibucau/jbeam
>>>>
>>>> It is far from complete, but it already shows where it fails (Beam
>>>> doesn't have a way to reduce on the caller machine, for instance; coder
>>>> handling is not that trivial; lambdas don't work well with the default
>>>> Stream API; etc.).
>>>>
>>>> However, it is interesting to see that such an API is pretty natural
>>>> compared to the pipeline API, so I wonder whether Beam should work on its
>>>> own Stream API (surely with another name, for obvious reasons ;)).
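For comparison with Jan's Euphoria proof of concept earlier in the thread, here is the same count-by-key computed locally with the JDK's java.util.stream API, the style the thread proposes mirroring. This is only a sketch of the API shape: it runs in a single JVM, with no Beam runner, coders, or serialization involved, which are exactly the parts the thread identifies as hard. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountStream {

    // Count occurrences of each space-separated word, keyed by the word itself
    // (the analogue of CountByKey.of(dataset).keyBy(e -> e) in the Euphoria example).
    // A TreeMap is used only to make the printed order deterministic.
    public static Map<String, Long> countByKey(String raw) {
        return Arrays.stream(raw.split(" "))
            .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countByKey("hi there hi hi sue bob hi sue ZOW bob"));
        // {ZOW=1, bob=2, hi=4, sue=2, there=1}
    }
}
```

The counts match the pairs asserted with PAssert in the Euphoria snippet; "ZOW" sorts first only because uppercase letters precede lowercase ones in String's natural ordering.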