Hello all,

While looking at translating composite transforms in a portable pipeline, I realized that TrivialNativeTransformExpander [1] is used to identify native transforms by transform URN, and that it removes the sub-transforms and the environment id from the corresponding transform node. However, QueryablePipeline seems to identify primitive transforms with a different approach [2], which requires us to register runner-native transforms again [3][4] in addition to registering them with the transform translators.
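To make the expander behaviour described above concrete, here is a minimal Java sketch over a hypothetical, simplified graph model. The `Node` record and the `expandNatives` method are illustrative names, not Beam APIs. For every transform whose URN is in a given set, it clears the sub-transforms and the environment id. Dropping the descendant entries from the map is an assumption of this sketch, made so that no orphaned nodes remain.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class TrivialExpansionSketch {

  // Hypothetical stand-in for RunnerApi.PTransform with only the fields discussed here.
  record Node(String urn, String environmentId, List<String> subtransforms) {}

  // For each transform whose URN is in nativeUrns, keep the node but clear its
  // sub-transforms and environment id. Removing its descendants from the map is an
  // assumption of this sketch.
  static Map<String, Node> expandNatives(Map<String, Node> transforms, Set<String> nativeUrns) {
    Map<String, Node> result = new LinkedHashMap<>(transforms);
    for (Map.Entry<String, Node> entry : transforms.entrySet()) {
      Node node = entry.getValue();
      // Skip non-native transforms and ones already removed as a descendant.
      if (!nativeUrns.contains(node.urn()) || !result.containsKey(entry.getKey())) {
        continue;
      }
      Deque<String> pending = new ArrayDeque<>(node.subtransforms());
      while (!pending.isEmpty()) {
        Node removed = result.remove(pending.pop());
        if (removed != null) {
          pending.addAll(removed.subtransforms());
        }
      }
      result.put(entry.getKey(), new Node(node.urn(), "", List.of()));
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Node> graph = new LinkedHashMap<>();
    // "example:read:v1" is a made-up URN for a composite the runner handles natively.
    graph.put("read", new Node("example:read:v1", "py-env", List.of("read/impulse", "read/split")));
    graph.put("read/impulse", new Node("beam:transform:impulse:v1", "", List.of()));
    graph.put("read/split", new Node("beam:transform:pardo:v1", "py-env", List.of()));

    Map<String, Node> expanded = expandNatives(graph, Set.of("example:read:v1"));
    // Only "read" remains, with an empty environment id and no sub-transforms.
    System.out.println(expanded);
  }
}
```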
An idea came to me: we should be able to identify primitive/native transforms by looking at their environment, according to the protobuf model [5]:

    // Environment where the current PTransform should be executed in.
    //
    // Transforms that are required to be implemented by a runner must omit this.
    // All other transforms are required to specify this.
    string environment_id = 7;

Therefore, I updated the logic:

     private static boolean isPrimitiveTransform(PTransform transform) {
       String urn = PTransformTranslation.urnForTransformOrNull(transform);
    -  return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
    +  return transform.getEnvironmentId().isEmpty();
     }

However, tests started to fail on SQL cases, where I found that external transforms seem to have an empty environment id as well [6], which does not seem to conform to the protobuf model.

My questions here are:
1. Is NativeTransforms required to register a primitive/native transform in addition to registering it with the translators?
2. Is an empty environment_id a good enough indicator to identify a native/primitive transform?
3. Is an external transform supposed to have an empty or a non-empty environment_id?
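A minimal Java sketch contrasting the two checks discussed above, on a hypothetical `Node` record rather than `RunnerApi.PTransform`. The URN sets are illustrative stand-ins for `PRIMITIVE_URNS` and the `NativeTransforms` registry, and `example:external:sql:v1` is a made-up URN for an expanded external transform with an empty environment id, which is the case where the two predicates disagree.

```java
import java.util.List;
import java.util.Set;

class PrimitiveCheckSketch {

  // Hypothetical stand-in for RunnerApi.PTransform with only the fields discussed here.
  record Node(String urn, String environmentId, List<String> subtransforms) {}

  // Illustrative subset of SDK-level primitive URNs (stand-in for PRIMITIVE_URNS).
  static final Set<String> PRIMITIVE_URNS =
      Set.of(
          "beam:transform:impulse:v1",
          "beam:transform:pardo:v1",
          "beam:transform:group_by_key:v1",
          "beam:transform:flatten:v1");

  // Hypothetical runner-native URNs (stand-in for what a runner registers via NativeTransforms).
  static final Set<String> RUNNER_NATIVE_URNS = Set.of("beam:transform:reshuffle:v1");

  // Current approach: look the URN up in both registries.
  static boolean isPrimitiveByUrn(Node t) {
    return PRIMITIVE_URNS.contains(t.urn()) || RUNNER_NATIVE_URNS.contains(t.urn());
  }

  // Proposed approach: an empty environment id marks a runner-implemented transform.
  static boolean isPrimitiveByEnvironment(Node t) {
    return t.environmentId().isEmpty();
  }

  public static void main(String[] args) {
    List<Node> nodes =
        List.of(
            new Node("beam:transform:impulse:v1", "", List.of()),
            new Node("beam:transform:reshuffle:v1", "", List.of()),
            // Hypothetical expanded external composite that carries no environment id.
            new Node("example:external:sql:v1", "", List.of("sql/calc", "sql/gbk")));
    for (Node n : nodes) {
      System.out.printf(
          "%s byUrn=%b byEnv=%b%n", n.urn(), isPrimitiveByUrn(n), isPrimitiveByEnvironment(n));
    }
  }
}
```

Running `main` prints `byUrn=false byEnv=true` for the external transform: the environment-based check would treat that composite as a primitive, which is the kind of misclassification that would surface in the SQL tests.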
Best,
Ke

[1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
[2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
[3] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[4] https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
[5] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
[6] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392