A primitive transform is a PTransform that has been chosen to have no
default implementation in terms of other PTransforms. A primitive transform
therefore must be implemented directly by a pipeline runner in terms of
pipeline-runner-specific concepts. An initial list of primitive PTransforms
was defined in [2] and has since been updated in [3].

As part of the portability effort, libraries that are intended to be shared
across multiple runners are being developed to support their migration to a
portable execution model. One of these libraries is responsible for fusing
multiple primitive PTransforms together into a pipeline-runner-specific
concept. This library chose to define a primitive PTransform as any
PTransform that doesn't contain other PTransforms.

Unfortunately, while Ryan was attempting to enable the ValidatesRunner
tests for Flink using the new portability libraries, he ran into an issue:
the Apache Beam Java SDK allows a person to construct a PTransform that has
zero sub-PTransforms and also isn't one of the defined Apache Beam
primitives. In this case the PTransform was trivial: it applied no
additional transforms to its input PCollection and simply returned it. The
portability libraries couldn't handle this structure.

To solve this issue, I proposed that we modify the portability library that
does fusion to use a whitelist of primitives, which prevents the issue from
happening. This solved the problem but caused an issue for Thomas, as he
was relying on PTransforms with zero sub-transforms being treated as
primitives. Thomas has a use case where he wants to expose the internal
Flink Kafka and Kinesis connectors and build Apache Beam pipelines that use
the Flink native sources/sinks. I'll call these "native" PTransforms, since
they aren't part of the Apache Beam model and are runner-specific.
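
As a rough illustration of the two definitions in play, here is a minimal
sketch in plain Python. It is not Beam code: the Transform class, the
function names, and the "example:flink:..." URN are hypothetical, and the
whitelist is only an illustrative subset of the primitives in [3].

```python
from dataclasses import dataclass, field

# Illustrative subset of primitive URNs; see [3] for the real list.
KNOWN_PRIMITIVE_URNS = frozenset({
    "beam:transform:impulse:v1",
    "beam:transform:pardo:v1",
    "beam:transform:group_by_key:v1",
    "beam:transform:flatten:v1",
})


@dataclass
class Transform:
    """Hypothetical stand-in for a PTransform node in the pipeline graph."""
    name: str
    urn: str = ""  # left empty for user-defined composites
    subtransforms: list = field(default_factory=list)


def is_primitive_structural(t):
    """Structural rule: anything without sub-transforms is a primitive."""
    return not t.subtransforms


def is_primitive_whitelisted(t):
    """Whitelist rule: only known primitive URNs count as primitives."""
    return t.urn in KNOWN_PRIMITIVE_URNS


pardo = Transform("ParDo", urn="beam:transform:pardo:v1")
empty = Transform("PassThrough")  # composite that just returns its input
native = Transform("FlinkKafkaRead", urn="example:flink:kafka_read:v1")

for t in (pardo, empty, native):
    print(t.name, is_primitive_structural(t), is_primitive_whitelisted(t))
```

The structural rule treats all three as primitives, which is what tripped
up the fusion library on the "empty" case. The whitelist rule accepts only
the ParDo, which fixes the "empty" case but also rejects the "native" one.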

This brings up two topics:
A) What should we do with these "empty" PTransforms?
B) What should we do with "native" PTransforms?

The typical flow of a pipeline representation for a portable pipeline is:
language-specific representation
-> proto representation
-> job service
-> shared libraries that simplify/replace the proto representation with a
   simplified version (e.g. fusion)
-> runner-specific conversion to native runner concepts (e.g. GBK ->
   runner implementation of GBK)

------------------

A) What should we do with these "empty" PTransforms?

To give a little more detail, these transforms typically arise when people
have conditional logic, such as a loop, that would perform an expansion but
does nothing if the condition is immediately unsatisfied. Allowing
PTransforms that are empty is therefore useful when building a pipeline.

What should we do:
A1) Stick with the whitelist of primitive PTransforms.
A2) When converting the pipeline from the language-specific representation into
the proto representation, drop any "empty" PTransforms. This means that the
pipeline representation that is sent to the runner doesn't contain the
offending type of PTransform and the shared libraries wouldn't have to
change.
A3) Handle the "empty" PTransform case within all of the shared libraries.

I like doing both A1 and A2: A1 because it simplifies the shared libraries,
since we know the whole list of primitives we need to understand, and A2
because it removes noise in the pipeline shape from its representation.
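
To make A2 concrete, here is a minimal sketch in plain Python of pruning
"empty" PTransforms before the pipeline is sent on. It is not Beam code:
the Transform class and both functions are hypothetical. It assumes an
"empty" transform is one with no URN, no sub-transforms, and no outputs
other than its own inputs, so that dropping it needs no rewiring.

```python
from dataclasses import dataclass, field


@dataclass
class Transform:
    """Hypothetical stand-in for a PTransform node in the pipeline graph."""
    name: str
    urn: str = ""                                # empty for user composites
    inputs: set = field(default_factory=set)     # PCollection ids consumed
    outputs: set = field(default_factory=set)    # PCollection ids returned
    subtransforms: list = field(default_factory=list)


def is_empty(t):
    # Assumption: a transform carrying a URN is never treated as empty.
    # Otherwise it is empty when it has no children and every output is
    # one of its own inputs, i.e. it produces nothing new.
    return not t.urn and not t.subtransforms and t.outputs <= t.inputs


def drop_empty(t):
    """Return a copy of t with all "empty" descendants removed."""
    kept = []
    for child in t.subtransforms:
        pruned = drop_empty(child)
        # Check after pruning, so a composite that only wrapped empty
        # transforms is dropped as well.
        if not is_empty(pruned):
            kept.append(pruned)
    return Transform(t.name, t.urn, set(t.inputs), set(t.outputs), kept)


root = Transform("root", subtransforms=[
    Transform("Impulse", urn="beam:transform:impulse:v1", outputs={"pc0"}),
    Transform("PassThrough", inputs={"pc0"}, outputs={"pc0"}),
    Transform("ParDo", urn="beam:transform:pardo:v1",
              inputs={"pc0"}, outputs={"pc1"}),
])

print([c.name for c in drop_empty(root).subtransforms])  # ['Impulse', 'ParDo']
```

Because the dropped transform's output is the same PCollection as its
input, downstream consumers are unaffected in this sketch.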

------------------

B) What should we do with "native" PTransforms?

Some approaches that we could take as a community:

B1) Prevent the usage of "native" PTransforms within Apache Beam since they
hurt portability of pipelines across runners. This can be done by
using whitelists of allowed primitive PTransforms in the shared libraries
and explicitly not giving the shared libraries extension points that
customize those whitelists.

B2) We embrace that closed systems internal to companies will want to use
their own extensions and enable support for "native" PTransforms but
actively discourage "native" PTransforms in the open ecosystem.

B3) We embrace and allow for "native" PTransforms in the open ecosystem.

"Native" PTransforms are useful in closed systems since they allow
companies to solve certain scenarios that would not be practical to expose
to the Apache Beam community. It does take more work for the community to
support these types of extensions. To my knowledge, Google is likely to
want to do something similar to handle internal use cases like the one
Thomas is working on.
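
The practical difference between B1 and B2 comes down to whether the
whitelist has an extension point. A minimal sketch in plain Python follows.
It is not Beam code: primitive_check, its extra_native_urns parameter, and
the "example:flink:..." URN are hypothetical, and the whitelist is only an
illustrative subset.

```python
# Illustrative subset of primitive URNs; see [3] for the real list.
BEAM_PRIMITIVE_URNS = frozenset({
    "beam:transform:impulse:v1",
    "beam:transform:pardo:v1",
    "beam:transform:group_by_key:v1",
    "beam:transform:flatten:v1",
})


def primitive_check(extra_native_urns=frozenset()):
    """Build an is_primitive(urn) predicate.

    extra_native_urns is the hypothetical extension point: under B1 it
    would not exist at all, under B2 a runner could register its own
    "native" URNs through it.
    """
    allowed = BEAM_PRIMITIVE_URNS | frozenset(extra_native_urns)

    def is_primitive(urn):
        return urn in allowed

    return is_primitive


# B1: closed whitelist, nothing can be added.
strict = primitive_check()

# B2: an internal deployment registers its own "native" transform.
internal = primitive_check({"example:flink:kafka_read:v1"})

print(strict("example:flink:kafka_read:v1"))    # False
print(internal("example:flink:kafka_read:v1"))  # True
```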

I'm for either B1 or B2, since embracing and allowing "native" PTransforms
in the open ecosystem is likely to fragment the project and also runs
counter to what portability is really about.

1: https://github.com/apache/beam/pull/6328
2: https://docs.google.com/document/d/1bao-5B6uBuf-kwH1meenAuXXS0c9cBQ1B2J59I3FiyI/edit#heading=h.tt55lhd3k6by
3: https://github.com/apache/beam/blob/6df2ef3ec9c835097e79b4441ce47ff09a458894/model/pipeline/src/main/proto/beam_runner_api.proto#L180
