On Fri, Dec 20, 2019 at 11:38 AM Luke Cwik <lc...@google.com> wrote:

> What do side inputs look like?
>
A user first needs to pass the PCollections for side inputs into the external transform, alongside the ordinary input PCollections, and then define the PCollectionViews inside the external transform, something like:

    PCollectionTuple pTuple =
        PCollectionTuple.of("main1", main1)
            .and("main2", main2)
            .and("side", side)
            .apply(External.of(...).withMultiOutputs());

    public static class TestTransform extends PTransform<PCollectionTuple, PCollectionTuple> {
      @Override
      public PCollectionTuple expand(PCollectionTuple input) {
        // Build the side-input view inside the external transform.
        PCollectionView<String> sideView =
            input.<String>get("side").apply(View.asSingleton());
        // Flatten the two main inputs and append the side-input value to each element.
        PCollection<String> main =
            PCollectionList.<String>of(input.get("main1"))
                .and(input.get("main2"))
                .apply(Flatten.pCollections())
                .apply(
                    ParDo.of(
                            new DoFn<String, String>() {
                              @ProcessElement
                              public void processElement(
                                  @Element String x,
                                  OutputReceiver<String> out,
                                  DoFn<String, String>.ProcessContext c) {
                                out.output(x + c.sideInput(sideView));
                              }
                            })
                        .withSideInputs(sideView));
        // ... wrap `main` in the output PCollectionTuple and return it
      }
    }

> On Thu, Dec 19, 2019 at 4:39 PM Heejong Lee <heej...@google.com> wrote:
>
>> I wanted to know if anybody has any comments on the external transform API
>> for the Java SDK.
>>
>> `External.of()` creates an external transform for the Java SDK. Depending on
>> the input and output types, two additional methods are provided:
>> `withMultiOutputs()`, which specifies that the output is a PCollectionTuple,
>> and `withOutputType()`, which specifies the type of the output element. Some
>> examples are:
>>
>>     PCollection<String> col =
>>         testPipeline
>>             .apply(Create.of("1", "2", "3"))
>>             .apply(External.of(...));
>>
>> This works without the additional methods because 1) the input and output
>> types of the external transform can be inferred and 2) the output
>> PCollection is singular.
>>
>
> How does the type/coder get inferred at runtime (doesn't Java's type
> erasure get rid of this information)?
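Luke's erasure point can be illustrated with a minimal plain-Java sketch (no Beam dependency; the class and method names are illustrative only). It shows that differently parameterized instances share one runtime class and that a generic method cannot see its own type argument at runtime, so a runtime coder cannot be derived from the generic type argument alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ErasureDemo {

  // List<String> and List<Integer> share one runtime class: the type
  // arguments are erased after compilation.
  static boolean sameRuntimeClass() {
    List<String> strings = new ArrayList<>();
    List<Integer> ints = new ArrayList<>();
    Class<?> a = strings.getClass();
    Class<?> b = ints.getClass();
    return a == b;
  }

  // Inside a generic method, T is not available at runtime; the only type
  // information left is the runtime class of actual elements, if any exist.
  static <T> String runtimeTypeOf(List<T> list) {
    return list.isEmpty() ? "unknown" : list.get(0).getClass().getSimpleName();
  }

  public static void main(String[] args) {
    System.out.println(sameRuntimeClass()); // true
    System.out.println(runtimeTypeOf(new ArrayList<String>())); // unknown
    System.out.println(runtimeTypeOf(Arrays.asList(1L, 2L))); // Long
  }
}
```

An empty collection carries no element type at all, which is the situation a transform faces when it has to choose a coder before seeing any data.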
>
>
>>     PCollectionTuple pTuple =
>>         testPipeline
>>             .apply(Create.of(1, 2, 3, 4, 5, 6))
>>             .apply(
>>                 External.of(...).withMultiOutputs());
>>
>> This requires `withMultiOutputs()` because the output is a
>> PCollectionTuple.
>>
>
> Shouldn't this require a mapping from "output" name to coder/type variable
> to be passed as an argument to withMultiOutputs?
>
>
>>     PCollection<String> pCol =
>>         testPipeline
>>             .apply(Create.of("1", "2", "2", "3", "3", "3"))
>>             .apply(
>>                 External.of(...)
>>                     .<KV<String, Long>>withOutputType())
>>             .apply(
>>                 "toString",
>>                 MapElements.into(TypeDescriptors.strings()).via(
>>                     x -> String.format("%s->%s", x.getKey(), x.getValue())));
>>
>> This requires `withOutputType()` because the output element type cannot be
>> inferred through method chaining. I think some users may find it awkward to
>> call a method with only a type parameter and empty parentheses. Without
>> `withOutputType()`, the type of the output element will be java.lang.Object,
>> which could still be forcibly cast to KV.
>>
>
> How does the output type get preserved in this case? Java's type erasure
> would remove <KV<String, Long>> after compilation, so in my opinion coder
> inference would either fail or fall back to something generic like a
> serializable coder.
>

The expansion service is responsible for using cross-language-compatible coders in the expanded transforms it returns, and those are the coders used at runtime. The type information supplied through the additional methods is only for compile-time type safety of external transforms.

>
>> Thanks,
>> Heejong
>>
>
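The split described above, where the type argument only guides the compiler and the runtime value comes from elsewhere, can be sketched in plain Java. The `Handle` class here is a hypothetical stand-in, not Beam's `External`; it only mimics a generic factory plus a type-witness-only method shaped like `withOutputType()`. It shows target-typed inference in an assignment, the need for an explicit witness in a method chain, and that a wrong witness still compiles and fails only at runtime.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class OutputTypeDemo {

  // Hypothetical stand-in for an external transform handle; NOT Beam's
  // External class. It only mimics the generic signatures under discussion.
  static final class Handle<OutT> {
    // The value is produced elsewhere (in the thread, by the expansion
    // service), so its runtime type is independent of OutT.
    private final Object result;

    private Handle(Object result) {
      this.result = result;
    }

    // OutT is inferred from an assignment target when there is one; at the
    // head of a method chain there is none, so OutT is inferred as Object.
    static <OutT> Handle<OutT> of(Object result) {
      return new Handle<>(result);
    }

    // Type-witness-only method: same object, new compile-time type argument.
    @SuppressWarnings("unchecked")
    <NewT> Handle<NewT> withOutputType() {
      return (Handle<NewT>) this;
    }

    // Unchecked: OutT is erased, so nothing is verified inside this method.
    @SuppressWarnings("unchecked")
    OutT output() {
      return (OutT) result;
    }
  }

  static String describe(Map.Entry<String, Long> kv) {
    return String.format("%s->%s", kv.getKey(), kv.getValue());
  }

  // Assignment context: OutT is inferred as Map.Entry<String, Long>.
  static String viaAssignment(Object value) {
    Handle<Map.Entry<String, Long>> handle = Handle.of(value);
    return describe(handle.output());
  }

  // Method chain: without the witness, output() would return Object and
  // describe(...) would not compile; with it, output() is Map.Entry<String, Long>.
  static String viaWitness(Object value) {
    return describe(Handle.of(value).<Map.Entry<String, Long>>withOutputType().output());
  }

  // A wrong witness still compiles; the mismatch surfaces only at runtime,
  // as a ClassCastException where the value is assigned to a String.
  static boolean wrongWitnessFails(Object value) {
    try {
      String s = Handle.of(value).<String>withOutputType().output();
      System.out.println("unexpectedly got " + s);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Object produced = new SimpleEntry<>("3", 3L);
    System.out.println(viaAssignment(produced)); // 3->3
    System.out.println(viaWitness(produced)); // 3->3
    System.out.println(wrongWitnessFails(produced)); // true
  }
}
```

The witness makes the chained call type-check, but nothing at runtime depends on it, so the actual encoding has to be fixed by whoever produces the value.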