On Mon, Dec 23, 2019 at 12:20 PM Heejong Lee <[email protected]> wrote:
>
> On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik <[email protected]> wrote:
>
>> What do side inputs look like?
>
> A user needs to first pass PCollections for side inputs into the external
> transform in addition to ordinary input PCollections and define
> PCollectionViews inside the external transform something like:
>
> PCollectionTuple pTuple =
>     PCollectionTuple.of("main1", main1)
>         .and("main2", main2)
>         .and("side", side)
>         .apply(External.of(...).withMultiOutputs());
>
> public static class TestTransform extends PTransform<PCollectionTuple, PCollectionTuple> {
>   @Override
>   public PCollectionTuple expand(PCollectionTuple input) {
>     PCollectionView<String> sideView =
>         input.<String>get("side").apply(View.asSingleton());
>     PCollection<String> main =
>         PCollectionList.<String>of(input.get("main1"))
>             .and(input.get("main2"))
>             .apply(Flatten.pCollections())
>             .apply(
>                 ParDo.of(
>                         new DoFn<String, String>() {
>                           @ProcessElement
>                           public void processElement(
>                               @Element String x,
>                               OutputReceiver<String> out,
>                               DoFn<String, String>.ProcessContext c) {
>                             out.output(x + c.sideInput(sideView));
>                           }
>                         })
>                     .withSideInputs(sideView));
>
>> On Thu, Dec 19, 2019 at 4:39 PM Heejong Lee <[email protected]> wrote:
>>
>>> I wanted to know if anybody has any comment on external transform API
>>> for Java SDK.
>>>
>>> `External.of()` can create external transform for Java SDK. Depending on
>>> input and output types, two additional methods are provided:
>>> `withMultiOutputs()` which specifies the type of PCollection and
>>> `withOutputType()` which specifies the type of output element. Some
>>> examples are:
>>>
>>> PCollection<String> col =
>>>     testPipeline
>>>         .apply(Create.of("1", "2", "3"))
>>>         .apply(External.of(...));
>>>
>>> This is okay without additional methods since 1) input and output types
>>> of external transform can be inferred 2) output PCollection is singular.
>>
>> How does the type/coder at runtime get inferred (doesn't java's type
>> erasure get rid of this information)?
>>
>>> PCollectionTuple pTuple =
>>>     testPipeline
>>>         .apply(Create.of(1, 2, 3, 4, 5, 6))
>>>         .apply(
>>>             External.of(...).withMultiOutputs());
>>>
>>> This requires `withMultiOutputs()` since output PCollection is
>>> PCollectionTuple.
>>
>> Shouldn't this require a mapping from "output" name to coder/type
>> variable to be specified as an argument to withMultiOutputs?
>>
>>> PCollection<String> pCol =
>>>     testPipeline
>>>         .apply(Create.of("1", "2", "2", "3", "3", "3"))
>>>         .apply(
>>>             External.of(...)
>>>                 .<KV<String, Long>>withOutputType())
>>>         .apply(
>>>             "toString",
>>>             MapElements.into(TypeDescriptors.strings()).via(
>>>                 x -> String.format("%s->%s", x.getKey(), x.getValue())));
>>>
>>> This requires `withOutputType()` since the output element type cannot
>>> be inferred from method chaining. I think some users may feel awkward to
>>> call method only with the type parameter and empty parenthesis. Without
>>> `withOutputType()`, the type of output element will be java.lang.Object
>>> which might still be forcefully casted to KV.
>>
>> How does the output type get preserved in this case (since Java's type
>> erasure would remove <KV<String, Long>> after compilation and coder
>> inference in my opinion should be broken and or choosing something generic
>> like serializable)?
>
> The expansion service is responsible for using cross-language compatible
> coders in the returning expanded transforms and these are the coders used
> in the runtime. Type information annotated by additional methods here is
> for compile-time type safety of external transforms.

Note that *.<KV<String, Long>>withOutputType()* could be changed to
*.<String>withOutputType()* and we would get a *PCollection<String>* since
*withOutputType* doesn't actually do anything at runtime and is just to make
types align during compilation.
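
To make the erasure point concrete, here is a minimal sketch in plain Java with no Beam dependency. `Ext` and `Box` are hypothetical stand-ins I made up for the external transform and its output collection; they are not Beam classes, and the real `withOutputType` may well be implemented differently. The sketch only shows that a bare type witness changes the static type and nothing else, so a wrong witness compiles and fails later, when an element is used.

```java
public class ErasureSketch {
  /** Hypothetical stand-in for an output collection; not a Beam class. */
  public static final class Box<T> {
    private final Object payload;

    Box(Object payload) {
      this.payload = payload;
    }

    @SuppressWarnings("unchecked")
    public T get() {
      // Unchecked cast: erased at runtime, so nothing is verified here.
      return (T) payload;
    }
  }

  /** Hypothetical stand-in for an external transform with an unknown output type. */
  public static final class Ext {
    private final Object payload;

    public Ext(Object payload) {
      this.payload = payload;
    }

    // The type witness only selects the static type of the result.
    // No information about T survives to runtime.
    public <T> Box<T> withOutputType() {
      return new Box<>(payload);
    }
  }

  public static void main(String[] args) {
    Ext ext = new Ext(42);
    Box<Integer> asInt = ext.<Integer>withOutputType();
    Box<String> asString = ext.<String>withOutputType(); // compiles, no check

    Integer ok = asInt.get();
    System.out.println(ok);
    try {
      String bad = asString.get(); // the compiler-inserted cast fails here
      System.out.println(bad);
    } catch (ClassCastException e) {
      System.out.println("mismatch surfaced only when the element was used");
    }
  }
}
```

Both calls to `withOutputType` return the same kind of object; only the caller's view of it differs.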

Is there a way to ensure that the output type is actually compatible with the
coder that was returned after expansion (this would likely require you to pass
typing information into *withOutputType*, see TypeDescriptors[1])?

1: https://github.com/apache/beam/blob/4c18cb4ada2650552a0006dfffd68d0775dd76c6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java

>>> Thanks,
>>> Heejong
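
As a rough illustration of what passing typing information could buy, the sketch below uses the anonymous-subclass "type token" trick in plain Java, which is the same general idea behind Beam's `TypeDescriptor`. `Token` and `matches` are hypothetical names of mine, and the `reported` type is a stand-in for whatever type the expanded transform's coder would describe. How that type would actually be obtained from the expansion response is an assumption left open here.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeTokenSketch {
  /** Hypothetical minimal type token; subclass anonymously to capture T. */
  public abstract static class Token<T> {
    public final Type type;

    protected Token() {
      // The generic superclass of an anonymous subclass keeps its type
      // argument in class metadata, so it is still readable at runtime.
      ParameterizedType self = (ParameterizedType) getClass().getGenericSuperclass();
      this.type = self.getActualTypeArguments()[0];
    }
  }

  /** Hypothetical check of the declared output type against a reported type. */
  public static boolean matches(Token<?> declared, Type reported) {
    return declared.type.equals(reported);
  }

  public static void main(String[] args) {
    // Stand-in for the element type described by the coder after expansion.
    Type reported = new Token<List<String>>() {}.type;

    System.out.println(matches(new Token<List<String>>() {}, reported)); // true
    System.out.println(matches(new Token<List<Long>>() {}, reported));   // false
  }
}
```

With something like this as an argument, a mismatch could be rejected at pipeline construction time instead of showing up as a ClassCastException in user code.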
