I'll add that there are some issues with the TrivialNativeTransformExpander related to SDF/primitive Read expansion. The call to SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() somehow causes the TrivialNativeTransformExpander to return a pipeline that then throws an exception when fed to GreedyPipelineFuser.fuse(...).toPipeline(). The exception is, for example:

java.lang.IllegalArgumentException: PCollectionNodes 
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
 PCollection=unique_name: 
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: UNBOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced

So far, I have not been able to figure out whether this problem is in the TrivialNativeTransformExpander, in the GreedyPipelineFuser, or somewhere in between, but the whole mechanism looks somewhat fragile. If we could replace it with something more robust, that would be great.

 Jan

On 8/2/21 8:53 PM, Robert Bradshaw wrote:
On Mon, Aug 2, 2021 at 10:55 AM Ke Wu <ke.wu...@gmail.com> wrote:
IIUC, TrivialNativeTransformExpander was introduced to prune composite nodes
with known urns so that their sub-transforms and environment id are eliminated,
essentially making each of them a leaf node. I believe it is needed because when
traversing a QueryablePipeline, only leaf nodes are returned, as the
expectation is that only leaf nodes need to be translated directly. So
essentially, TrivialNativeTransformExpander is used to turn a composite
transform into a leaf node in order to give runners the capability to translate
it at runtime.
So, in short, TrivialNativeTransformExpander is needed to handle the
implicit expectations of QueryablePipeline. (I will say that sometimes
it is easier to reason about things like "what is /the/ producer of
PCollection X" rather than think about the fact that both a "leaf" and
any number of composites could be considered its producer.)

I love the idea of relying on transform urns alone to determine primitive
transforms to make things clearer. In order for us to do that, I suppose it is
better for us to provide a unified approach to registering translators instead
of each runner having its own way of doing so. [1][2][3]

What are your thoughts?
Yes, each runner has a set of URNs that it can handle directly. We
could let this set be the "primitives" or "things to be translated."
+1 to making this more uniform.

It's possible that a runner should be able to say "sometimes I can
handle this URN, sometimes I can't and want to fall back to the
composite implementation" which could complicate this.
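
One way to picture this "sometimes I can, sometimes I can't" case is a map from URN to a predicate instead of a plain set of URNs. This is only a minimal sketch under stated assumptions: `FallbackSketch`, `Node`, the `bounded` flag and `plan` are hypothetical names for illustration, not Beam APIs, and the no-op case (outputs that are a subset of the inputs) is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FallbackSketch {
  /** Hypothetical, simplified stand-in for a transform node; not a Beam class. */
  public static final class Node {
    final String urn;
    final boolean bounded; // assumed example property a runner might inspect
    final List<Node> subtransforms;

    public Node(String urn, boolean bounded, List<Node> subtransforms) {
      this.urn = urn;
      this.bounded = bounded;
      this.subtransforms = subtransforms;
    }
  }

  /** Returns the URNs the runner would translate natively, in visit order. */
  public static List<String> plan(Node root, Map<String, Predicate<Node>> canTranslate) {
    List<String> planned = new ArrayList<>();
    collect(root, canTranslate, planned);
    return planned;
  }

  private static void collect(
      Node node, Map<String, Predicate<Node>> canTranslate, List<String> planned) {
    Predicate<Node> supported = canTranslate.get(node.urn);
    if (supported != null && supported.test(node)) {
      // The runner accepts this particular instance: treat it as a leaf.
      planned.add(node.urn);
      return;
    }
    if (node.subtransforms.isEmpty()) {
      // Nothing to fall back to.
      throw new IllegalArgumentException("Cannot translate: " + node.urn);
    }
    // The runner declined (or does not know the URN): use the composite expansion.
    for (Node sub : node.subtransforms) {
      collect(sub, canTranslate, planned);
    }
  }
}
```

With a predicate such as `n -> n.bounded` registered for a composite URN, a bounded instance would be planned as a single native step, while an unbounded one would fall through to its subtransforms.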

Best,
Ke


[1] Flink:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L144
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L122
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L145

[2] Spark:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L99
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java#L100

[3] Samza:
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java#L173

On Jul 29, 2021, at 1:58 PM, Robert Bradshaw <rober...@google.com> wrote:

OK, I have to admit that I have no idea what the purpose of
TrivialNativeTransformExpander is--maybe something else in the
infrastructure can't handle translating pipelines that have
subtransforms or something like that? It seems to me the map of urn ->
PTransformTranslator should be sufficient to do the translation
(simply ignoring subtransforms for any known URN when walking the
tree). In this case the whole question and infrastructure around "is
this primitive" could simply go away.

On Wed, Jul 28, 2021 at 8:47 PM Ke Wu <ke.wu...@gmail.com> wrote:


Hi Robert,

Thanks for the reply. The motivation for this is that I noticed that when we
need to translate a composite transform, there are two steps involved:

1. Register the composite transform urn with a dedicated translator. [1]
2. Register the composite transform urn with @AutoService of NativeTransforms [2]

I was wondering whether step 2 could be eliminated, since after a composite
transform urn is registered with a translator, its environment id and
subtransforms will be removed from the pipeline components by
TrivialNativeTransformExpander [3]

If environment id is not sufficient to distinguish a primitive transform, I 
suppose we need to keep step 2, unless we update QueryablePipeline to take 
known urns as an input.

Does this make sense to you?

Best,
Ke


[1] 
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158
[2] 
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[3] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59


On Jul 28, 2021, at 5:52 PM, Robert Bradshaw <rober...@google.com> wrote:

A composite transform is a transform that either only returns its
inputs or has subtransforms. A primitive, or leaf, transform would be
the complement of that. Checking environment ids is not sufficient
(e.g. the Flatten primitive doesn't need an environment, but the ParDo
does).

Perhaps it's worth taking a step back and trying to understand why you
need to distinguish between primitive and nonprimitive transforms at
all. When translating a pipeline,

(1) If you know the URN, you can implement the transform directly,
(2) If you don't know the URN, look to see if its outputs are a subset
of the inputs (in which case there's nothing to do) or it has
subtransforms (in which case you can just recurse to compute the
outputs).
(3) If neither (1) nor (2) hold, then you throw an error. This is an
unknown primitive.
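
The three cases above can be sketched roughly as follows. This is a minimal sketch, assuming a simplified `Node` class standing in for the PTransform proto; `TranslationSketch`, `Node` and `translate` are hypothetical names, not Beam APIs. Checking for subtransforms before the "outputs are a subset of the inputs" test is an assumption of the sketch, made so that a composite without outputs still gets expanded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class TranslationSketch {
  /** Hypothetical, simplified stand-in for a PTransform proto; not a Beam class. */
  public static final class Node {
    final String urn;
    final Set<String> inputs;
    final Set<String> outputs;
    final List<Node> subtransforms;

    public Node(String urn, Set<String> inputs, Set<String> outputs, List<Node> subtransforms) {
      this.urn = urn;
      this.inputs = inputs;
      this.outputs = outputs;
      this.subtransforms = subtransforms;
    }
  }

  /** Returns the URNs that would be translated directly, in visit order. */
  public static List<String> translate(Node root, Set<String> knownUrns) {
    List<String> translated = new ArrayList<>();
    visit(root, knownUrns, translated);
    return translated;
  }

  private static void visit(Node node, Set<String> knownUrns, List<String> translated) {
    if (knownUrns.contains(node.urn)) {
      // (1) Known URN: implement it directly and ignore any subtransforms.
      translated.add(node.urn);
    } else if (!node.subtransforms.isEmpty()) {
      // (2) Composite: recurse into the subtransforms to compute the outputs.
      for (Node sub : node.subtransforms) {
        visit(sub, knownUrns, translated);
      }
    } else if (node.inputs.containsAll(node.outputs)) {
      // (2) Outputs are a subset of the inputs: nothing to do.
    } else {
      // (3) Neither case applies: this is an unknown primitive.
      throw new IllegalArgumentException("Unknown primitive: " + node.urn);
    }
  }
}
```

In this form no separate "is this primitive" check is needed; the set of known URNs alone decides where the walk stops.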


On Tue, Jul 27, 2021 at 8:30 PM Ke Wu <ke.wu...@gmail.com> wrote:


Hello All,

While looking at translating composite transforms in a portable pipeline, I
realized that TrivialNativeTransformExpander [1] is being used to identify
native transforms by transform urn, and it removes the sub-transforms and
environment id in the corresponding transform node. However, QueryablePipeline
seems to identify primitive transforms using a different approach [2], which
requires us to register runner native transforms again [3][4] in addition to
the transform translators.

An idea came to me that we should be able to identify a primitive/native
transform by looking at its environment id, according to the protobuf model [5]:

// Environment where the current PTransform should be executed in.
//
// Transforms that are required to be implemented by a runner must omit this.
// All other transforms are required to specify this.
string environment_id = 7;


therefore, I updated the logic:

  private static boolean isPrimitiveTransform(PTransform transform) {
    String urn = PTransformTranslation.urnForTransformOrNull(transform);
-    return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
+    return transform.getEnvironmentId().isEmpty();
  }


However, tests started to fail on SQL cases, where I found that external
transforms seem to have an empty environment id as well [6], which does not
seem to conform to the protobuf model.

My questions here are:

1. Is NativeTransforms required to register a primitive/native transform in
addition to registering it with translators?
2. Is an empty environment_id a good enough indicator to identify a
native/primitive transform?
3. Is an external transform supposed to have an empty or non-empty environment_id?

Best,
Ke


[1] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
[2] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
[3] 
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[4] 
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
[5] 
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
[6] 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392


