IIUC, TrivialNativeTransformExpander is introduced to prune composite nodes with known URNs so that their subtransforms and environment id are eliminated, essentially making each of them a leaf node. I believe it is needed because, when traversing a QueryablePipeline, only leaf nodes are returned, since the expectation is that only leaf nodes need to be translated directly. So essentially, TrivialNativeTransformExpander is used to turn a composite transform into a leaf node in order to give runners the ability to translate it at runtime.
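To make the pruning described above concrete, here is a minimal sketch. It does not use Beam's RunnerApi protos: `TrivialExpanderSketch`, its `Node` record, `pruneKnownComposites` and `leafIds` are hypothetical stand-ins, and it assumes (following the description in this thread) that pruning a known-URN composite clears its environment id and subtransform list and drops its descendants from the component map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of pruning runner-known composites into leaf nodes. */
final class TrivialExpanderSketch {

  /** Stand-in for a PTransform proto: id, URN, environment id ("" if omitted), child ids. */
  record Node(String id, String urn, String environmentId, List<String> subtransforms) {}

  /** Returns a pruned copy; every node whose URN is in {@code knownUrns} becomes a leaf. */
  static Map<String, Node> pruneKnownComposites(Map<String, Node> transforms, Set<String> knownUrns) {
    Map<String, Node> trimmed = new HashMap<>(transforms);
    for (String id : transforms.keySet()) {
      Node current = trimmed.get(id);
      // Skip nodes that were already removed because an ancestor was pruned first.
      if (current == null || !knownUrns.contains(current.urn())) {
        continue;
      }
      removeDescendants(id, trimmed);
      // Clear the environment id and subtransforms so the runner sees a single leaf.
      trimmed.put(id, new Node(id, current.urn(), "", List.of()));
    }
    return trimmed;
  }

  /** Recursively drops every descendant of {@code id} from the component map. */
  private static void removeDescendants(String id, Map<String, Node> transforms) {
    Node node = transforms.get(id);
    if (node == null) {
      return;
    }
    for (String child : node.subtransforms()) {
      removeDescendants(child, transforms);
      transforms.remove(child);
    }
  }

  /** Ids of nodes without subtransforms, i.e. roughly what a leaf-only traversal would visit. */
  static List<String> leafIds(Map<String, Node> transforms) {
    List<String> leaves = new ArrayList<>();
    for (Node node : transforms.values()) {
      if (node.subtransforms().isEmpty()) {
        leaves.add(node.id());
      }
    }
    leaves.sort(null);
    return leaves;
  }
}
```

With `beam:transform:reshuffle:v1` in the known set, a Reshuffle composite collapses into one leaf; with an empty known set, a leaf-only traversal would see its subtransforms instead.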
I love the idea of relying on transform URNs alone to determine primitive transforms, since it makes things clearer. In order to do that, I suppose it is better for us to provide a unified approach to registering translators, instead of each runner having its own way of doing so. [1][2][3]

What are your thoughts?

Best,
Ke

[1] Flink:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L144
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L122
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L145
[2] Spark:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L99
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java#L100
[3] Samza:
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java#L173

> On Jul 29, 2021, at 1:58 PM, Robert Bradshaw <rober...@google.com> wrote:
>
> OK, I have to admit that I have no idea what the purpose of
> TrivialNativeTransformExpander is--maybe something else in the
> infrastructure can't handle translating pipelines that have
> subtransforms or something like that? It seems to me the map of urn ->
> PTransformTranslator should be sufficient to do the translation
> (simply ignoring subtransforms for any known URN when walking the
> tree). In this case the whole question and infrastructure around "is
> this primitive" could simply go away.
>
> On Wed, Jul 28, 2021 at 8:47 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>
>> Hi Robert,
>>
>> Thanks for the reply. The motivation for this is that I noticed that when we
>> need to translate a composite transform, there are two steps involved:
>>
>> 1. Register the composite transform URN with a dedicated translator. [1]
>> 2. Register the composite transform URN with the @AutoService of
>> NativeTransforms. [2]
>>
>> I was wondering whether step 2 could be eliminated, since after a composite
>> transform URN is registered with a translator, its environment id and
>> subtransforms will be removed from the pipeline components by
>> TrivialNativeTransformExpander. [3]
>>
>> If the environment id is not sufficient to distinguish a primitive transform,
>> I suppose we need to keep step 2, unless we update QueryablePipeline to take
>> known URNs as an input.
>>
>> Does this make sense to you?
>>
>> Best,
>> Ke
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158
>> [2]
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
>> [3]
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59
>>
>>
>> On Jul 28, 2021, at 5:52 PM, Robert Bradshaw <rober...@google.com> wrote:
>>
>> A composite transform is a transform that either only returns its
>> inputs or has subtransforms. A primitive, or leaf, transform would be
>> the complement of that. Checking environment ids is not sufficient
>> (e.g. the Flatten primitive doesn't need an environment, but the ParDo
>> does).
>>
>> Perhaps it's worth taking a step back and trying to understand why you
>> need to distinguish between primitive and nonprimitive transforms at
>> all. When translating a pipeline,
>>
>> (1) If you know the URN, you can implement the transform directly,
>> (2) If you don't know the URN, look to see if its outputs are a subset
>> of the inputs (in which case there's nothing to do) or it has
>> subtransforms (in which case you can just recurse to compute the
>> outputs).
>> (3) If neither (1) nor (2) holds, then you throw an error. This is an
>> unknown primitive.
>>
>>
>> On Tue, Jul 27, 2021 at 8:30 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>
>>
>> Hello All,
>>
>> When I was looking at translating composite transforms in portable pipelines,
>> I realized that TrivialNativeTransformExpander [1] is being used to identify
>> native transforms by transform URN, and that it removes the subtransforms and
>> environment id of the corresponding transform node.
>> However, QueryablePipeline seems to identify primitive transforms with a
>> different approach [2], which requires us to register runner native transforms
>> again [3][4] in addition to the transform translators.
>>
>> An idea came to me that we should be able to identify a primitive/native
>> transform by looking at its environment, according to the protobuf model [5]:
>>
>> // Environment where the current PTransform should be executed in.
>> //
>> // Transforms that are required to be implemented by a runner must omit this.
>> // All other transforms are required to specify this.
>> string environment_id = 7;
>>
>>
>> Therefore, I updated the logic:
>>
>>  private static boolean isPrimitiveTransform(PTransform transform) {
>>    String urn = PTransformTranslation.urnForTransformOrNull(transform);
>> -  return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
>> +  return transform.getEnvironmentId().isEmpty();
>>  }
>>
>>
>> However, tests started to fail on SQL cases, where I found that external
>> transforms seem to have an empty environment id as well [6], which does not
>> seem to conform to the protobuf model.
>>
>> My questions here are:
>>
>> 1. Is NativeTransforms required to register a primitive/native transform in
>> addition to registering it with translators?
>> 2. Is an empty environment_id a good enough indicator to identify a
>> native/primitive transform?
>> 3. Are external transforms supposed to have an empty or non-empty
>> environment_id?
>>
>> Best,
>> Ke
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
>> [2]
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
>> [3]
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
>> [4]
>> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
>> [5]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
>> [6]
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392
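Robert's three rules, combined with the single URN-to-translator map proposed at the top of this thread, can be sketched as follows. This is a hypothetical illustration, not Beam code: `UrnDrivenTranslationSketch`, `TranslatorRegistry` and the simplified `Node` record are assumptions, and inputs/outputs are modeled as plain lists of PCollection ids rather than the proto's maps. One design choice differs from the literal wording of rule (2): subtransforms are checked before the outputs-subset test, so an unknown composite with no outputs (whose empty output set is trivially a subset of its inputs) is still expanded.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical URN-driven translation walk that needs no separate "is primitive" check. */
final class UrnDrivenTranslationSketch {

  /** Simplified stand-in for a PTransform: URN, input/output PCollection ids, child ids. */
  record Node(
      String id, String urn, List<String> inputs, List<String> outputs, List<String> subtransforms) {}

  /** Hypothetical registry shape that any runner could populate with its own translators. */
  static final class TranslatorRegistry<C> {
    private final Map<String, BiConsumer<Node, C>> translators = new HashMap<>();

    TranslatorRegistry<C> register(String urn, BiConsumer<Node, C> translator) {
      translators.put(urn, translator);
      return this;
    }

    BiConsumer<Node, C> lookup(String urn) {
      return translators.get(urn);
    }
  }

  /** Translates the transform {@code id}, applying rules (1)-(3) from the thread. */
  static <C> void translate(
      String id, Map<String, Node> transforms, TranslatorRegistry<C> registry, C context) {
    Node node = transforms.get(id);
    BiConsumer<Node, C> translator = registry.lookup(node.urn());
    if (translator != null) {
      // (1) Known URN: translate directly and ignore any subtransforms.
      translator.accept(node, context);
    } else if (!node.subtransforms().isEmpty()) {
      // (2) Unknown composite: recurse into the subtransforms to compute the outputs.
      for (String child : node.subtransforms()) {
        translate(child, transforms, registry, context);
      }
    } else if (!node.inputs().containsAll(node.outputs())) {
      // (3) Unknown URN, no subtransforms, and new outputs: an unknown primitive.
      throw new IllegalStateException("Unknown primitive " + id + " with URN " + node.urn());
    }
    // Otherwise the outputs are a subset of the inputs (rule 2), so there is nothing to do.
  }
}
```

Because a known URN short-circuits the walk, the subtransforms of a runner-native composite are simply never visited, which is the behavior Robert describes without pruning the pipeline first.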
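The disagreement over `environment_id` can also be shown with a small, hypothetical comparison of the two checks. `PrimitiveCheckSketch` and its `Node` record are assumptions; `isPrimitiveByUrn` only mimics the shape of `PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform)` using plain URN sets, and the empty environment id on the external composite reflects what was reported in this thread rather than a verified property of every external transform.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical comparison of two ways to answer "is this transform a primitive?". */
final class PrimitiveCheckSketch {

  /** Simplified stand-in for a PTransform: URN, environment id ("" if omitted), child ids. */
  record Node(String urn, String environmentId, List<String> subtransforms) {}

  /** URN-based check: a known primitive URN or a URN the runner declared as native. */
  static boolean isPrimitiveByUrn(Node node, Set<String> primitiveUrns, Set<String> nativeUrns) {
    return primitiveUrns.contains(node.urn()) || nativeUrns.contains(node.urn());
  }

  /** Environment-based heuristic from the proposed diff: no environment means primitive. */
  static boolean isPrimitiveByEnvironment(Node node) {
    return node.environmentId().isEmpty();
  }
}
```

With a Flatten (no environment), a ParDo (with an environment) and an externally expanded composite reported to have no environment, the environment-only check treats the ParDo as non-primitive and the composite as primitive, which is consistent with Robert's point that `environment_id` alone is not sufficient.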