In particular, ideally the Dataflow Service would handle the Dataflow-specific format translation, rather than each SDK, making the v1beta3 pipeline format an internal detail.
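For concreteness, a rough sketch of what the SDK side is reduced to under that model, using the existing Python Pipeline.to_runner_api() call; the actual submission path is omitted, since that is exactly the piece that would move behind the service:

import apache_beam as beam

# Build an ordinary pipeline; no runner-specific translation happens here.
p = beam.Pipeline()
_ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)

# Pipeline.to_runner_api() already exists and yields the portable
# beam_runner_api_pb2.Pipeline proto. Under this model, that proto is the
# only artifact the SDK hands over; the v1beta3 job representation stays
# internal to the Dataflow service.
portable_proto = p.to_runner_api()
print(portable_proto.root_transform_ids)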
Ideally Dataflow would support a JobManagement endpoint directly, but I imagine
that's a more involved task that's out of scope for now.

On Thu, Apr 2, 2020, 7:43 AM Chamikara Jayalath <[email protected]> wrote:

> On Wed, Apr 1, 2020 at 11:31 AM Sam Rohde <[email protected]> wrote:
>
>> Okay cool, so it sounds like the cleanup can be done in two phases: move
>> the apply_ methods to transform replacements, then move Dataflow onto the
>> Cloudv1b3 protos. AFAIU, after phase one the Pipeline object will be
>> portable? If the InteractiveRunner were to make a Pipeline object, then it
>> could be passed to the DataflowRunner to run, correct?
>
> Currently we do the following.
>
> (1) Currently Java and Python SDKs
> SDK specific object representation -> Dataflow job request (v1beta3) ->
> Dataflow service specific representation
> Beam Runner API proto -> store in GCS -> Download in workers.
>
> (2) Currently Go SDK
> SDK specific object representation -> Beam Runner API proto -> Dataflow
> job request (v1beta3) -> Dataflow service specific representation
>
> We got cross-language (for Python) working for (1) above, but the code
> will be much cleaner if we could do (2) for Python and Java.
>
> I think the cleanest approach is the following, which will allow us to
> share translation code across SDKs.
>
> (3) For all SDKs
> SDK specific object representation -> Runner API proto embedded in
> Dataflow job request -> Runner API proto to internal Dataflow specific
> representation within Dataflow service
>
> I think we should go for a cleaner approach here ((2) or (3)) instead of
> trying to do it in multiple steps (we'll have to keep updating features
> such as cross-language in lockstep, which will be hard and result in a
> lot of throwaway work).
>
> Thanks,
> Cham
>
>> On Tue, Mar 31, 2020 at 6:01 PM Robert Burke <[email protected]> wrote:
>>
>>> +1 to translation from Beam pipeline protos.
>>>
>>> The Go SDK does that currently in dataflowlib/translate.go to handle
>>> the current Dataflow situation, so it's certainly doable.
>>>
>>> On Tue, Mar 31, 2020, 5:48 PM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> On Tue, Mar 31, 2020 at 12:06 PM Sam Rohde <[email protected]> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am currently investigating making the Python DataflowRunner use a
>>>>> portable pipeline representation so that we can eventually get rid of
>>>>> the Pipeline(runner) weirdness.
>>>>>
>>>>> In that case, I have a lot of questions about the Python DataflowRunner:
>>>>>
>>>>> *PValueCache*
>>>>>
>>>>> - Why does this exist?
>>>>
>>>> This is historical baggage from the (long gone) first direct runner,
>>>> when actual computed PCollections were cached, and the DataflowRunner
>>>> inherited it.
>>>>
>>>>> *DataflowRunner*
>>>>>
>>>>> - I see that the DataflowRunner defines some PTransforms as
>>>>>   runner-specific primitives by returning a PCollection.from_(...) in
>>>>>   apply_ methods. Then in the run_ methods, it references the
>>>>>   PValueCache to add steps.
>>>>>   - How does this add steps?
>>>>>   - Where does it cache the values to?
>>>>>   - How does the runner harness pick up these cached values to create
>>>>>     new steps?
>>>>>   - How is this information communicated to the runner harness?
>>>>> - Why do the following transforms need to be overridden:
>>>>>   GroupByKey, WriteToBigQuery, CombineValues, Read?
>>>>
>>>> Each of these four has a different implementation on Dataflow.
>>>>
>>>>> - Why doesn't the ParDo transform need to be overridden? I see that
>>>>>   it has a run_ method but no apply_ method.
>>>>
>>>> apply_ is called at pipeline construction time; all of these should be
>>>> replaced by PTransformOverrides. run_ is called after pipeline
>>>> construction to actually build up the Dataflow graph.
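A rough illustration of that PTransformOverride route (a sketch only: _DataflowGroupByKey and GroupByKeyOverride below are hypothetical names, not the actual Dataflow override):

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.pipeline import PTransformOverride


class _DataflowGroupByKey(beam.PTransform):
    """Hypothetical runner-specific primitive standing in for GroupByKey."""

    def expand(self, pcoll):
        # Mirrors what the apply_ methods do today: declare an output
        # PCollection and leave the real expansion to the runner/service.
        return pvalue.PCollection.from_(pcoll)


class GroupByKeyOverride(PTransformOverride):
    def matches(self, applied_ptransform):
        return isinstance(applied_ptransform.transform, beam.GroupByKey)

    def get_replacement_transform(self, ptransform):
        return _DataflowGroupByKey()


# The runner would apply such overrides at construction time, e.g.
#   pipeline.replace_all([GroupByKeyOverride()])
# instead of special-casing GroupByKey in an apply_ method.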
>>>>> *Possible fixes*
>>>>> I was thinking of getting rid of the apply_ and run_ methods and
>>>>> replacing those with a PTransformOverride and a simple PipelineVisitor,
>>>>> respectively. Is this feasible? Am I missing any assumptions that don't
>>>>> make this feasible?
>>>>
>>>> If we're going to overhaul how the runner works, it would be best to
>>>> make the DataflowRunner a direct translator from Beam Runner API protos
>>>> to Cloud v1b3 protos, rather than manipulate the intermediate Python
>>>> representation (which no one wants to change for fear of messing up the
>>>> DataflowRunner and causing headaches for cross-language).
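Along those lines, a very rough sketch of such a proto-to-proto translation loop, using only the public beam_runner_api_pb2 definitions; the emitted "step" dicts are placeholders rather than real Cloud v1b3 messages:

from apache_beam.portability.api import beam_runner_api_pb2


def translate(pipeline_proto: beam_runner_api_pb2.Pipeline):
    """Walk the portable pipeline proto and emit one placeholder step per primitive."""
    steps = []
    components = pipeline_proto.components
    for transform_id in pipeline_proto.root_transform_ids:
        _translate_transform(transform_id, components, steps)
    return steps


def _translate_transform(transform_id, components, steps):
    transform = components.transforms[transform_id]
    if transform.subtransforms:
        # Composite transform: recurse into children rather than emitting a step.
        for child_id in transform.subtransforms:
            _translate_transform(child_id, components, steps)
    else:
        # Primitive transform: key the placeholder step by the transform's URN.
        steps.append({
            'kind': transform.spec.urn,
            'name': transform.unique_name,
            'outputs': dict(transform.outputs),
        })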
