Hi Robert,

Thanks for the reply. The motivation is that, when we need to translate a
composite transform, there are two steps involved:

1. Register the composite transform URN with a dedicated translator. [1]
2. Register the composite transform URN with NativeTransforms via @AutoService. [2]

I was wondering whether step 2 could be eliminated, since once a composite
transform URN is registered with a translator, its environment id and
subtransforms will be removed from the pipeline components by
TrivialNativeTransformExpander. [3]

If the environment id is not sufficient to distinguish a primitive transform, I
suppose we need to keep step 2, unless we update QueryablePipeline to take the
known URNs as an input.
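A minimal sketch of what "taking known URNs as an input" might look like. It
assumes a simplified, hypothetical model: `Node`, `UrnRegistry`, and the
three-argument `isPrimitive` are illustrative names, not Beam APIs. Because each
URN is registered once, together with its translator, the set of runner-native
URNs is just the translator keys, so the second registration in step 2 would
not be needed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for RunnerApi.PTransform; not a Beam class.
final class Node {
  final String urn;
  final List<String> subtransforms;

  Node(String urn, List<String> subtransforms) {
    this.urn = urn;
    this.subtransforms = subtransforms;
  }
}

// Hypothetical registry: each URN is registered exactly once, with its translator.
final class UrnRegistry {
  private final Map<String, Consumer<Node>> translators = new HashMap<>();

  void register(String urn, Consumer<Node> translator) {
    translators.put(urn, translator);
  }

  // The runner-native URNs are simply the translator keys.
  Set<String> knownUrns() {
    return Collections.unmodifiableSet(translators.keySet());
  }

  // Same shape as the current check (PRIMITIVE_URNS || isNative), but the second
  // operand is derived from the translator keys instead of a separate registry.
  static boolean isPrimitive(Node t, Set<String> primitiveUrns, Set<String> knownUrns) {
    return primitiveUrns.contains(t.urn) || knownUrns.contains(t.urn);
  }
}
```

With this shape, a registered composite (here the made-up URN
`example:composite:v1`) is treated as primitive regardless of its
subtransforms, while an unregistered composite is not.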

Does this make sense to you?

Best,
Ke


[1]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158
[2]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[3]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59


> On Jul 28, 2021, at 5:52 PM, Robert Bradshaw <rober...@google.com> wrote:
> 
> A composite transform is a transform that either only returns its
> inputs or has subtransforms. A primitive, or leaf, transform would be
> the complement of that. Checking environment ids is not sufficient
> (e.g. the Flatten primitive doesn't need an environment, but the ParDo
> does).
> 
> Perhaps it's worth taking a step back and trying to understand why you
> need to distinguish between primitive and nonprimitive transforms at
> all. When translating a pipeline,
> 
> (1) If you know the URN, you can implement the transform directly,
> (2) If you don't know the URN, look to see if its outputs are a subset
> of the inputs (in which case there's nothing to do) or it has
> subtransforms (in which case you can just recurse to compute the
> outputs).
> (3) If neither (1) nor (2) hold, then you throw an error. This is an
> unknown primitive.
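Robert's three rules can be sketched as a recursive translation. This is a
minimal illustration over a hypothetical, simplified model (`SimpleTransform`
and `RecursiveTranslator` are illustrative names, not Beam APIs): inputs and
outputs are sets of PCollection ids rather than Beam's tag-to-id maps, and
"translating" a known URN just records the transform id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified transform; not Beam's RunnerApi.PTransform.
final class SimpleTransform {
  final String urn;
  final Set<String> inputs;         // PCollection ids
  final Set<String> outputs;        // PCollection ids
  final List<String> subtransforms; // transform ids

  SimpleTransform(String urn, Set<String> inputs, Set<String> outputs, List<String> subtransforms) {
    this.urn = urn;
    this.inputs = inputs;
    this.outputs = outputs;
    this.subtransforms = subtransforms;
  }
}

final class RecursiveTranslator {
  // Returns the ids of the transforms that were translated directly, in order.
  static List<String> translate(
      String id, Map<String, SimpleTransform> components, Set<String> knownUrns) {
    List<String> translated = new ArrayList<>();
    translate(id, components, knownUrns, translated);
    return translated;
  }

  private static void translate(
      String id,
      Map<String, SimpleTransform> components,
      Set<String> knownUrns,
      List<String> translated) {
    SimpleTransform t = components.get(id);
    if (t == null) {
      throw new IllegalArgumentException("No transform with id " + id);
    }
    // (1) Known URN: implement it directly, even if it also has subtransforms.
    if (knownUrns.contains(t.urn)) {
      translated.add(id);
      return;
    }
    // (2a) Outputs are a subset of the inputs: nothing to do.
    if (t.inputs.containsAll(t.outputs)) {
      return;
    }
    // (2b) Has subtransforms: recurse to compute the outputs.
    if (!t.subtransforms.isEmpty()) {
      for (String sub : t.subtransforms) {
        translate(sub, components, knownUrns, translated);
      }
      return;
    }
    // (3) Neither holds: this is an unknown primitive.
    throw new IllegalStateException("Unknown primitive transform " + id + " (URN " + t.urn + ")");
  }
}
```

Read literally, rule (2) also treats a transform with an unknown URN and no
outputs as having nothing to do, since the empty set is a subset of any input
set; rule (1) is checked first, so known URNs are unaffected.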
> 
> 
> On Tue, Jul 27, 2021 at 8:30 PM Ke Wu <ke.wu...@gmail.com> wrote:
>> 
>> Hello All,
>> 
>> When I was looking at translating composite transforms in a portable pipeline, I
>> realized that TrivialNativeTransformExpander [1] is used to identify
>> native transforms by transform URN, and it removes the subtransforms and
>> environment id from the corresponding transform node. However,
>> QueryablePipeline seems to identify primitive transforms using a different
>> approach [2], which requires us to register runner-native transforms again
>> [3][4] in addition to registering the transform translators.
>> 
>> An idea came to me that we should be able to identify a primitive/native
>> transform by looking at its environment, according to the protobuf model [5]:
>> 
>> // Environment where the current PTransform should be executed in.
>> //
>> // Transforms that are required to be implemented by a runner must omit this.
>> // All other transforms are required to specify this.
>> string environment_id = 7;
>> 
>> 
>> Therefore, I updated the logic:
>> 
>>   private static boolean isPrimitiveTransform(PTransform transform) {
>>     String urn = PTransformTranslation.urnForTransformOrNull(transform);
>> -    return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
>> +    return transform.getEnvironmentId().isEmpty();
>>   }
>> 
>> 
>> However, tests started to fail on SQL cases, where I found that external
>> transforms seem to have an empty environment id as well [6], which does not
>> seem to conform to the protobuf model.
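A small sketch of why the empty-environment check can misclassify transforms,
compared with the structural definition in Robert's reply (a composite either
only returns its inputs or has subtransforms; a primitive is the complement).
It assumes hypothetical, simplified types (`TransformInfo`, `PrimitiveChecks`),
a hypothetical expanded external transform that has subtransforms but an empty
environment id, and a made-up environment id `python-env` for the ParDo.

```java
import java.util.List;
import java.util.Set;

// Hypothetical, simplified transform; not Beam's RunnerApi.PTransform.
final class TransformInfo {
  final String environmentId;       // "" when the field is unset
  final Set<String> inputs;         // PCollection ids
  final Set<String> outputs;        // PCollection ids
  final List<String> subtransforms; // transform ids

  TransformInfo(String environmentId, Set<String> inputs, Set<String> outputs, List<String> subtransforms) {
    this.environmentId = environmentId;
    this.inputs = inputs;
    this.outputs = outputs;
    this.subtransforms = subtransforms;
  }
}

final class PrimitiveChecks {
  // The proposed check: an empty environment_id means primitive.
  static boolean isPrimitiveByEnvironment(TransformInfo t) {
    return t.environmentId.isEmpty();
  }

  // Structural check: not a pass-through and no subtransforms.
  static boolean isPrimitiveByStructure(TransformInfo t) {
    boolean onlyReturnsInputs = t.inputs.containsAll(t.outputs);
    return !onlyReturnsInputs && t.subtransforms.isEmpty();
  }
}
```

For an external composite, a Flatten, and a ParDo modeled this way, the
environment check flags the external composite as primitive and misses the
ParDo, while the structural check classifies all three as Robert describes.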
>> 
>> My questions here are:
>> 
>> 1. Is registering a primitive/native transform with NativeTransforms required
>> in addition to registering it with the translators?
>> 2. Is an empty environment_id a good enough indicator to identify a
>> native/primitive transform?
>> 3. Is an external transform supposed to have an empty or non-empty environment_id?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
>> [2] 
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
>> [3] 
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
>> [4] 
>> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
>> [5] 
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
>> [6] 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392
>> 
