On Wed, Jan 31, 2018 at 1:34 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi Ismaël,
>
> I agree that hints should not change the output of PTransforms.
>
> However, let me illustrate why I think hints could be interesting:
>
> - I agree with what you are saying about the runners: they should be smart. However, to be smart enough, the runner could use some statements provided by the pipeline designer. Let's take the Spark runner. In the first version, we didn't do any caching of RDDs. Now, just for streaming, if a PCollection is read more than once, I added RDD caching. That's systematic, and the user doesn't have the choice. When you write the same process using Spark directly, the user has control of the way the RDDs are cached, and it can depend on the use case. So, I think users will be happy to give some hints to the runner at some points of the pipeline.
>
> - Hints will open new features in the IOs. Let me take a concrete example. On a local branch, I created a RestIO. The RestIO Write PTransform calls a remote REST service for each element in the PCollection. It's configured that way:
>
>   pipeline.apply(...) // PCollection<String> where String is JSON for now
>       .apply(RestIO.write().withUrl("http://localhost:8181/rest/"));
>
> So all elements will use the same URL. Now, imagine the URL of the REST service depends on the element. There's no easy way to achieve that today: the only way is to change the IO to process a PCollection<RestRequest> where the RestRequest POJO contains the message payload and the URL. It's OK to use such a declarative approach; it's just a bit painful to change the POJO each time we need to deal with additional data (let's say encoding, authentication, etc.).

However, we already solved this particular use case for FileIO: the user can configure the FileIO sink (TextIO, AvroIO, etc.) with a lambda that maps each record to the destination where its files are written. Can't we solve things similarly for RestIO? It would look like:

  pipeline.apply(...) // PCollection<String> where String is JSON for now
      .apply(RestIO.write().withUrlFunction((String record) -> getUrl(record)));

> In Camel, you can attach a header to a message. So basically, you have the data payload (the body of the message) corresponding to the element.
>
> That's why I was thinking about hints on PCollection first: it would allow us to implement EIPs (Enterprise Integration Patterns).
>
> Regards
> JB
>
> On 01/31/2018 09:31 AM, Ismaël Mejía wrote:
> This is a subject we have already discussed in the past. It was part of the discussion on ‘data locality’ for the runners on top of HDFS. At that moment the argument for ‘hints’ was that a transform could send hints to the runners so they properly allocate the readers, improving its execution. This is similar to the case of resource allocation (GPU) mentioned by Reuven.
> https://issues.apache.org/jira/browse/BEAM-2085
>
> What is a bit tricky about the design is the optional characteristic of hints: we say that hints should not change the semantics of the transforms (their output), but they can easily be abused to configure how runners behave. We should limit hints only to the use case of resource allocation, cases where the runner can benefit from the hint info to pass it to the resource allocator, but runner-specific configuration must be part only of the runner options, or runners should be smarter.
>
> This is to avoid potential misuse for portability and to limit extra knobs, and also to avoid the risky case of ending up with some sort of runtime ‘map-like’ configuration with hundreds of options that change behavior, like they exist in Hadoop and Spark. We should avoid adding another level of this kind of variables now on top of Beam.
>
> On Wed, Jan 31, 2018 at 7:25 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi,
>
> yeah, it sounds good to me.
> I will create the Jira to track this and start a PoC on the Composite.
>
> Thanks!
> Regards
> JB
>
> On 01/30/2018 10:40 PM, Reuven Lax wrote:
> Did we actually reach consensus here? :)
>
> On Tue, Jan 30, 2018 at 1:29 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> Not sure how it fits in terms of API yet, but +1 for the high-level view. Makes perfect sense.
>
> On Jan 30, 2018 21:41, "Jean-Baptiste Onofré" <j...@nanthrax.net> wrote:
> Hi Robert,
>
> Good point and idea for the Composite transform. It would apply nicely to all transforms based on composites.
>
> I also agree that the hint is more on the transform than on the PCollection itself.
>
> Thanks!
> Regards
> JB
>
> On 30/01/2018 21:26, Robert Bradshaw wrote:
> Many hints make more sense for PTransforms (the computation itself) than for PCollections. In addition, when we want properties attached to PCollections themselves, it often makes sense to let these be provided by the producing PTransform (e.g. coders and schemas are often functions of the input metadata and the operation itself, and can't just be set arbitrarily).
>
> Also, we already have a perfectly standard way of nesting transforms (or even sets of transforms), namely composite transforms. In terms of API design I would propose writing a composite transform that applies constraints/hints/requirements to all its inner transforms. This translates nicely to the Fn API as well.
>
> On Tue, Jan 30, 2018 at 12:14 PM, Kenneth Knowles <k...@google.com> wrote:
> It seems like most of these use cases are hints on a PTransform and not a PCollection, no?
> CPU, memory, expected parallelism, etc. are. Then you could just have:
>
>   pc.apply(WithHints(myTransform, <hints>))
>
> For a PCollection, hints that might make sense are bits like total size, element size, and throughput. All things that the Dataflow folks have said should be measured instead of hinted. But I understand that we shouldn't force runners to do infeasible things like build a whole no-knobs service on top of a super-knobby engine.
>
> Incidentally, for portability, we have this "environment" object that is basically the Docker URL of an SDK harness that can execute a function. We always intended that same area of the proto (exact fields TBD) to have things like requirements for CPU, memory, GPUs, disk, etc. It is likely a good place for hints.
>
> BTW, good idea to ask users@ for their pain points and bring them back to the dev list to motivate feature design discussions.
>
> Kenn
>
> On Tue, Jan 30, 2018 at 12:00 PM, Reuven Lax <re...@google.com> wrote:
> I think the hints would logically be metadata in the PCollection, just like coder and schema.
>
> On Jan 30, 2018 11:57 AM, "Jean-Baptiste Onofré" <j...@nanthrax.net> wrote:
> Great idea for AddHints.of()!
>
> What would be the resulting PCollection? Just a PCollection of hints, or the pc elements + hints?
>
> Regards
> JB
>
> On 30/01/2018 20:52, Reuven Lax wrote:
> I think adding hints for runners is reasonable, though hints should always be assumed to be optional - they shouldn't change semantics of the program (otherwise you destroy the portability promise of Beam).
> However, there are many types of hints that some runners might find useful (e.g. this step needs more memory; this step runs ML algorithms and should run on a machine with GPUs; etc.).
>
> Robert has mentioned in the past that we should try and keep PCollection an immutable object, and not introduce new setters on it. We slightly break this already today with PCollection.setCoder, and that has caused some problems. Hints can be set on PTransforms though, and propagate to that PTransform's output PCollections. This is nearly as easy to use, however, as we can implement a helper PTransform that can be used to set hints, i.e.
>
>   pc.apply(AddHints.of(hint1, hint2, hint3))
>
> is no harder than calling pc.addHint().
>
> Reuven
>
> On Tue, Jan 30, 2018 at 11:39 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Maybe I should have started the discussion on the user mailing list: it would be great to have user feedback on this, even if I got your points.
>
> Sometimes I have the feeling that whatever we are proposing and discussing, it doesn't go anywhere. At some point, to attract more people, we have to get ideas from different perspectives/standpoints.
>
> Thanks for the feedback anyway.
>
> Regards
> JB
>
> On 30/01/2018 20:27, Romain Manni-Bucau wrote:
> 2018-01-30 19:52 GMT+01:00 Kenneth Knowles <k...@google.com>:
> > I generally like having certain "escape hatches" that are well designed and limited in scope, and anything that turns out to be important becomes first-class. But this one I don't really like because the use cases belong elsewhere. Of course, they creep, so you should assume they will be unbounded in how much gets stuffed into them. And the definition of a "hint" is that deleting it does not change semantics, just performance/monitor/UI etc., but this does not seem to be true.
> >
> > "spark.persist" for idempotent replay in a sink:
> >  - this is already @RequiresStableInput
> >  - it is not a hint because if you don't persist your results are incorrect
> >  - it is a property of a DoFn / transform, not a PCollection
>
> Let's put this last point aside since we'll manage to make it work wherever we store it ;).
>
> > schema:
> >  - should be first-class
>
> Except it doesn't make sense everywhere. It is exactly like saying "implement this" and 2 lines later "it doesn't do anything for you". If you think wider on schema you will want to do far more - like getting them from the previous step etc. - which makes it not an API thing. However, with some runners like Spark, being able to specify it will make it possible to optimize the execution.
> There is a clear mismatch between a consistent, user-friendly, generic and portable API, and a runtime, runner-specific implementation.
>
> This is all fine as an issue for a portable API, and it is why all EE APIs have a map to pass properties somewhere, so I don't see why Beam wouldn't fall into that exact same bucket, since it embraces the drawbacks of portability and we have already hit it for several releases.
>
> > step parallelism (you didn't mention it but most runners need some control):
> >  - this is a property of the data and the pipeline together, not just the pipeline
>
> Good one, but this can be configured from the pipeline or even a transform. This doesn't mean the data is not important (and you are more than right on that point), just that it is configurable without referencing the data (using ranges is a trivial example even if not the most efficient).
>
> > So I just don't actually see a use case for free-form hints that we haven't already covered.
>
> There are several cases, even in the direct runner, to be able to industrialize it:
> - use that particular executor instance
> - debug this info for that transform
> etc.
>
> As a high-level design I think it is good to bring hints to Beam to avoid adding an ad-hoc solution each time and taking the risk of losing the portability of the main API.
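Kenn's test for what counts as a hint (deleting it must not change results) suggests treating hints and requirements differently: a runner may silently drop hints it does not understand, but must refuse a pipeline whose requirements it cannot meet, the way `@RequiresStableInput` marks correctness rather than performance. The plain-Java sketch below illustrates that split under stated assumptions; `StepProperties`, `HintAwareRunner`, and the hint and capability names are hypothetical and are not Beam APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical per-step metadata: optional hints plus mandatory requirements. */
final class StepProperties {
    final Map<String, Object> hints;  // a runner may ignore any of these
    final Set<String> requirements;   // a runner must honor all of these or refuse to run

    StepProperties(Map<String, Object> hints, Set<String> requirements) {
        this.hints = Map.copyOf(hints);
        this.requirements = Set.copyOf(requirements);
    }
}

/** Hypothetical runner that understands only some hint keys and capabilities. */
final class HintAwareRunner {
    private final Set<String> knownHints;
    private final Set<String> capabilities;

    HintAwareRunner(Set<String> knownHints, Set<String> capabilities) {
        this.knownHints = Set.copyOf(knownHints);
        this.capabilities = Set.copyOf(capabilities);
    }

    /**
     * Checks requirements first, then returns the subset of hints this runner applies.
     * Unknown hints are dropped silently; unmet requirements fail fast.
     */
    Map<String, Object> plan(StepProperties step) {
        for (String requirement : step.requirements) {
            if (!capabilities.contains(requirement)) {
                // Dropping a requirement could produce wrong results, so refuse instead.
                throw new UnsupportedOperationException("unsupported requirement: " + requirement);
            }
        }
        Map<String, Object> applied = new LinkedHashMap<>();
        for (Map.Entry<String, Object> hint : step.hints.entrySet()) {
            if (knownHints.contains(hint.getKey())) {
                applied.put(hint.getKey(), hint.getValue());
            }
        }
        return applied;
    }
}
```

Keeping the two kinds of metadata in separate fields is a choice made for this sketch: it lets the runner check the "safe to ignore" property instead of relying on convention.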
> >
> > Kenn
> >
> > On Tue, Jan 30, 2018 at 9:55 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> > Lukasz, the point is that you have to choose: either bring all specificities into the main API, which makes most of the API unusable or unimplemented, or the opposite, support nothing. Introducing hints will allow some runners to offer some features early (or just some very specific things), and once mainstream, a feature can find a place in the main API. This is saner than the opposite, since some specificities can never find a good place.
> >
> > The little thing we need to take care of with that is to avoid introducing some feature flipping, such as supporting a feature not doable with another runner. It should really be about tuning a runner's execution (like the schema in Spark).
> >
> > Romain Manni-Bucau
> > @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau>
> >
> > 2018-01-30 18:42 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> > Good point Luke: in that case, the hint will be ignored by the runner if the hint is not for it. The hint can be generic (not specific to a runner). It could be interesting for the schema support or IOs, not specific to a runner.
> >
> > What do you mean by gathering PTransforms/PCollections, and where?
> >
> > Thanks!
> > Regards
> > JB
> >
> > On 30/01/2018 18:35, Lukasz Cwik wrote:
> > If the hint is required to run the person's pipeline well, how do you expect that the person will be able to migrate their pipeline to another runner?
> >
> > A lot of hints like "spark.persist" are really the user trying to tell us something about the PCollection, like it is very small.
> > I would prefer if we gathered this information about PTransforms and PCollections instead of runner-specific knobs, since then each runner can choose how best to map such a property onto its internal representation.
> >
> > On Tue, Jan 30, 2018 at 2:21 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > Hi,
> >
> > As part of the discussion about schema, Romain mentioned hints. I think it's worth having an explanation about that, especially since it could be wider than schema.
> >
> > Today, to give information to the runner, we use PipelineOptions. The runner can use these options and apply them to all inner representations of the PCollection in the runner.
> >
> > For instance, for the Spark runner, the persistence storage level (memory, disk, ...) can be defined via pipeline options.
> >
> > Then, the Spark runner automatically decides whether RDDs have to be persisted (using the storage level defined in the pipeline options), for instance if the same POutput/PCollection is read several times.
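The automatic behavior described above, persisting a dataset only when more than one step reads it and using a storage level taken from the pipeline options, amounts to counting consumers over the pipeline graph. Here is a simplified plain-Java model of that decision; `Step`, `CachePlanner`, and the string storage levels are hypothetical stand-ins and do not reflect the Spark runner's actual classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical pipeline step: reads zero or more named collections and produces one. */
final class Step {
    final String name;
    final List<String> inputs;
    final String output;

    Step(String name, List<String> inputs, String output) {
        this.name = name;
        this.inputs = List.copyOf(inputs);
        this.output = output;
    }
}

final class CachePlanner {
    /**
     * Returns collection name -> storage level for every collection read by more than one step.
     * The level is one pipeline-wide value, as with a pipeline option: no per-collection choice.
     */
    static Map<String, String> plan(List<Step> steps, String storageLevelFromOptions) {
        // Count how many times each collection is consumed across all steps.
        Map<String, Integer> readers = new HashMap<>();
        for (Step step : steps) {
            for (String input : step.inputs) {
                readers.merge(input, 1, Integer::sum);
            }
        }
        // Persist only collections with more than one reader, all at the same level.
        Map<String, String> toPersist = new TreeMap<>();
        for (Map.Entry<String, Integer> e : readers.entrySet()) {
            if (e.getValue() > 1) {
                toPersist.put(e.getKey(), storageLevelFromOptions);
            }
        }
        return toPersist;
    }
}
```

Because the level is a single pipeline-wide value, this model has no way to treat one collection differently from another, which is the gap the proposal targets.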
> >
> > However, the user doesn't have any way to give the runner indications for a specific PCollection.
> >
> > Imagine the user has a pipeline like this:
> >
> >   pipeline.apply().apply().apply()
> >
> > We have three PCollections involved in this pipeline. It's not currently possible to give indications on how the runner should optimize and deal with the second PCollection only.
> >
> > The idea is to add a method on the PCollection:
> >
> >   PCollection.addHint(String key, Object value);
> >
> > For instance:
> >
> >   collection.addHint("spark.persist", StorageLevel.MEMORY_ONLY);
> >
> > I see three direct usages of this:
> >
> > 1. Related to schema: the schema definition could be a hint.
> > 2. Related to the IO: add headers telling the IO and the runner how to specifically process a collection. In Apache Camel, we have headers on the message and properties on the exchange, similar to this. They allow giving some indication of how to process some messages in the Camel component. We can imagine the same for the IO (using the PCollection hints to react accordingly).
> > 3. Related to runner optimization: I see, for instance, a way to use RDD or DataFrame in the Spark runner, or even specific optimizations like persist. I had a lot of questions from Spark users saying: "in my Spark job, I know where and how I should use persist (rdd.persist()), but I can't do such optimization using Beam". So it could be a good improvement.
> >
> > Thoughts?
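The proposed `addHint(String key, Object value)` can be reconciled with the preference, voiced by Robert and Reuven, for an immutable PCollection if "adding" a hint returns a new value, and with JB's point that a runner ignores hints not meant for it if each runner reads only keys under its own prefix. A minimal plain-Java sketch under those assumptions follows; `Hints`, `with`, and `forRunner` are hypothetical names rather than Beam API, and the storage level is a plain string standing in for Spark's `StorageLevel`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable hint set: "adding" a hint returns a new instance. */
final class Hints {
    private static final Hints EMPTY = new Hints(Map.of());
    private final Map<String, Object> values;

    private Hints(Map<String, Object> values) {
        this.values = values;
    }

    static Hints empty() {
        return EMPTY;
    }

    /** Returns a copy with {@code key} set to {@code value}; this instance is not modified. */
    Hints with(String key, Object value) {
        Map<String, Object> copy = new HashMap<>(values);
        copy.put(key, value);
        return new Hints(Collections.unmodifiableMap(copy));
    }

    /**
     * Returns only the hints addressed to {@code runner}, with the prefix stripped,
     * e.g. "spark.persist" becomes "persist" for runner "spark". Every other hint is ignored.
     */
    Map<String, Object> forRunner(String runner) {
        String prefix = runner + ".";
        Map<String, Object> selected = new HashMap<>();
        for (Map.Entry<String, Object> e : values.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                selected.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return Collections.unmodifiableMap(selected);
    }

    int size() {
        return values.size();
    }
}
```

In Beam terms, a helper transform such as the `AddHints.of(...)` suggested in the thread would presumably be the place where this copy happens, so no new setter on PCollection would be needed.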
> >
> > Regards
> > JB
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
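The per-element URL function suggested at the top of the thread, analogous to the record-to-destination lambda that FileIO sinks accept, can be modeled without Beam as a writer configured with a `Function<T, String>` instead of a fixed URL. RestIO existed only on a local branch, so `RestWriteSketch`, its factory methods, and the `BiConsumer` standing in for the HTTP call are all hypothetical; this is a sketch of the idea, not RestIO's API.

```java
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical REST sink: each element is sent to a URL computed from that element. */
final class RestWriteSketch<T> {
    private final Function<? super T, String> urlFn;
    private final BiConsumer<String, T> sender; // stands in for the actual HTTP call

    private RestWriteSketch(Function<? super T, String> urlFn, BiConsumer<String, T> sender) {
        this.urlFn = Objects.requireNonNull(urlFn, "urlFn");
        this.sender = Objects.requireNonNull(sender, "sender");
    }

    /** Existing behavior: one fixed URL, expressed as a constant function. */
    static <T> RestWriteSketch<T> withUrl(String url, BiConsumer<String, T> sender) {
        Objects.requireNonNull(url, "url");
        return new RestWriteSketch<T>(element -> url, sender);
    }

    /** Suggested behavior: the destination is derived from the element itself. */
    static <T> RestWriteSketch<T> withUrlFunction(
            Function<? super T, String> urlFn, BiConsumer<String, T> sender) {
        return new RestWriteSketch<T>(urlFn, sender);
    }

    /** Handles one element, roughly what a DoFn's @ProcessElement method would do. */
    void write(T element) {
        sender.accept(urlFn.apply(element), element);
    }
}
```

Because the fixed-URL case is just a constant function, both configurations share one code path; further per-element settings such as encoding or authentication could follow the same pattern (one function per setting) instead of growing a RestRequest POJO.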