IIUC, TrivialNativeTransformExpander is introduced to prune composite nodes 
with known urns: their subtransforms and environment id are removed, 
essentially turning each of them into a leaf node. I believe it is needed 
because, when traversing a QueryablePipeline, only leaf nodes are returned, 
since the expectation is that only leaf nodes need to be translated directly. 
So essentially, TrivialNativeTransformExpander is used to turn a composite 
transform into a leaf node so that runners are able to translate it at 
runtime.
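
For concreteness, here is a rough sketch of the kind of pruning 
TrivialNativeTransformExpander performs on a transform whose urn the runner 
declares as known. The class and method names below are made up, and the real 
implementation also rewrites the transform inside the pipeline's components, 
so treat this purely as an illustration:

import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;

class NativeTransformPruningSketch {
  // Drop the subtransforms and environment id of a known-urn composite so that
  // QueryablePipeline will treat it as a leaf and hand it to the runner's translator.
  static RunnerApi.PTransform pruneIfKnown(RunnerApi.PTransform transform, Set<String> knownUrns) {
    if (!knownUrns.contains(transform.getSpec().getUrn())) {
      return transform; // unknown urns are left untouched
    }
    return transform.toBuilder()
        .clearSubtransforms()  // no longer a composite
        .clearEnvironmentId()  // runner-implemented, no SDK environment needed
        .build();
  }
}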

I love the idea of relying on transform urns alone to determine primitive 
transforms to make things clearer. In order to do that, I suppose it would be 
better for us to provide a unified approach to registering translators instead 
of each runner having its own way of doing so. [1][2][3]
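
Purely as an illustration of what a unified registration contract could look 
like (the names UrnRegistrar and PortableTransformTranslator below are made 
up, not existing Beam classes), each runner could expose its urn-to-translator 
map through a single interface and a shared translation walk could consume it:

import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;

// Hypothetical shared contract; ContextT stands for the runner-specific translation context.
public interface UrnRegistrar<ContextT> {

  /** The urns this runner can translate natively, mapped to their translators. */
  Map<String, PortableTransformTranslator<ContextT>> translators();

  interface PortableTransformTranslator<ContextT> {
    void translate(String transformId, RunnerApi.Pipeline pipeline, ContextT context);
  }
}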

What are your thoughts?

Best,
Ke


[1] Flink:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L144
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L122
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L145

[2] Spark:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L99
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java#L100

[3] Samza:
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java#L173

> On Jul 29, 2021, at 1:58 PM, Robert Bradshaw <rober...@google.com> wrote:
> 
> OK, I have to admit that I have no idea what the purpose of
> TrivialNativeTransformExpander is--maybe something else in the
> infrastructure can't handle translating pipelines that have
> subtransforms or something like that? It seems to me the map of urn ->
> PTransformTranslator should be sufficient to do the translation
> (simply ignoring subtransforms for any known URN when walking the
> tree). In this case the whole question and infrastructure around "is
> this primitive" could simply go away.
> 
> On Wed, Jul 28, 2021 at 8:47 PM Ke Wu <ke.wu...@gmail.com> wrote:
>> 
>> Hi Robert,
>> 
>> Thanks for the reply. The motivation for this is that I noticed that when 
>> we need to translate a composite transform, there are two steps involved:
>> 
>> 1. Register the composite transform urn with a dedicated translator. [1]
>> 2. Register the composite transform urn with @AutoService of 
>> NativeTransforms. [2]
>> 
>> I was wondering whether step 2 could be eliminated, since after a composite 
>> transform urn is registered with a translator, its environment id and 
>> subtransforms will be removed from the pipeline components by 
>> TrivialNativeTransformExpander. [3]
>> 
>> If environment id is not sufficient to distinguish a primitive transform, I 
>> suppose we need to keep step 2, unless we update QueryablePipeline to take 
>> known urns as an input.
>> 
>> Does this make sense to you?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158
>> [2] 
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
>> [3] 
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59
>> 
>> 
>> On Jul 28, 2021, at 5:52 PM, Robert Bradshaw <rober...@google.com> wrote:
>> 
>> A composite transform is a transform that either only returns its
>> inputs or has subtransforms. A primitive, or leaf, transform would be
>> the complement of that. Checking environment ids is not sufficient
>> (e.g. the Flatten primitive doesn't need an environment, but the ParDo
>> does).
>> 
>> Perhaps it's worth taking a step back and trying to understand why you
>> need to distinguish between primitive and nonprimitive transforms at
>> all. When translating a pipeline,
>> 
>> (1) If you know the URN, you can implement the transform directly,
>> (2) If you don't know the URN, look to see if its outputs are a subset
>> of the inputs (in which case there's nothing to do) or it has
>> subtransforms (in which case you can just recurse to compute the
>> outputs).
>> (3) If neither (1) nor (2) hold, then you throw an error. This is an
>> unknown primitive.
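
To make sure I follow (1)-(3) above, here is a rough sketch of such a walk 
over the pipeline proto; the Translator interface, the urn-to-translator map 
and the context object are placeholders I made up, not real Beam classes:

import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;

class TranslationWalkSketch {
  interface Translator {
    void translate(String transformId, RunnerApi.Pipeline pipeline, Object context);
  }

  static void translate(String transformId, RunnerApi.Pipeline pipeline,
      Map<String, Translator> byUrn, Object context) {
    RunnerApi.PTransform t = pipeline.getComponents().getTransformsOrThrow(transformId);
    String urn = t.getSpec().getUrn();
    if (byUrn.containsKey(urn)) {
      // (1) Known urn: translate directly and ignore any subtransforms.
      byUrn.get(urn).translate(transformId, pipeline, context);
    } else if (t.getInputsMap().values().containsAll(t.getOutputsMap().values())) {
      // (2a) Outputs are a subset of the inputs: pass-through composite, nothing to do.
    } else if (t.getSubtransformsCount() > 0) {
      // (2b) Unknown composite: recurse into its subtransforms.
      for (String subId : t.getSubtransformsList()) {
        translate(subId, pipeline, byUrn, context);
      }
    } else {
      // (3) Unknown primitive: fail.
      throw new IllegalArgumentException("Unknown primitive transform: " + urn);
    }
  }
}

If this is roughly what you have in mind, I agree the separate "is this 
primitive" check would no longer be needed.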
>> 
>> 
>> On Tue, Jul 27, 2021 at 8:30 PM Ke Wu <ke.wu...@gmail.com> wrote:
>> 
>> 
>> Hello All,
>> 
>> When I was looking at translating composite transforms in a portable 
>> pipeline, I realized that TrivialNativeTransformExpander [1] is being used 
>> to identify native transforms by transform urn, and it removes the 
>> subtransforms and environment id of the corresponding transform node. 
>> However, QueryablePipeline seems to identify primitive transforms using a 
>> different approach [2], which requires us to register runner native 
>> transforms again [3][4] in addition to registering the transform translators.
>> 
>> An idea came to me that we should be able to identify a primitive/native 
>> transform by looking at its environment id, according to the protobuf model [5]:
>> 
>> // Environment where the current PTransform should be executed in.
>> //
>> // Transforms that are required to be implemented by a runner must omit this.
>> // All other transforms are required to specify this.
>> string environment_id = 7;
>> 
>> 
>> therefore, I updated the logic:
>> 
>>   private static boolean isPrimitiveTransform(PTransform transform) {
>>     String urn = PTransformTranslation.urnForTransformOrNull(transform);
>> -   return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
>> +   return transform.getEnvironmentId().isEmpty();
>>   }
>> 
>> 
>> However, tests started to fail on SQL cases, where I found that external 
>> transforms seem to have an empty environment id as well [6], which does 
>> not seem to conform to the protobuf model.
>> 
>> My questions here are:
>> 
>> 1. Is NativeTransforms required to register a primitive/native transform in 
>> addition to registering it with the translators?
>> 2. Is an empty environment_id a good enough indicator for identifying a 
>> native/primitive transform?
>> 3. Are external transforms supposed to have an empty or a non-empty environment_id?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
>> [2] 
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
>> [3] 
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
>> [4] 
>> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
>> [5] 
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
>> [6] 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392
>> 
>> 
