Hello All,

When I was looking at translating composite transforms in the portable pipeline, I
realized that TrivialNativeTransformExpander [1] is used to identify native
transforms by their transform URN, and that it removes the subtransforms and the
environment id from the corresponding transform node. However, QueryablePipeline
seems to identify primitive transforms using a different approach [2], which
requires us to register runner-native transforms again [3][4] in addition to
registering them with the transform translators.
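To make the duplication concrete, here is a minimal, self-contained sketch of the two mechanisms as described above. It does not use the real Beam classes: `Node`, `expandNative`, `isPrimitive`, `TRANSLATOR_URNS`, `NATIVE_REGISTRY` and the `runner:native:v1` URN are hypothetical stand-ins. The point is only that the runner's native URN has to appear in two places: once for the expander (via the translators) and once more for the primitive check.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical, simplified model of the two mechanisms; not the real Beam classes. */
public class NativeDetectionSketch {

  /** Simplified stand-in for a PTransform node in the pipeline proto. */
  record Node(String urn, List<String> subtransforms, String environmentId) {}

  // Mechanism 1 input: URNs the runner's translators know (illustrative values).
  static final Set<String> TRANSLATOR_URNS =
      Set.of("beam:transform:group_by_key:v1", "runner:native:v1");

  // Mechanism 2 inputs: model primitives plus a separate registry of runner-native URNs.
  static final Set<String> PRIMITIVE_URNS = Set.of("beam:transform:group_by_key:v1");
  static final Set<String> NATIVE_REGISTRY = Set.of("runner:native:v1");

  /** Like the expander: nodes with a known URN lose their subtransforms and environment id. */
  static Map<String, Node> expandNative(Map<String, Node> transforms, Set<String> knownUrns) {
    return transforms.entrySet().stream()
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> knownUrns.contains(e.getValue().urn())
                ? new Node(e.getValue().urn(), List.of(), "")
                : e.getValue()));
  }

  /** Like the QueryablePipeline check: a model primitive or found in the separate registry. */
  static boolean isPrimitive(Node node) {
    return PRIMITIVE_URNS.contains(node.urn()) || NATIVE_REGISTRY.contains(node.urn());
  }

  public static void main(String[] args) {
    Map<String, Node> transforms =
        Map.of("native", new Node("runner:native:v1", List.of("inner"), "java-env"));
    Node expanded = expandNative(transforms, TRANSLATOR_URNS).get("native");
    // The expander stripped the node, but isPrimitive only agrees because the URN
    // was registered a second time in NATIVE_REGISTRY.
    System.out.println(expanded.subtransforms().isEmpty() + " " + isPrimitive(expanded)); // prints "true true"
  }
}
```

Removing `runner:native:v1` from `NATIVE_REGISTRY` in this sketch makes `isPrimitive` return false even though the expander already treated the node as native, which is the mismatch the double registration works around.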

An idea came to me: we should be able to identify a primitive/native
transform by looking at its environment id, according to the protobuf model [5]:

// Environment where the current PTransform should be executed in.
//
// Transforms that are required to be implemented by a runner must omit this.
// All other transforms are required to specify this.
string environment_id = 7;

Therefore, I updated the logic:

   private static boolean isPrimitiveTransform(PTransform transform) {
     String urn = PTransformTranslation.urnForTransformOrNull(transform);
-    return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
+    return transform.getEnvironmentId().isEmpty();
   }
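For comparison, here is a minimal sketch of the old and the proposed predicates side by side, on a hypothetical simplified `Node` record rather than the real proto `PTransform` (the real code calls `transform.getEnvironmentId()` on the proto). The URNs `runner:other_native:v1` and `example:sdk_transform:v1` and the environment name `java-env` are illustrative assumptions. Under the proto rule quoted above, a transform the runner must implement omits its environment id, so the proposed check finds it without any separate registry.

```java
import java.util.Set;

/** Hypothetical comparison of the registry-based and environment-based primitive checks. */
public class PrimitiveCheckSketch {

  /** Simplified stand-in for a PTransform: only the fields the two checks look at. */
  record Node(String urn, String environmentId) {}

  // Illustrative registries that the old check depends on.
  static final Set<String> PRIMITIVE_URNS = Set.of("beam:transform:group_by_key:v1");
  static final Set<String> NATIVE_URNS = Set.of("runner:native:v1");

  /** Old behaviour: the URN must be a model primitive or explicitly registered as native. */
  static boolean isPrimitiveByRegistry(Node node) {
    return PRIMITIVE_URNS.contains(node.urn()) || NATIVE_URNS.contains(node.urn());
  }

  /** Proposed behaviour: a transform the runner must implement omits its environment id. */
  static boolean isPrimitiveByEnvironment(Node node) {
    return node.environmentId().isEmpty();
  }

  public static void main(String[] args) {
    Node gbk = new Node("beam:transform:group_by_key:v1", "");
    // Runner-native, known to the translator but never added to NATIVE_URNS.
    Node unregistered = new Node("runner:other_native:v1", "");
    // Executed by an SDK harness, so it carries an environment id.
    Node sdkTransform = new Node("example:sdk_transform:v1", "java-env");
    for (Node n : new Node[] {gbk, unregistered, sdkTransform}) {
      System.out.println(n.urn() + " registry=" + isPrimitiveByRegistry(n)
          + " environment=" + isPrimitiveByEnvironment(n));
    }
  }
}
```

Only the `unregistered` node differs between the two checks, which is exactly the second registration the proposal tries to remove.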

However, tests started to fail on the SQL cases, where I found that external
transforms seem to have an empty environment id as well [6], which does not seem
to conform to the protobuf model.
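The failure can be illustrated on the same kind of hypothetical model. This sketch assumes the expanded external transform is represented as a composite whose subtransforms carry the real environments while the composite itself has no environment id, as [6] appears to produce; `Node`, the URNs and `java-env` are illustrative, not values taken from the SQL tests. The environment-only check then reports the composite as primitive.

```java
import java.util.List;

/** Hypothetical reproduction of the external-transform misclassification. */
public class ExternalTransformSketch {

  /** Simplified stand-in for a PTransform node. */
  record Node(String urn, List<String> subtransforms, String environmentId) {}

  /** The proposed environment-only check. */
  static boolean isPrimitiveByEnvironment(Node node) {
    return node.environmentId().isEmpty();
  }

  public static void main(String[] args) {
    // Assumed shape: an external composite with no environment id wrapping an SDK-executed child.
    Node child = new Node("example:sdk_transform:v1", List.of(), "java-env");
    Node external = new Node("example:external:v1", List.of("child"), "");
    // A node that still has subtransforms is a composite, yet the check calls it primitive,
    // so its child would never be visited.
    System.out.println("composite=" + !external.subtransforms().isEmpty()
        + " primitiveByEnvironment=" + isPrimitiveByEnvironment(external)); // prints "composite=true primitiveByEnvironment=true"
    System.out.println("childPrimitiveByEnvironment=" + isPrimitiveByEnvironment(child)); // prints "childPrimitiveByEnvironment=false"
  }
}
```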

My questions here are:

1. Is NativeTransforms required to register a primitive/native transform in
addition to registering it with the translators?
2. Is an empty environment_id a good enough indicator to identify a
native/primitive transform?
3. Is an external transform supposed to have an empty or a non-empty environment_id?

Best,
Ke


[1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
[2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
[3] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[4] https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
[5] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
[6] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392