On Mon, Dec 23, 2019 at 12:20 PM Heejong Lee <[email protected]> wrote:

>
>
> On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik <[email protected]> wrote:
>
>> What do side inputs look like?
>>
>
> A user first needs to pass the PCollections for side inputs into the
> external transform, in addition to the ordinary input PCollections, and
> then define PCollectionViews inside the external transform, something like:
>
> PCollectionTuple pTuple =
>     PCollectionTuple.of("main1", main1)
>         .and("main2", main2)
>         .and("side", side)
>         .apply(External.of(...).withMultiOutputs());
>
> public static class TestTransform
>     extends PTransform<PCollectionTuple, PCollectionTuple> {
>   @Override
>   public PCollectionTuple expand(PCollectionTuple input) {
>     // Build the side input view from the "side" PCollection.
>     PCollectionView<String> sideView =
>         input.<String>get("side").apply(View.asSingleton());
>     PCollection<String> main =
>         PCollectionList.<String>of(input.get("main1"))
>             .and(input.get("main2"))
>             .apply(Flatten.pCollections())
>             .apply(
>                 ParDo.of(
>                         new DoFn<String, String>() {
>                           @ProcessElement
>                           public void processElement(
>                               @Element String x,
>                               OutputReceiver<String> out,
>                               DoFn<String, String>.ProcessContext c) {
>                             // Append the singleton side input to each element.
>                             out.output(x + c.sideInput(sideView));
>                           }
>                         })
>                     .withSideInputs(sideView));
>     // The output tag name "main" is illustrative.
>     return PCollectionTuple.of("main", main);
>   }
> }
>
>
>
>> On Thu, Dec 19, 2019 at 4:39 PM Heejong Lee <[email protected]> wrote:
>>
>>> I wanted to know if anybody has comments on the external transform API
>>> for the Java SDK.
>>>
>>> `External.of()` creates an external transform in the Java SDK. Depending
>>> on the input and output types, two additional methods are provided:
>>> `withMultiOutputs()`, which specifies that the output is a
>>> PCollectionTuple, and `withOutputType()`, which specifies the type of the
>>> output element. Some examples are:
>>>
>>> PCollection<String> col =
>>>     testPipeline
>>>         .apply(Create.of("1", "2", "3"))
>>>         .apply(External.of(...));
>>>
>>> This works without any additional methods because 1) the input and output
>>> types of the external transform can be inferred and 2) the output is a
>>> single PCollection.
>>>
>>
>> How does the type/coder get inferred at runtime (doesn't Java's type
>> erasure get rid of this information)?
>>
>
>>
>>> PCollectionTuple pTuple =
>>>     testPipeline
>>>         .apply(Create.of(1, 2, 3, 4, 5, 6))
>>>         .apply(
>>>             External.of(...).withMultiOutputs());
>>>
>>> This requires `withMultiOutputs()` since the output is a
>>> PCollectionTuple.
>>>
>>
>> Shouldn't this require a mapping from "output" name to coder/type
>> variable to be specified as an argument to withMultiOutputs?
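
A minimal plain-Java sketch of the kind of mapping suggested above, with no Beam classes involved. The `validate` helper, the tag names, and modeling each expanded output as a single element class are hypothetical illustrations, not part of the External API. Note that plain `Class<?>` tokens cannot express generic types such as KV<String, Long>, which is where a TypeDescriptor-style token would be needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class OutputSpecDemo {
    // Hypothetical: the kind of argument withMultiOutputs(...) could take, a
    // mapping from output tag to expected element class, checked against the
    // element classes implied by the coders returned from expansion.
    static List<String> validate(Map<String, Class<?>> declared, Map<String, Class<?>> expanded) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Class<?>> e : declared.entrySet()) {
            Class<?> actual = expanded.get(e.getKey());
            if (actual == null) {
                problems.add("missing output: " + e.getKey());
            } else if (!e.getValue().isAssignableFrom(actual)) {
                problems.add(e.getKey() + ": expected " + e.getValue().getSimpleName()
                    + " but expansion produced " + actual.getSimpleName());
            }
        }
        for (String tag : expanded.keySet()) {
            if (!declared.containsKey(tag)) {
                problems.add("undeclared output: " + tag);
            }
        }
        Collections.sort(problems); // deterministic order regardless of map iteration
        return problems;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> declared = Map.of("main", String.class, "counts", Long.class);
        Map<String, Class<?>> expanded = Map.of("main", String.class, "counts", Integer.class);
        System.out.println(validate(declared, expanded));
    }
}
```

Checking assignability rather than equality lets a declared supertype (say, CharSequence) accept a more specific produced type.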
>>
>>
>>> PCollection<String> pCol =
>>>     testPipeline
>>>         .apply(Create.of("1", "2", "2", "3", "3", "3"))
>>>         .apply(
>>>             External.of(...)
>>>                 .<KV<String, Long>>withOutputType())
>>>         .apply(
>>>             "toString",
>>>             MapElements.into(TypeDescriptors.strings())
>>>                 .via(x -> String.format("%s->%s", x.getKey(), x.getValue())));
>>>
>>> This requires `withOutputType()` since the output element type cannot
>>> be inferred through method chaining. I think some users may find it
>>> awkward to call a method with only a type parameter and empty
>>> parentheses. Without `withOutputType()`, the output element type will be
>>> java.lang.Object, which could still be forcibly cast to KV.
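
To illustrate why chaining loses the type, here is a plain-Java sketch with no Beam classes; `Holder` and `holding` are hypothetical stand-ins for the external transform's output and `External.of(...)`. A generic method whose type parameter cannot be derived from its arguments takes that parameter from an assignment target, but when its result is only the receiver of another call, the parameter falls back to Object unless an explicit type witness is given.

```java
import java.util.ArrayList;
import java.util.List;

public class InferenceDemo {
    // Hypothetical stand-in for an external transform's output: the element
    // type T exists only at compile time; the stored value is untyped.
    static final class Holder<T> {
        private final Object value;

        Holder(Object value) {
            this.value = value;
        }

        @SuppressWarnings("unchecked")
        T get() {
            return (T) value; // unchecked: nothing is verified here at runtime
        }
    }

    // Like External.of(...), T cannot be derived from the arguments, so the
    // compiler has to take it from the surrounding context.
    static <T> Holder<T> holding(Object value) {
        return new Holder<>(value);
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // 1) Assignment context: T is inferred from the target type (Integer).
        Holder<Integer> assigned = holding(42);
        int a = assigned.get();
        log.add("assigned:" + a);

        // 2) Method chaining: holding(42) is only a receiver, so T becomes Object.
        //    `Integer b = holding(42).get();` would not compile.
        Object b = holding(42).get();
        log.add("chained:" + b.getClass().getSimpleName());

        // 3) Explicit type witness, analogous to .<KV<String, Long>>withOutputType().
        Integer c = InferenceDemo.<Integer>holding(42).get();
        log.add("witness:" + c);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```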
>>>
>>
>> How does the output type get preserved in this case (since Java's type
>> erasure would remove <KV<String, Long>> after compilation, coder inference
>> in my opinion should either fail or fall back to something generic like
>> Serializable)?
>>
>
> The expansion service is responsible for using cross-language compatible
> coders in the expanded transforms it returns, and these are the coders used
> at runtime. The type information supplied through these additional methods
> exists only for compile-time type safety of external transforms.
>

Note that *.<KV<String, Long>>withOutputType()* could be changed to
*.<String>withOutputType()* and we would get a *PCollection<String>*, since
*withOutputType* doesn't do anything at runtime and only makes the types
line up at compile time.
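
A minimal plain-Java sketch of this point, assuming a hypothetical `Output<T>` in place of the PCollection and a `withOutputType()` that, like the proposed method, only changes the static type. Any type argument compiles, and a mismatch surfaces as a ClassCastException only when an element is actually used.

```java
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;

public class ErasureDemo {
    // Hypothetical stand-in for the PCollection returned by an external
    // transform. Which runtime objects it holds is decided elsewhere (in
    // Beam, by the coder chosen during expansion), not by T.
    static final class Output<T> {
        private final List<?> elements;

        Output(List<?> elements) {
            this.elements = elements;
        }

        // Mirrors the idea behind withOutputType(): only the static type
        // changes. Because of erasure, NewT cannot be checked here.
        @SuppressWarnings("unchecked")
        <NewT> Output<NewT> withOutputType() {
            return (Output<NewT>) (Output<?>) this;
        }

        @SuppressWarnings("unchecked")
        T first() {
            return (T) elements.get(0); // unchecked: no runtime check happens here
        }
    }

    static String run() {
        Output<Object> untyped =
            new Output<>(List.of(new AbstractMap.SimpleEntry<>("a", 1L)));

        // Both calls compile and succeed: the type argument does not exist at runtime.
        Output<Map.Entry<String, Long>> asEntry = untyped.<Map.Entry<String, Long>>withOutputType();
        Output<String> asString = untyped.<String>withOutputType();

        String key = asEntry.first().getKey();
        try {
            // The compiler inserts a cast to String at this call site, so the
            // mismatch surfaces only when an element is actually used.
            String wrong = asString.first();
            return "unexpected: " + wrong;
        } catch (ClassCastException e) {
            return "key=" + key + ", String view failed on first use";
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```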

Is there a way to ensure that the output type is actually compatible with
the coder that was returned after expansion? (This would likely require you
to pass typing information into *withOutputType*; see TypeDescriptors[1].)

1:
https://github.com/apache/beam/blob/4c18cb4ada2650552a0006dfffd68d0775dd76c6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
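
One way to carry typing information past erasure is the "super type token" trick that TypeDescriptor relies on, where an anonymous subclass such as new TypeDescriptor<KV<String, Long>>() {} records its type argument. The sketch below is plain Java, not Beam's implementation; `TypeToken`, `isCompatible`, and representing the coder's encoded type as a `java.lang.reflect.Type` are assumptions for illustration.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class TypeTokenDemo {
    // Minimal "super type token": an anonymous subclass such as
    // new TypeToken<Map.Entry<String, Long>>() {} records its type argument
    // in class metadata, which survives erasure and can be read reflectively.
    abstract static class TypeToken<T> {
        final Type type;

        protected TypeToken() {
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException("create as an anonymous subclass with a type argument");
            }
            this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
    }

    // Hypothetical check: coderType stands in for the element type encoded by
    // the coder returned from expansion. Exact equality is a simplification;
    // a real check would also need to handle subtypes.
    static boolean isCompatible(TypeToken<?> declared, Type coderType) {
        return declared.type.equals(coderType);
    }

    static String run() {
        TypeToken<Map.Entry<String, Long>> declared =
            new TypeToken<Map.Entry<String, Long>>() {};
        // Stand-ins for what the returned coder might encode (hypothetical).
        Type matching = new TypeToken<Map.Entry<String, Long>>() {}.type;
        Type mismatched = String.class;
        return isCompatible(declared, matching) + "," + isCompatible(declared, mismatched);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a token like this, a withOutputType variant that takes a type argument as a value could compare it against the expanded coder and fail at pipeline construction time rather than on the first element.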


>>
> Thanks,
>>> Heejong
>>>
>>
