Extract the Main Input PCollection in ParDos
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/888a5e6a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/888a5e6a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/888a5e6a

Branch: refs/heads/master
Commit: 888a5e6ab9c1c83ee06281de4c906be69c076286
Parents: 10357c2
Author: Thomas Groh <[email protected]>
Authored: Fri May 19 14:24:07 2017 -0700
Committer: Thomas Groh <[email protected]>
Committed: Mon May 22 12:24:45 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/construction/ParDos.java | 16 ++++++++++++++++
 .../beam/runners/core/construction/ParDosTest.java | 4 ++++
 2 files changed, 20 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/888a5e6a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
index 4752bd1..2ecc041 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
@@ -161,6 +163,20 @@ public class ParDos {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
   }
 
+  public static RunnerApi.PCollection getMainInput(
+      RunnerApi.PTransform ptransform, Components components) throws IOException {
+    checkArgument(
+        ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN),
+        "Unexpected payload type %s",
+        ptransform.getSpec().getUrn());
+    ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    String mainInputId =
+        Iterables.getOnlyElement(
+            Sets.difference(
+                ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet()));
+    return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
+  }
+
   // TODO: Implement
   private static StateSpec toProto(StateDeclaration state) {
     throw new UnsupportedOperationException("Not yet supported");

http://git-wip-us.apache.org/repos/asf/beam/blob/888a5e6a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
index 74edec1..b6f0b7d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
@@ -149,6 +149,10 @@ public class ParDosTest {
               view.getWindowingStrategyInternal().fixDefaults()));
       assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
     }
+    String mainInputId = components.registerPCollection(mainInput);
+    assertThat(
+        ParDos.getMainInput(protoTransform, protoComponents),
+        equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
   }
 
   private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {
