On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <[email protected]> wrote:

> Okay cool, so it sounds like the cleanup can be done in two phases: move the apply_ methods to transform replacements, then move Dataflow onto the Cloudv1b3 protos. AFAIU, phase one will make the Pipeline object portable? If the InteractiveRunner were to make a Pipeline object, then it could be passed to the DataflowRunner to run, correct?
>
Currently we do the following.

(1) Currently, Java and Python SDKs:
SDK-specific object representation -> Dataflow job request (v1beta3) -> Dataflow service specific representation
(and, separately, for the workers) Beam Runner API proto -> store in GCS -> download in workers

(2) Currently, Go SDK:
SDK-specific object representation -> Beam Runner API proto -> Dataflow job request (v1beta3) -> Dataflow service specific representation

We got cross-language (for Python) working for (1) above, but the code will be much cleaner if we could do (2) for Python and Java.

I think the cleanest approach is the following, which will allow us to share translation code across SDKs.

(3) For all SDKs:
SDK-specific object representation -> Runner API proto embedded in Dataflow job request -> Runner API proto translated to the internal Dataflow-specific representation within the Dataflow service

I think we should go for a cleaner approach here ((2) or (3)) instead of trying to do it in multiple steps (we'd have to keep updating features such as cross-language in lockstep, which will be hard and result in a lot of throwaway work).

Thanks,
Cham

> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <[email protected]> wrote:
>
>> +1 to translation from Beam pipeline protos.
>>
>> The Go SDK does that currently in dataflowlib/translate.go to handle the current Dataflow situation, so it's certainly doable.
>>
>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <[email protected]> wrote:
>>
>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <[email protected]> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am currently investigating making the Python DataflowRunner use a portable pipeline representation so that we can eventually get rid of the Pipeline(runner) weirdness.
>>>>
>>>> In that case, I have a lot of questions about the Python DataflowRunner:
>>>>
>>>> *PValueCache*
>>>>
>>>>    - Why does this exist?
>>>>
>>> This is historical baggage from the (long gone) first direct runner, when actual computed PCollections were cached, and the DataflowRunner inherited it.
>>>
>>>> *DataflowRunner*
>>>>
>>>>    - I see that the DataflowRunner defines some PTransforms as runner-specific primitives by returning a PCollection.from_(...) in apply_ methods. Then in the run_ methods, it references the PValueCache to add steps.
>>>>       - How does this add steps?
>>>>       - Where does it cache the values to?
>>>>       - How does the runner harness pick up these cached values to create new steps?
>>>>       - How is this information communicated to the runner harness?
>>>>    - Why do the following transforms need to be overridden: GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>
>>> Each of these four has a different implementation on Dataflow.
>>>
>>>>    - Why doesn't the ParDo transform need to be overridden? I see that it has a run_ method but no apply_ method.
>>>>
>>> apply_ is called at pipeline construction time; all of these should be replaced by PTransformOverrides. run_ is called after pipeline construction to actually build up the Dataflow graph.
>>>
>>>> *Possible fixes*
>>>> I was thinking of getting rid of the apply_ and run_ methods and replacing those with a PTransformOverride and a simple PipelineVisitor, respectively. Is this feasible? Am I missing any assumptions that don't make this feasible?
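To make the PTransformOverride half of that concrete: a replacement for one of the overridden transforms might look roughly like the following. This is only a sketch (the exact hook names on PTransformOverride have shifted across SDK versions, and _DataflowGroupByKey is a made-up stand-in for the runner-specific primitive):

    import apache_beam as beam
    from apache_beam.pipeline import PTransformOverride

    class GroupByKeyOverride(PTransformOverride):
      """Swaps GroupByKey for a runner-specific primitive."""

      def matches(self, applied_ptransform):
        # applied_ptransform is the AppliedPTransform node in the graph.
        return isinstance(applied_ptransform.transform, beam.GroupByKey)

      def get_replacement_transform(self, ptransform):
        # Hypothetical: return the Dataflow-specific primitive here.
        return _DataflowGroupByKey()

The runner would then apply all of its overrides up front, e.g. pipeline.replace_all([GroupByKeyOverride(), ...]), before any translation happens.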
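The run_ side could then become a visitor along these lines (again just a sketch; translate_node is a hypothetical helper that would do the actual v1beta3 step construction):

    from apache_beam.pipeline import PipelineVisitor

    class StepBuildingVisitor(PipelineVisitor):
      """Walks the (already-overridden) pipeline graph and emits one
      Dataflow step per primitive transform."""

      def __init__(self):
        self.steps = []

      def visit_transform(self, transform_node):
        # transform_node is an AppliedPTransform.
        self.steps.append(translate_node(transform_node))

    # Usage:
    #   visitor = StepBuildingVisitor()
    #   pipeline.visit(visitor)

This keeps pipeline construction (the overrides) cleanly separated from job translation (the visitor).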
>>>
>>> If we're going to overhaul how the runner works, it would be best to make DataflowRunner a direct translator from Beam runner API protos to Cloudv1b3 protos, rather than manipulating the intermediate Python representation (which no one wants to change for fear of messing up DataflowRunner and causing headaches for cross-language).
>>>
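A translator like that would start from the portable pipeline proto and dispatch on transform URNs rather than on Python classes, which is also what makes it safe for cross-language pipelines. A minimal sketch (to_runner_api and the proto fields are real; translate_by_urn and emit_step are hypothetical):

    # pipeline_proto is a beam_runner_api_pb2.Pipeline
    pipeline_proto = pipeline.to_runner_api()

    def translate(transform_id, components):
      transform = components.transforms[transform_id]
      if transform.subtransforms:
        # Composite transform: recurse into its children.
        for child_id in transform.subtransforms:
          translate(child_id, components)
      else:
        # Primitive transform: dispatch on its URN to build the
        # corresponding Cloudv1b3 step.
        emit_step(translate_by_urn(transform.spec.urn, transform))

    for root_id in pipeline_proto.root_transform_ids:
      translate(root_id, pipeline_proto.components)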
