Thank you, Robert, for the response. After some more digging, I realized that the Flink/Spark runners handle composite transforms by first running:

    // Don't let the fuser fuse any subcomponents of native transforms.
    Pipeline trimmedPipeline =
        TrivialNativeTransformExpander.forKnownUrns(
            pipelineWithSdfExpanded, translator.knownUrns());

This marks every composite transform whose URN is in the translator's list of known URNs as a primitive, i.e. a leaf node, so the traversal later on returns it to be translated. I believe this step is currently missing from the Samza runner, which is why composite transform translation does not work properly there. Since I am working on the Samza runner, I will post a PR to fix it.

Best,
Ke

> On Jul 26, 2021, at 5:45 PM, Robert Bradshaw <rober...@google.com> wrote:
>
> You can think of composite transforms like subroutines: they're useful
> concepts for representing the logical structure of the pipeline, but
> for the purposes of execution it is just as valid to inline them all
> as a single monolithic function/pipeline composed of nothing but
> primitive calls. Flink/Spark/Samza have no native notion of composite
> transforms, so this is what they do. If you can preserve the richer
> structure, that has advantages (e.g. for monitoring, debugging, rolling
> up counters and messages, visualizing the pipeline).
>
> There is one other important case for composites that runners may want
> to take advantage of: runners may recognize higher-level transforms
> and substitute their own (equivalent, of course) implementations.
> The prototypical example of this is combiner lifting, where CombinePerKey
> is naively implemented as GroupByKey + CombineAllValuesDoFn, but most
> runners have more sophisticated ways of handling associative,
> commutative CombineFn aggregations (see
> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_0_260
> ).
>
> - Robert
>
> On Mon, Jul 26, 2021 at 5:27 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>
>> Hello All,
>>
>> I noticed that the Flink/Spark/Samza runners translate portable
>> pipelines in a similar manner:
>>
>>     QueryablePipeline p =
>>         QueryablePipeline.forTransforms(
>>             pipeline.getRootTransformIdsList(), pipeline.getComponents());
>>
>>     for (PipelineNode.PTransformNode transform :
>>         p.getTopologicallyOrderedTransforms()) {
>>       // Translation logic
>>     }
>>
>> However, IIUC, this only iterates through the leaf nodes of the pipeline,
>> i.e. composite transforms are NOT translated at all.
>>
>> Is this the expected behavior for runners implementing translation logic
>> for portable pipelines? If yes, what are the suggestions for runners that
>> need to translate composite transforms?
>>
>> Best,
>> Ke
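The effect of TrivialNativeTransformExpander.forKnownUrns described in the first message can be sketched with a toy model. This is not Beam's implementation: TransformNode, examplePipeline, trimKnownComposites and leafTransforms are hypothetical names, the pipeline is a plain map rather than a RunnerApi.Pipeline, the URN strings are Beam-style labels only, and leaf order comes from a depth-first walk instead of a real topological sort over PCollections. It shows why a leaf-only loop never sees a composite until the composite's subtransforms are stripped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class KnownUrnTrimmingSketch {

  /** Hypothetical, simplified stand-in for a PTransform entry in the pipeline proto. */
  static final class TransformNode {
    final String urn;
    final List<String> subtransforms;

    TransformNode(String urn, List<String> subtransforms) {
      this.urn = urn;
      this.subtransforms = subtransforms;
    }

    boolean isLeaf() {
      return subtransforms.isEmpty();
    }
  }

  /** Toy pipeline: an impulse followed by a CombinePerKey composite with two children. */
  static Map<String, TransformNode> examplePipeline() {
    Map<String, TransformNode> t = new LinkedHashMap<>();
    t.put("impulse", new TransformNode("beam:transform:impulse:v1", List.of()));
    t.put("combine", new TransformNode(
        "beam:transform:combine_per_key:v1", List.of("gbk", "combineValues")));
    t.put("gbk", new TransformNode("beam:transform:group_by_key:v1", List.of()));
    t.put("combineValues", new TransformNode("beam:transform:pardo:v1", List.of()));
    return t;
  }

  /** Copies the tree, stripping the subtransforms of composites whose URN is known. */
  static Map<String, TransformNode> trimKnownComposites(
      Map<String, TransformNode> transforms, List<String> rootIds, Set<String> knownUrns) {
    Map<String, TransformNode> trimmed = new LinkedHashMap<>();
    for (String id : rootIds) {
      copyTrimmed(id, transforms, knownUrns, trimmed);
    }
    return trimmed;
  }

  private static void copyTrimmed(String id, Map<String, TransformNode> transforms,
      Set<String> knownUrns, Map<String, TransformNode> out) {
    TransformNode node = transforms.get(id);
    if (!node.isLeaf() && knownUrns.contains(node.urn)) {
      // Known composite: keep it, but drop its children so it now looks like a leaf.
      out.put(id, new TransformNode(node.urn, List.of()));
      return;
    }
    out.put(id, node);
    for (String child : node.subtransforms) {
      copyTrimmed(child, transforms, knownUrns, out);
    }
  }

  /** Leaf-only traversal, mimicking a translation loop that only sees primitives. */
  static List<String> leafTransforms(Map<String, TransformNode> transforms, List<String> rootIds) {
    List<String> leaves = new ArrayList<>();
    for (String id : rootIds) {
      collectLeaves(id, transforms, leaves);
    }
    return leaves;
  }

  private static void collectLeaves(
      String id, Map<String, TransformNode> transforms, List<String> leaves) {
    TransformNode node = transforms.get(id);
    if (node.isLeaf()) {
      leaves.add(id);
      return;
    }
    for (String child : node.subtransforms) {
      collectLeaves(child, transforms, leaves);
    }
  }

  public static void main(String[] args) {
    Map<String, TransformNode> pipeline = examplePipeline();
    List<String> roots = List.of("impulse", "combine");
    Set<String> knownUrns = Set.of("beam:transform:combine_per_key:v1");

    // Without trimming the composite is never visited: [impulse, gbk, combineValues]
    System.out.println(leafTransforms(pipeline, roots));
    // After trimming the composite itself reaches the translator: [impulse, combine]
    System.out.println(leafTransforms(trimKnownComposites(pipeline, roots, knownUrns), roots));
  }
}
```

In a real runner the trimmed composite would then be matched by URN in the translator, while unknown composites are still expanded into their primitives.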
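Robert's combiner-lifting point can be illustrated the same hedged way. For an associative, commutative combine, pre-combining within each bundle before the shuffle and merging the accumulators afterwards gives the same result as the naive GroupByKey + combine-all-values expansion, while shuffling at most one accumulator per key per bundle. SimpleCombineFn, SumFn, naive, lifted and exampleBundles are hypothetical names; the interface only mirrors the shape of Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput) and is not that class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombinerLiftingSketch {

  /** Hypothetical, simplified combine function shaped like Beam's CombineFn. */
  interface SimpleCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT acc, InputT input);
    AccumT mergeAccumulators(List<AccumT> accs);
    OutputT extractOutput(AccumT acc);
  }

  /** Sum of longs: associative and commutative, so it is safe to lift. */
  static final class SumFn implements SimpleCombineFn<Long, Long, Long> {
    public Long createAccumulator() { return 0L; }
    public Long addInput(Long acc, Long input) { return acc + input; }
    public Long mergeAccumulators(List<Long> accs) {
      long total = 0;
      for (Long a : accs) total += a;
      return total;
    }
    public Long extractOutput(Long acc) { return acc; }
  }

  /** Naive expansion: GroupByKey over all values, then fold each key's values. */
  static <A, O> Map<String, O> naive(
      List<List<Map.Entry<String, Long>>> bundles, SimpleCombineFn<Long, A, O> fn) {
    Map<String, List<Long>> grouped = new TreeMap<>();
    for (List<Map.Entry<String, Long>> bundle : bundles) {
      for (Map.Entry<String, Long> e : bundle) {
        grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
      }
    }
    Map<String, O> out = new TreeMap<>();
    for (Map.Entry<String, List<Long>> e : grouped.entrySet()) {
      A acc = fn.createAccumulator();
      for (Long v : e.getValue()) acc = fn.addInput(acc, v);
      out.put(e.getKey(), fn.extractOutput(acc));
    }
    return out;
  }

  /** Lifted version: partial combine per bundle, shuffle accumulators, then merge. */
  static <A, O> Map<String, O> lifted(
      List<List<Map.Entry<String, Long>>> bundles, SimpleCombineFn<Long, A, O> fn) {
    Map<String, List<A>> shuffled = new TreeMap<>();
    for (List<Map.Entry<String, Long>> bundle : bundles) {
      // Pre-shuffle: one accumulator per key within this bundle.
      Map<String, A> local = new HashMap<>();
      for (Map.Entry<String, Long> e : bundle) {
        A acc = local.computeIfAbsent(e.getKey(), k -> fn.createAccumulator());
        local.put(e.getKey(), fn.addInput(acc, e.getValue()));
      }
      for (Map.Entry<String, A> e : local.entrySet()) {
        shuffled.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
      }
    }
    // Post-shuffle: merge the partial accumulators for each key.
    Map<String, O> out = new TreeMap<>();
    for (Map.Entry<String, List<A>> e : shuffled.entrySet()) {
      out.put(e.getKey(), fn.extractOutput(fn.mergeAccumulators(e.getValue())));
    }
    return out;
  }

  static List<List<Map.Entry<String, Long>>> exampleBundles() {
    return List.of(
        List.of(Map.entry("a", 1L), Map.entry("b", 2L), Map.entry("a", 3L)),
        List.of(Map.entry("a", 4L), Map.entry("b", 5L)));
  }

  public static void main(String[] args) {
    List<List<Map.Entry<String, Long>>> bundles = exampleBundles();
    SumFn sum = new SumFn();
    // Both strategies print {a=8, b=7}.
    System.out.println(naive(bundles, sum));
    System.out.println(lifted(bundles, sum));
  }
}
```

The equivalence depends on the combine being associative and commutative; that is what lets a runner recognize the CombinePerKey composite and substitute the lifted form.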