I've gone ahead and filed a JIRA issue and a GitHub PR to follow up on this
suggestion and make it more concrete:

https://issues.apache.org/jira/browse/BEAM-5413
https://github.com/apache/beam/pull/6414
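
For anyone who wants to see the shape of the idea without Beam on the
classpath, here is a minimal, self-contained sketch of the pattern from the
quoted message below. Everything in it is a stand-in I made up for
illustration: Transform plays the role of PTransform, Outputs plays the role
of a PCollectionTuple with a main and an error output, and compose() is a
hypothetical factory in the spirit of the proposal. It is not the code in the
PR, and a real Beam version would also have to deal with serializability of
the function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ComposeSketch {

  // Stand-in for PTransform (NOT the Beam class): one abstract expand method.
  abstract static class Transform<I, O> {
    abstract O expand(I input);

    // Hypothetical factory: wraps a lambda in an anonymous Transform so that
    // callers do not need to declare a named subclass.
    static <I, O> Transform<I, O> compose(Function<I, O> fn) {
      return new Transform<I, O>() {
        @Override
        O expand(I input) {
          return fn.apply(input);
        }
      };
    }
  }

  // Stand-in for a PCollectionTuple holding a main and an error output.
  static class Outputs {
    final List<String> main = new ArrayList<>();
    final List<String> errors = new ArrayList<>();
  }

  // Parses each element as an integer; failures go to the error output.
  // The second transform consumes the error output as a side effect and
  // passes only the main output downstream, mirroring the quoted example.
  public static List<String> run(List<String> raw, List<String> errorSink) {
    Transform<List<String>, Outputs> parse = Transform.compose(lines -> {
      Outputs out = new Outputs();
      for (String line : lines) {
        try {
          Integer.parseInt(line);
          out.main.add(line);
        } catch (NumberFormatException e) {
          out.errors.add(line);
        }
      }
      return out;
    });

    Transform<Outputs, List<String>> handleErrors = Transform.compose(out -> {
      errorSink.addAll(out.errors);
      return out.main;
    });

    return handleErrors.expand(parse.expand(raw));
  }
}
```

The point of the static factory is that the tuple-handling step stays inline
at the call site as a lambda instead of requiring its own named class.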

On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas <[email protected]> wrote:

> Hello all, I'm a data engineer at Mozilla working on a first project using
> Beam. I've been impressed with the usability of the API as there are good
> built-in solutions for handling many simple transformation cases with
> minimal code, and wanted to discuss one bit of ergonomics that seems to be
> missing.
>
> It appears that none of the existing PTransform factories are generic
> enough to take in or output a PCollectionTuple, but we've found many use
> cases where it's convenient to apply a few transforms on a PCollectionTuple
> in a lambda expression.
>
> For example, we've defined several PTransforms that return main and error
> output streams bundled in a PCollectionTuple. We defined a
> CompositeTransform interface so that we could handle the error output in a
> lambda expression like:
>
> pipeline
>     .apply("attempt to deserialize messages", new MyDeserializationTransform())
>     .apply("write deserialization errors",
>         CompositeTransform.of((PCollectionTuple input) -> {
>             input.get(errorTag).apply(new MyErrorOutputTransform());
>             return input.get(mainTag);
>         }))
>     .apply("more processing on the deserialized messages", new MyOtherTransform());
>
> I'd be interested in contributing a patch to add this functionality,
> perhaps as a static method PTransform.compose(). Would that patch be
> welcome? Are there other thoughts on naming?
>
> The full code of the CompositeTransform interface we're currently using is
> included below.
>
>
> public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
>   OutputT expand(InputT input);
>
>   /**
>    * The public factory method that serves as the entrypoint for users to
>    * create a composite PTransform.
>    */
>   static <InputT extends PInput, OutputT extends POutput>
>         PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
>     return new PTransform<InputT, OutputT>() {
>       @Override
>       public OutputT expand(InputT input) {
>         return transform.expand(input);
>       }
>     };
>   }
> }
