On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik <lc...@google.com> wrote:

> What do side inputs look like?
>

The user first passes the side-input PCollections into the external
transform alongside the ordinary input PCollections, and then defines the
PCollectionViews inside the external transform, something like:

PCollectionTuple pTuple =
    PCollectionTuple.of("main1", main1)
        .and("main2", main2)
        .and("side", side)
        .apply(External.of(...).withMultiOutputs());

public static class TestTransform
    extends PTransform<PCollectionTuple, PCollectionTuple> {
  @Override
  public PCollectionTuple expand(PCollectionTuple input) {
    // Build the singleton view from the "side" input inside the transform.
    PCollectionView<String> sideView =
        input.<String>get("side").apply(View.asSingleton());
    // Flatten both main inputs and append the side value to each element.
    PCollection<String> main =
        PCollectionList.<String>of(input.get("main1"))
            .and(input.get("main2"))
            .apply(Flatten.pCollections())
            .apply(
                ParDo.of(
                        new DoFn<String, String>() {
                          @ProcessElement
                          public void processElement(
                              @Element String x,
                              OutputReceiver<String> out,
                              DoFn<String, String>.ProcessContext c) {
                            out.output(x + c.sideInput(sideView));
                          }
                        })
                    .withSideInputs(sideView));
    // The output tag name "main" is an assumption for illustration.
    return PCollectionTuple.of("main", main);
  }
}
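
To make the data flow of the transform above concrete, here is a minimal plain-Java sketch with no Beam dependency. The class name SideInputSketch and the use of ordered Lists are assumptions for illustration only; real PCollections are unordered and distributed.

```java
import java.util.ArrayList;
import java.util.List;

public class SideInputSketch {
  // Plain-Java stand-in for the transform: flatten the two main inputs,
  // then append the single side value to every element.
  static List<String> expand(List<String> main1, List<String> main2, String side) {
    List<String> flattened = new ArrayList<>(main1); // plays the role of Flatten
    flattened.addAll(main2);
    List<String> out = new ArrayList<>();
    for (String x : flattened) {
      out.add(x + side); // plays the role of out.output(x + c.sideInput(sideView))
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(expand(List.of("a", "b"), List.of("c"), "!"));
  }
}
```

With main1 = [a, b], main2 = [c] and side = "!", every element of either main input comes out with "!" appended.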



> On Thu, Dec 19, 2019 at 4:39 PM Heejong Lee <heej...@google.com> wrote:
>
>> I wanted to know if anybody has any comments on the external transform
>> API for the Java SDK.
>>
>> `External.of()` creates an external transform in the Java SDK. Depending
>> on the input and output types, two additional methods are provided:
>> `withMultiOutputs()`, which specifies that the output is a
>> PCollectionTuple, and `withOutputType()`, which specifies the type of the
>> output element. Some examples are:
>>
>> PCollection<String> col =
>>     testPipeline
>>         .apply(Create.of("1", "2", "3"))
>>         .apply(External.of(...));
>>
>> This works without additional methods because 1) the input and output
>> types of the external transform can be inferred and 2) the output is a
>> single PCollection.
>>
>
> How does the type/coder get inferred at runtime (doesn't Java's type
> erasure get rid of this information)?
>

>
>> PCollectionTuple pTuple =
>>     testPipeline
>>         .apply(Create.of(1, 2, 3, 4, 5, 6))
>>         .apply(
>>             External.of(...).withMultiOutputs());
>>
>> This requires `withMultiOutputs()` since the output is a
>> PCollectionTuple.
>>
>
> Shouldn't this require a mapping from "output" name to coder/type variable
> to be specified as an argument to withMultiOutputs?
>
>
>> PCollection<String> pCol =
>>     testPipeline
>>         .apply(Create.of("1", "2", "2", "3", "3", "3"))
>>         .apply(
>>             External.of(...)
>>                 .<KV<String, Long>>withOutputType())
>>         .apply(
>>             "toString",
>>             MapElements.into(TypeDescriptors.strings())
>>                 .via(x -> String.format("%s->%s", x.getKey(), x.getValue())));
>>
>> This requires `withOutputType()` since the output element type cannot be
>> inferred from method chaining. I think some users may find it awkward to
>> call a method with only a type parameter and empty parentheses. Without
>> `withOutputType()`, the output element type will be java.lang.Object,
>> which can still be forcibly cast to KV.
>>
>
> How does the output type get preserved in this case? Java's type erasure
> would remove <KV<String, Long>> after compilation, so in my opinion coder
> inference should either break or fall back to something generic like
> serializable.
>

The expansion service is responsible for using cross-language compatible
coders in the expanded transforms it returns, and those are the coders used
at runtime. The type information supplied through the additional methods
here is only for compile-time type safety of external transforms.
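
As a rough illustration of that last point, here is a plain-Java sketch of a method that takes only a type parameter. It does not use Beam: Output, external() and the helper methods are hypothetical names, and this is not how External is implemented. It only shows that such a type witness changes what the compiler accepts while the data at runtime stays the same.

```java
import java.util.List;

public class TypeWitnessSketch {
  // Hypothetical stand-in for the result of an external transform.
  static final class Output<T> {
    private final List<Object> elements; // what the "runtime" actually produced

    Output(List<Object> elements) {
      this.elements = elements;
    }

    // Re-types the handle for the compiler; the underlying data is untouched.
    @SuppressWarnings("unchecked")
    <U> Output<U> withOutputType() {
      return (Output<U>) this;
    }

    // Unchecked because T is erased; the real check happens at the call site.
    @SuppressWarnings("unchecked")
    T get(int i) {
      return (T) elements.get(i);
    }
  }

  // Without a type witness the element type is just Object.
  static Output<Object> external() {
    List<Object> data = List.of("a", "b");
    return new Output<>(data);
  }

  // A correct witness lets the caller read elements without a manual cast.
  static String firstAsString() {
    return external().<String>withOutputType().get(0);
  }

  // A wrong witness still compiles and constructs; it only fails on read.
  static boolean wrongWitnessFailsOnRead() {
    Output<Integer> wrong = external().<Integer>withOutputType();
    try {
      Integer bad = wrong.get(0); // compiler-inserted cast to Integer fails here
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(firstAsString());           // prints "a"
    System.out.println(wrongWitnessFailsOnRead()); // prints "true"
  }
}
```

In this sketch nothing derived from the type argument exists at runtime, which is why the coders have to come from somewhere else, in our case the expansion service.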


>> Thanks,
>> Heejong
