Support side inputs in CombineTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bc77fcf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bc77fcf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bc77fcf

Branch: refs/heads/master
Commit: 5bc77fcf619dc6f1272d1cd4143b6a09e0cfbda1
Parents: 11368e0
Author: Kenneth Knowles <[email protected]>
Authored: Tue Oct 17 11:50:46 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/CombineTranslation.java | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/5bc77fcf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 21796aa..ff431fc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -23,7 +23,6 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.CO
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -49,6 +48,7 @@ import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;

 /**
  * Methods for translating between {@link Combine.PerKey} {@link PTransform PTransforms} and {@link
@@ -170,8 +170,12 @@ public class CombineTranslation {
           @Override
           public Map<String, SideInput> getSideInputs() {
-            // TODO: support side inputs
-            return ImmutableMap.of();
+            Map<String, SideInput> sideInputs = new HashMap<>();
+            for (PCollectionView<?> sideInput : combine.getTransform().getSideInputs()) {
+              sideInputs.put(
+                  sideInput.getTagInternal().getId(), ParDoTranslation.toProto(sideInput));
+            }
+            return sideInputs;
           }
         },
         components);
