I'll add that there are some issues with the
TrivialNativeTransformExpander related to SDF/primitive Read expansion.
The call to
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()
somehow causes the TrivialNativeTransformExpander to return a pipeline
that then throws an exception when fed to
GreedyPipelineFuser.fuse(...).toPipeline(). The exception is (for example):
java.lang.IllegalArgumentException: PCollectionNodes
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
PCollection=unique_name:
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: UNBOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced
So far, I have not been able to figure out whether this problem is in the
TrivialNativeTransformExpander, the GreedyPipelineFuser, or somewhere
in between, but this whole mechanism looks somewhat fragile. If we could
replace it with something more robust, that would be great.
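To make the error above more concrete, here is a minimal, hypothetical sketch of the kind of invariant the message points at: every PCollection consumed by some leaf transform must also be produced by some leaf transform. This is not GreedyPipelineFuser's actual code; `ProducerConsumerCheck` and its `Leaf` record are illustrative stand-ins for the real pipeline protos. If an expansion step removes the producer of a PCollection from the set of leaves while its consumers remain, a check like this one fails with a message of the same shape.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ProducerConsumerCheck {
  // Hypothetical, simplified view of a leaf transform: only the ids of the
  // PCollections it consumes and produces.
  record Leaf(String id, List<String> inputs, List<String> outputs) {}

  private ProducerConsumerCheck() {}

  /** Returns the ids of PCollections that some leaf consumes but no leaf produces. */
  static Set<String> consumedButNeverProduced(List<Leaf> leaves) {
    Set<String> produced = new LinkedHashSet<>();
    for (Leaf leaf : leaves) {
      produced.addAll(leaf.outputs());
    }
    Set<String> orphans = new LinkedHashSet<>();
    for (Leaf leaf : leaves) {
      for (String input : leaf.inputs()) {
        if (!produced.contains(input)) {
          orphans.add(input);
        }
      }
    }
    return orphans;
  }

  /** Throws, with a message shaped like the fuser's error, if any orphan exists. */
  static void validate(List<Leaf> leaves) {
    Set<String> orphans = consumedButNeverProduced(leaves);
    if (!orphans.isEmpty()) {
      throw new IllegalArgumentException(
          "PCollections " + orphans + " were consumed but never produced");
    }
  }
}
```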
Jan
On 8/2/21 8:53 PM, Robert Bradshaw wrote:
On Mon, Aug 2, 2021 at 10:55 AM Ke Wu <ke.wu...@gmail.com> wrote:
IIUC, TrivialNativeTransformExpander was introduced to prune composite nodes
with known URNs so that their subtransforms and environment ids are eliminated,
essentially turning them into leaf nodes. I believe it is needed because
traversing a QueryablePipeline returns only leaf nodes, the expectation being
that only leaf nodes need to be translated directly. So, essentially,
TrivialNativeTransformExpander is used to turn a composite transform into a
leaf node in order to give runners the capability to translate it at
runtime.
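A minimal sketch of the pruning described above, under simplifying assumptions: transforms are modeled by a hypothetical `Node` record (keyed by id) rather than RunnerApi.PTransform, and the known URNs are passed in as a plain set. It is not the actual TrivialNativeTransformExpander implementation; it only illustrates clearing the subtransforms and environment id of a known-URN composite and dropping its descendants, so that a leaf-only traversal sees it as a single leaf.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class NativeExpanderSketch {
  // Hypothetical, simplified stand-in for RunnerApi.PTransform.
  record Node(String id, String urn, String environmentId, List<String> subtransforms) {}

  private NativeExpanderSketch() {}

  /** Returns a copy of the id-keyed transform map with known-URN composites turned into leaves. */
  static Map<String, Node> pruneKnownComposites(
      Map<String, Node> transforms, Set<String> knownUrns) {
    Map<String, Node> result = new LinkedHashMap<>(transforms);
    for (Node node : transforms.values()) {
      // Skip unknown URNs and nodes already removed as descendants of a pruned node.
      if (!knownUrns.contains(node.urn()) || !result.containsKey(node.id())) {
        continue;
      }
      // Remove every descendant of the known-URN node, breadth-first.
      Deque<String> pending = new ArrayDeque<>(node.subtransforms());
      while (!pending.isEmpty()) {
        Node child = result.remove(pending.poll());
        if (child != null) {
          pending.addAll(child.subtransforms());
        }
      }
      // Replace the node with a leaf: no subtransforms and no environment id.
      result.put(node.id(), new Node(node.id(), node.urn(), "", List.of()));
    }
    return result;
  }
}
```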
So, in short, TrivialNativeTransformExpander is needed to handle the
implicit expectations of QueryablePipeline. (I will say that sometimes
it is easier to reason about things like "what is /the/ producer of
PCollection X" rather than to think about the fact that both a "leaf" and
any number of composites could be considered its producer.)
I love the idea of relying on transform URNs alone to determine primitive
transforms, to make things clearer. In order for us to do that, I suppose it
would be better to provide a unified approach to registering translators,
instead of each runner having its own way of doing so. [1][2][3]
What are your thoughts?
Yes, each runner has a set of URNs that it can handle directly. We
could let this set be the "primitives" or "things to be translated."
+1 to making this more uniform.
It's possible that a runner should be able to say "sometimes I can
handle this URN, sometimes I can't and want to fall back to the
composite implementation" which could complicate this.
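One hypothetical shape for a unified registry that also covers the "sometimes I can't" case: translators are looked up by URN, but each may decline a particular transform, in which case the caller falls back to the composite's subtransforms. `TranslatorRegistrySketch`, `Translator.canTranslate`, and the `bounded` flag are made up for illustration; they are not existing Beam APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class TranslatorRegistrySketch {
  // Hypothetical, minimal transform view: a URN plus one property a translator
  // might care about.
  record Transform(String urn, boolean bounded) {}

  // Hypothetical translator contract: it may decline a transform it cannot handle.
  interface Translator {
    boolean canTranslate(Transform transform);

    void translate(Transform transform);
  }

  private final Map<String, Translator> byUrn = new LinkedHashMap<>();

  /** Registers a translator; this map is the runner's single list of known URNs. */
  TranslatorRegistrySketch register(String urn, Translator translator) {
    byUrn.put(urn, translator);
    return this;
  }

  /** Returns the translator to use, or empty if the caller should expand the composite. */
  Optional<Translator> lookup(Transform transform) {
    Translator translator = byUrn.get(transform.urn());
    if (translator != null && translator.canTranslate(transform)) {
      return Optional.of(translator);
    }
    return Optional.empty();
  }
}
```

Keeping the accept/decline decision inside the translator means the registry stays the only place that lists which URNs a runner knows about, while still allowing a fallback per transform instance.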
Best,
Ke
[1] Flink:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L144
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L122
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L145
[2] Spark:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L99
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java#L100
[3] Samza:
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java#L173
On Jul 29, 2021, at 1:58 PM, Robert Bradshaw <rober...@google.com> wrote:
OK, I have to admit that I have no idea what the purpose of
TrivialNativeTransformExpander is--maybe something else in the
infrastructure can't handle translating pipelines that have
subtransforms or something like that? It seems to me the map of urn ->
PTransformTranslator should be sufficient to do the translation
(simply ignoring subtransforms for any known URN when walking the
tree). In this case the whole question and infrastructure around "is
this primitive" could simply go away.
On Wed, Jul 28, 2021 at 8:47 PM Ke Wu <ke.wu...@gmail.com> wrote:
Hi Robert,
Thanks for the reply. The motivation for this is that I noticed that when we
need to translate a composite transform, there are two steps involved:
1. Register the composite transform URN with a dedicated translator. [1]
2. Register the composite transform URN with NativeTransforms via @AutoService. [2]
I was wondering whether step 2 could be eliminated, since after a composite
transform URN is registered with a translator, its environment id and
subtransforms are removed from the pipeline components by
TrivialNativeTransformExpander [3].
If the environment id is not sufficient to distinguish a primitive transform,
I suppose we need to keep step 2, unless we update QueryablePipeline to take
known URNs as an input.
Does this make sense to you?
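A minimal sketch of what letting QueryablePipeline take known URNs as an input might look like, assuming a hypothetical `PrimitiveCheckSketch` helper: a transform counts as primitive if its URN is a model primitive or one the runner has a translator for, so the same set drives both translation and the primitive check, with no separate NativeTransforms registration. The `MODEL_PRIMITIVES` set is an illustrative subset, not Beam's actual PRIMITIVE_URNS list.

```java
import java.util.Set;

final class PrimitiveCheckSketch {
  // Illustrative subset of model primitive URNs; not Beam's full PRIMITIVE_URNS set.
  static final Set<String> MODEL_PRIMITIVES = Set.of(
      "beam:transform:impulse:v1",
      "beam:transform:pardo:v1",
      "beam:transform:flatten:v1",
      "beam:transform:group_by_key:v1");

  private PrimitiveCheckSketch() {}

  /**
   * Hypothetical primitive check: the runner's translator URNs are passed in
   * directly instead of being registered a second time elsewhere. A null URN
   * (a transform without a URN) is never primitive; the guard also avoids the
   * NullPointerException that Set.of(...).contains(null) would throw.
   */
  static boolean isPrimitive(String urn, Set<String> runnerTranslatorUrns) {
    return urn != null
        && (MODEL_PRIMITIVES.contains(urn) || runnerTranslatorUrns.contains(urn));
  }
}
```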
Best,
Ke
[1]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158
[2]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[3]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59
On Jul 28, 2021, at 5:52 PM, Robert Bradshaw <rober...@google.com> wrote:
A composite transform is a transform that either only returns its
inputs or has subtransforms. A primitive, or leaf, transform would be
the complement of that. Checking environment ids is not sufficient
(e.g. the Flatten primitive doesn't need an environment, but the ParDo
does).
Perhaps it's worth taking a step back and trying to understand why you
need to distinguish between primitive and nonprimitive transforms at
all. When translating a pipeline,
(1) If you know the URN, you can implement the transform directly,
(2) If you don't know the URN, look to see if its outputs are a subset
of the inputs (in which case there's nothing to do) or it has
subtransforms (in which case you can just recurse to compute the
outputs).
(3) If neither (1) nor (2) holds, then you throw an error. This is an
unknown primitive.
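The three cases above map fairly directly onto a recursive walk. This is a minimal sketch under stated assumptions: `TranslationWalkSketch` and its `Node` record are hypothetical stand-ins for the pipeline protos (inputs and outputs are plain lists of PCollection ids rather than maps), and `translator` is whatever callback a runner would use for a known URN. Subtransforms of a known URN are never visited, so no separate "is this primitive" check is needed.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

final class TranslationWalkSketch {
  // Hypothetical stand-in for RunnerApi.PTransform; inputs and outputs are
  // plain lists of PCollection ids.
  record Node(String id, String urn, List<String> inputs, List<String> outputs,
      List<String> subtransforms) {}

  private TranslationWalkSketch() {}

  static void translate(String id, Map<String, Node> transforms, Set<String> knownUrns,
      Consumer<Node> translator) {
    Node node = transforms.get(id);
    if (node == null) {
      throw new IllegalArgumentException("No transform with id " + id);
    }
    if (knownUrns.contains(node.urn())) {
      // (1) Known URN: translate directly and ignore any subtransforms.
      translator.accept(node);
    } else if (new HashSet<>(node.inputs()).containsAll(node.outputs())) {
      // (2a) Outputs are a subset of the inputs: nothing to do.
    } else if (!node.subtransforms().isEmpty()) {
      // (2b) Composite: recurse into the subtransforms to compute the outputs.
      for (String child : node.subtransforms()) {
        translate(child, transforms, knownUrns, translator);
      }
    } else {
      // (3) Neither case applies: this is an unknown primitive.
      throw new IllegalStateException(
          "Unknown primitive transform " + id + " with URN " + node.urn());
    }
  }
}
```

As written, a leaf with no outputs trivially satisfies case (2); a real translator might want to handle that edge case explicitly.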
On Tue, Jul 27, 2021 at 8:30 PM Ke Wu <ke.wu...@gmail.com> wrote:
Hello All,
When I was looking at translating composite transforms in a portable pipeline, I
realized that TrivialNativeTransformExpander [1] is used to identify
native transforms by transform URN, and it removes the subtransforms and
environment id from the corresponding transform node. However, QueryablePipeline
seems to identify primitive transforms using a different approach [2], which
requires us to register runner-native transforms again [3][4] in addition to
the transform translators.
An idea came to me that we should be able to identify a primitive/native
transform by looking at its environment, according to the protobuf model [5]:
// Environment where the current PTransform should be executed in.
//
// Transforms that are required to be implemented by a runner must omit this.
// All other transforms are required to specify this.
string environment_id = 7;
Therefore, I updated the logic:
 private static boolean isPrimitiveTransform(PTransform transform) {
   String urn = PTransformTranslation.urnForTransformOrNull(transform);
-  return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
+  return transform.getEnvironmentId().isEmpty();
 }
However, tests started to fail in SQL cases, where I found that external
transforms seem to have an empty environment id as well [6], which does not
seem to conform to the protobuf model.
My questions here are:
1. Is registering a primitive/native transform with NativeTransforms required
in addition to registering it with translators?
2. Is an empty environment_id a good enough indicator to identify a
native/primitive transform?
3. Are external transforms supposed to have an empty or non-empty environment_id?
Best,
Ke
[1]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
[2]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
[3]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[4]
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
[5]
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
[6]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392