Thank you Robert for the response. 

After some more digging, I realized that the Flink and Spark runners make 
composite transforms translatable by first running

// Don't let the fuser fuse any subcomponents of native transforms.
Pipeline trimmedPipeline =
    TrivialNativeTransformExpander.forKnownUrns(
        pipelineWithSdfExpanded, translator.knownUrns());
which marks every composite transform whose URN is in the translator's list 
of known URNs as a primitive, i.e. a leaf node, so that the later traversal 
returns it for translation.
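
To make the effect of that trimming step concrete, here is a toy sketch in 
plain Java. It is not Beam code and not how TrivialNativeTransformExpander is 
actually implemented: Node, trim and leafUrns are names I made up for 
illustration, and the URN strings are only used as labels. It just shows that 
once the subtransforms of a known URN are dropped, a leaf-only traversal 
returns the composite itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class CompositeTrimSketch {
  /** Toy stand-in for a transform node; hypothetical, NOT a Beam class. */
  static final class Node {
    final String urn;
    final List<Node> children;

    Node(String urn, Node... children) {
      this.urn = urn;
      this.children = List.of(children);
    }
  }

  /** Copies the tree, dropping the subtransforms of every node whose URN is known. */
  static Node trim(Node node, Set<String> knownUrns) {
    if (knownUrns.contains(node.urn)) {
      return new Node(node.urn); // the composite becomes a leaf
    }
    Node[] trimmed =
        node.children.stream().map(c -> trim(c, knownUrns)).toArray(Node[]::new);
    return new Node(node.urn, trimmed);
  }

  /** Collects the URNs of leaf nodes only, the way a leaf-only traversal would. */
  static List<String> leafUrns(Node node) {
    List<String> out = new ArrayList<>();
    collect(node, out);
    return out;
  }

  private static void collect(Node node, List<String> out) {
    if (node.children.isEmpty()) {
      out.add(node.urn);
      return;
    }
    for (Node child : node.children) {
      collect(child, out);
    }
  }

  public static void main(String[] args) {
    Node root =
        new Node(
            "root",
            new Node("beam:transform:impulse:v1"),
            new Node(
                "beam:transform:combine_per_key:v1",
                new Node("beam:transform:group_by_key:v1"),
                new Node("beam:transform:pardo:v1")));

    // Without trimming, only the innermost transforms are visited.
    System.out.println(leafUrns(root));
    // With the composite's URN marked as known, the composite itself is visited.
    System.out.println(
        leafUrns(trim(root, Set.of("beam:transform:combine_per_key:v1"))));
  }
}
```

The first line printed lists impulse, group_by_key and pardo; the second lists 
impulse and combine_per_key, because the composite is now a leaf.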

I believe this step is currently missing in the Samza runner, which is why 
composite transform translation does not work properly there. 

Since I am working on the Samza runner, I will post a PR to fix it.

Best,
Ke
 

> On Jul 26, 2021, at 5:45 PM, Robert Bradshaw <rober...@google.com> wrote:
> 
> You can think of composite transforms like subroutines--they're useful
> concepts for representing the logical structure of the pipeline, but
> for the purposes of execution it is just as valid to inline them all
> as a single monolithic function/pipeline composed of nothing but
> primitive calls. Flink/Spark/Samza have no native notion of composite
> transforms, so this is what they do. If you can preserve the richer
> structure, that has advantages (e.g. for monitoring, debugging, rolling
> up counters and messages, visualizing the pipeline).
> 
> There is one other important case for composites that runners may want
> to take advantage of: runners may recognize higher-level transforms
> and substitute their own (equivalent, of course) implementations. The
> prototypical example of this is combiner lifting, where CombinePerKey
> is naively implemented as GroupByKey + CombineAllValuesDoFn, but most
> runners have more sophisticated ways of handling associative,
> commutative CombineFn aggregations (See
> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_0_260
> )
> 
> - Robert
> 
> On Mon, Jul 26, 2021 at 5:27 PM Ke Wu <ke.wu...@gmail.com> wrote:
>> 
>> Hello All,
>> 
>> I noticed that the Flink/Spark/Samza runners translate portable pipelines 
>> in a similar manner:
>> 
>> QueryablePipeline p =
>>    QueryablePipeline.forTransforms(
>>        pipeline.getRootTransformIdsList(), pipeline.getComponents());
>> 
>> for (PipelineNode.PTransformNode transform : 
>> p.getTopologicallyOrderedTransforms()) {
>>  // Translation logic
>> }
>> 
>> However, IIUC, this only iterates through leaf nodes of the pipeline, i.e. 
>> composite transforms are NOT being translated at all.
>> 
>> Is this the expected way for a runner to implement translation logic for 
>> a portable pipeline? If yes, what do you suggest for runners that need 
>> to translate composite transforms?
>> 
>> Best,
>> Ke
