Repository: incubator-beam Updated Branches: refs/heads/master ee6ad2fe4 -> 064f18a8f
Make DoFnInfo carry OldDoFn or DoFn This will allow consumers to prepare to accept DoFn while still accepting existing jobs that use OldDoFn. It is a move towards treating the Fn itself as just a serialized blob. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/73db5608 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/73db5608 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/73db5608 Branch: refs/heads/master Commit: 73db5608a58ff64a0b452140736a150f973986b8 Parents: 95bf7a8 Author: Kenneth Knowles <[email protected]> Authored: Mon Oct 24 15:11:12 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Oct 27 10:48:34 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/util/DoFnInfo.java | 43 ++++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/73db5608/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index b211c04..bfa12e2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -17,29 +17,38 @@ */ package org.apache.beam.runners.dataflow.util; +import static com.google.common.base.Preconditions.checkState; + import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; /** - * Wrapper class holding the necessary information to serialize a {@link OldDoFn}. + * Wrapper class holding the necessary information to serialize a {@link OldDoFn} + * or {@link DoFn}. * * @param <InputT> the type of the (main) input elements of the {@link OldDoFn} * @param <OutputT> the type of the (main) output elements of the {@link OldDoFn} */ public class DoFnInfo<InputT, OutputT> implements Serializable { - private final OldDoFn<InputT, OutputT> doFn; + private final Serializable doFn; private final WindowingStrategy<?, ?> windowingStrategy; private final Iterable<PCollectionView<?>> sideInputViews; private final Coder<InputT> inputCoder; private final long mainOutput; private final Map<Long, TupleTag<?>> outputMap; - public DoFnInfo(OldDoFn<InputT, OutputT> doFn, + /** + * Creates a {@link DoFnInfo} for the given {@link DoFn} or {@link OldDoFn} and auxiliary bits and + * pieces. + */ + public DoFnInfo( + Serializable doFn, WindowingStrategy<?, ?> windowingStrategy, Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder, @@ -53,10 +62,36 @@ public class DoFnInfo<InputT, OutputT> implements Serializable { this.outputMap = outputMap; } - public OldDoFn<InputT, OutputT> getDoFn() { + /** + * @deprecated call the constructor with a {@link Serializable} + */ + @Deprecated + public DoFnInfo( + OldDoFn doFn, + WindowingStrategy<?, ?> windowingStrategy, + Iterable<PCollectionView<?>> sideInputViews, + Coder<InputT> inputCoder, + long mainOutput, + Map<Long, TupleTag<?>> outputMap) { + this((Serializable) doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); + } + + /** Returns the embedded serialized function. It may be a {@code DoFn} or {@code OldDoFn}. */ + public Serializable getFn() { return doFn; } + /** @deprecated use {@link #getFn()} */ + @Deprecated + public OldDoFn getDoFn() { + checkState( + doFn instanceof OldDoFn, + "Deprecated %s.getDoFn() called when the payload was actually a new %s", + DoFnInfo.class.getSimpleName(), + DoFn.class.getSimpleName()); + return (OldDoFn) doFn; + } + public WindowingStrategy<?, ?> getWindowingStrategy() { return windowingStrategy; }
