Hello all, I'm a data engineer at Mozilla working on a first project using
Beam. I've been impressed with the usability of the API as there are good
built-in solutions for handling many simple transformation cases with
minimal code, and wanted to discuss one bit of ergonomics that seems to be
missing.

It appears that none of the existing PTransform factories are generic
enough to take in or output a PCollectionTuple, but we've found many use
cases where it's convenient to apply a few transforms on a PCollectionTuple
in a lambda expression.

For example, we've defined several PTransforms that return main and error
output streams bundled in a PCollectionTuple. We defined a
CompositeTransform interface so that we could handle the error output in a
lambda expression like:

pipeline
    .apply("attempt to deserialize messages", new MyDeserializationTransform())
    .apply("write deserialization errors",
        CompositeTransform.of((PCollectionTuple input) -> {
            input.get(errorTag).apply(new MyErrorOutputTransform());
            return input.get(mainTag);
        }))
    .apply("more processing on the deserialized messages", new MyOtherTransform());

I'd be interested in contributing a patch to add this functionality,
perhaps as a static method PTransform.compose(). Would that patch be
welcome? Are there other thoughts on naming?
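
To make the proposal a bit more concrete, here is a minimal sketch of the
shape such a static factory might take. It deliberately avoids Beam's own
classes so that it compiles on its own: Transform, Outputs, ComposeSketch,
and routeErrorsTo are hypothetical stand-ins for illustration only, and
compose() is just the proposed name, not an existing method. In Beam itself
the function type would probably also need to be serializable, which this
sketch ignores.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Beam's PTransform: one abstract expand() method.
abstract class Transform<InputT, OutputT> {
  abstract OutputT expand(InputT input);

  // Sketch of the proposed static factory: adapts a lambda into a Transform.
  static <InputT, OutputT> Transform<InputT, OutputT> compose(
      Function<InputT, OutputT> fn) {
    return new Transform<InputT, OutputT>() {
      @Override
      OutputT expand(InputT input) {
        return fn.apply(input);
      }
    };
  }
}

// Hypothetical stand-in for a PCollectionTuple holding main and error outputs.
final class Outputs {
  final List<String> main;
  final List<String> errors;

  Outputs(List<String> main, List<String> errors) {
    this.main = main;
    this.errors = errors;
  }
}

final class ComposeSketch {
  // Builds a transform that sends errors to errorSink and passes main through.
  static Transform<Outputs, List<String>> routeErrorsTo(List<String> errorSink) {
    return Transform.compose((Outputs input) -> {
      errorSink.addAll(input.errors);
      return input.main;
    });
  }
}
```

The point of the sketch is only that the caller writes a lambda and gets
back a full transform object, with no named subclass required.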

The full code of the CompositeTransform interface we're currently using is
included below.


import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;

public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
  OutputT expand(InputT input);

  /**
   * The public factory method that serves as the entrypoint for users to
   * create a composite PTransform.
   */
  static <InputT extends PInput, OutputT extends POutput>
      PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
    return new PTransform<InputT, OutputT>() {
      @Override
      public OutputT expand(InputT input) {
        return transform.expand(input);
      }
    };
  }
}
