http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java index 2696020..ed9ec10 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java @@ -25,8 +25,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -58,15 +58,15 @@ import java.util.Set; */ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { - /** The DoFn being run. */ - public final DoFn<InputT, OutputT> fn; + /** The OldDoFn being run. */ + public final OldDoFn<InputT, OutputT> fn; - /** The context used for running the DoFn. */ + /** The context used for running the OldDoFn. */ public final DoFnContext<InputT, OutputT> context; protected DoFnRunnerBase( PipelineOptions options, - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, @@ -145,7 +145,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu } /** - * Invokes {@link DoFn#processElement} after certain pre-processings has been done in + * Invokes {@link OldDoFn#processElement} after certain pre-processings has been done in * {@link DoFnRunnerBase#processElement}. */ protected abstract void invokeProcessElement(WindowedValue<InputT> elem); @@ -162,17 +162,17 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu } /** - * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}. + * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}. * - * @param <InputT> the type of the DoFn's (main) input elements - * @param <OutputT> the type of the DoFn's (main) output elements + * @param <InputT> the type of the OldDoFn's (main) input elements + * @param <OutputT> the type of the OldDoFn's (main) output elements */ private static class DoFnContext<InputT, OutputT> - extends DoFn<InputT, OutputT>.Context { + extends OldDoFn<InputT, OutputT>.Context { private static final int MAX_SIDE_OUTPUTS = 1000; final PipelineOptions options; - final DoFn<InputT, OutputT> fn; + final OldDoFn<InputT, OutputT> fn; final SideInputReader sideInputReader; final OutputManager outputManager; final TupleTag<OutputT> mainOutputTag; @@ -187,7 +187,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu private Set<TupleTag<?>> outputTags; public DoFnContext(PipelineOptions options, - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, @@ -317,8 +317,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu } // Following implementations of output, outputWithTimestamp, and sideOutput - // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by - // ProcessContext's versions in DoFn.processElement. + // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by + // ProcessContext's versions in OldDoFn.processElement. @Override public void output(OutputT output) { outputWindowedValue(output, null, null, PaneInfo.NO_FIRING); @@ -350,9 +350,10 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu } /** - * Returns a new {@code DoFn.ProcessContext} for the given element. + * Returns a new {@code OldDoFn.ProcessContext} for the given element. */ - protected DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) { + protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext( + WindowedValue<InputT> elem) { return new DoFnProcessContext<InputT, OutputT>(fn, context, elem); } @@ -365,21 +366,21 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu } /** - * A concrete implementation of {@code DoFn.ProcessContext} used for - * running a {@link DoFn} over a single element. + * A concrete implementation of {@code OldDoFn.ProcessContext} used for + * running a {@link OldDoFn} over a single element. * - * @param <InputT> the type of the DoFn's (main) input elements - * @param <OutputT> the type of the DoFn's (main) output elements + * @param <InputT> the type of the OldDoFn's (main) input elements + * @param <OutputT> the type of the OldDoFn's (main) output elements */ static class DoFnProcessContext<InputT, OutputT> - extends DoFn<InputT, OutputT>.ProcessContext { + extends OldDoFn<InputT, OutputT>.ProcessContext { - final DoFn<InputT, OutputT> fn; + final OldDoFn<InputT, OutputT> fn; final DoFnContext<InputT, OutputT> context; final WindowedValue<InputT> windowedValue; - public DoFnProcessContext(DoFn<InputT, OutputT> fn, + public DoFnProcessContext(OldDoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context, WindowedValue<InputT> windowedValue) { fn.super(); @@ -426,7 +427,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu public BoundedWindow window() { if (!(fn instanceof RequiresWindowAccess)) { throw new UnsupportedOperationException( - "window() is only available in the context of a DoFn marked as RequiresWindowAccess."); + "window() is only available in the context of a OldDoFn marked as" + + "RequiresWindowAccess."); } return Iterables.getOnlyElement(windows()); } @@ -484,7 +486,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu throw new IllegalArgumentException(String.format( "Cannot output with timestamp %s. Output timestamps must be no earlier than the " + "timestamp of the current input (%s) minus the allowed skew (%s). See the " - + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", + + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", timestamp, windowedValue.getTimestamp(), PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()))); }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java index cb96da2..a9f3cf4 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor; import org.apache.beam.sdk.util.ExecutionContext.StepContext; @@ -27,6 +27,7 @@ import org.apache.beam.sdk.util.common.CounterSet; import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; + import java.util.List; /** @@ -44,13 +45,13 @@ public class DoFnRunners { } /** - * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}. + * Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}. * - * <p>It invokes {@link DoFn#processElement} for each input. + * <p>It invokes {@link OldDoFn#processElement} for each input. */ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner( PipelineOptions options, - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, @@ -71,13 +72,14 @@ public class DoFnRunners { } /** - * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}. + * Returns a basic implementation of {@link DoFnRunner} that works for most + * {@link OldDoFn OldDoFns}. * - * <p>It invokes {@link DoFn#processElement} for each input. + * <p>It invokes {@link OldDoFn#processElement} for each input. */ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner( PipelineOptions options, - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, @@ -99,7 +101,7 @@ public class DoFnRunners { /** * Returns an implementation of {@link DoFnRunner} that handles late data dropping. * - * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}. + * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}. */ public static <K, InputT, OutputT, W extends BoundedWindow> DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner( @@ -133,7 +135,7 @@ public class DoFnRunners { /** * Returns an implementation of {@link DoFnRunner} that handles late data dropping. * - * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}. + * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}. */ public static <K, InputT, OutputT, W extends BoundedWindow> DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner( @@ -160,7 +162,7 @@ public class DoFnRunners { public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault( PipelineOptions options, - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, @@ -198,7 +200,7 @@ public class DoFnRunners { public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault( PipelineOptions options, - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java index b575559..f82e5df 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java @@ -19,14 +19,14 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; /** - * DoFn that merges windows and groups elements in those windows, optionally + * OldDoFn that merges windows and groups elements in those windows, optionally * combining values. * * @param <K> key type @@ -36,7 +36,7 @@ import org.apache.beam.sdk.values.KV; */ @SystemDoFnInternal public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow> - extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> { + extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> { public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow"; public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness"; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java index d185a24..f872ffc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; @@ -52,7 +52,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends @Override public void processElement( - DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c) + OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c) throws Exception { K key = c.element().getKey(); // Used with Batch, we know that all the data is available for this key. We can't use the http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java index 8a0152e..f0f9007 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java @@ -22,8 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -138,7 +138,9 @@ public class GroupByKeyViaGroupByKeyOnly<K, V> return input .apply( ParDo.of( - new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() { + new OldDoFn< + KV<K, Iterable<WindowedValue<V>>>, + KV<K, Iterable<WindowedValue<V>>>>() { @Override public void processElement(ProcessContext c) { KV<K, Iterable<WindowedValue<V>>> kvs = c.element(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java index 4815162..8b3ba24 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; @@ -31,7 +31,7 @@ import org.joda.time.Instant; /** * A customized {@link DoFnRunner} that handles late data dropping for - * a {@link KeyedWorkItem} input {@link DoFn}. + * a {@link KeyedWorkItem} input {@link OldDoFn}. * * <p>It expands windows before checking data lateness. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java index 812e99a..0c5849e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.util.state.ValueState; import com.google.common.annotations.VisibleForTesting; import org.joda.time.Instant; - import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java index c879409..1fa0830 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java @@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkState; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -181,7 +181,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { * Store the previously emitted pane (if any) for each window. * * <ul> - * <li>State: The previous {@link PaneInfo} passed to the user's {@link DoFn#processElement}, + * <li>State: The previous {@link PaneInfo} passed to the user's {@link OldDoFn#processElement}, * if any. * <li>Style style: DIRECT * <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java index e034638..a0cdb40 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java @@ -19,21 +19,21 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.DoFnRunners.OutputManager; import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.values.TupleTag; import java.util.List; /** - * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in. + * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in. * - * @param <InputT> the type of the DoFn's (main) input elements - * @param <OutputT> the type of the DoFn's (main) output elements + * @param <InputT> the type of the OldDoFn's (main) input elements + * @param <OutputT> the type of the OldDoFn's (main) output elements */ public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT>{ - protected SimpleDoFnRunner(PipelineOptions options, DoFn<InputT, OutputT> fn, + protected SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext, @@ -44,7 +44,7 @@ public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, Ou @Override protected void invokeProcessElement(WindowedValue<InputT> elem) { - final DoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); + final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); // This can contain user code. Wrap it in case it throws an exception. try { fn.processElement(processContext); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java index 985f210..5c17009 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java @@ -37,7 +37,6 @@ import org.joda.time.Duration; import org.joda.time.Instant; import java.io.Serializable; - import javax.annotation.Nullable; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index dc2413a..8d604cb 100644 --- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue; import static org.apache.beam.sdk.WindowMatchers.isWindowedValue; import static com.google.common.base.Preconditions.checkArgument; + import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java index e0ff879..feba191 100644 --- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java index fb74fc6..f0c52b9 100644 --- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.util; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.mock; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.BaseExecutionContext.StepContext; import org.apache.beam.sdk.values.TupleTag; @@ -62,7 +62,7 @@ public class SimpleDoFnRunnerTest { runner.processElement(WindowedValue.valueInGlobalWindow("anyValue")); } - private DoFnRunner<String, String> createRunner(DoFn<String, String> fn) { + private DoFnRunner<String, String> createRunner(OldDoFn<String, String> fn) { // Pass in only necessary parameters for the test List<TupleTag<?>> sideOutputTags = Arrays.asList(); StepContext context = mock(StepContext.class); @@ -70,7 +70,7 @@ public class SimpleDoFnRunnerTest { null, fn, null, null, null, sideOutputTags, context, null, null); } - static class ThrowingDoFn extends DoFn<String, String> { + static class ThrowingDoFn extends OldDoFn<String, String> { final Exception exceptionToThrow = new UnsupportedOperationException("Expected exception"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 477da30..e052226 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -23,7 +23,7 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; @@ -106,7 +106,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { StateInternals<K> stateInternals = (StateInternals<K>) stepContext.stateInternals(); - DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn = + OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn = GroupAlsoByWindowViaWindowSetDoFn.create( windowingStrategy, new ConstantStateInternalsFactory<K>(stateInternals), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java index dcbe3d1..8be12fd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java @@ -23,7 +23,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; @@ -42,7 +42,7 @@ import org.joda.time.Instant; * elements added to the bundle will be encoded by the {@link Coder} of the underlying * {@link PCollection}. * - * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element + * <p>This catches errors during the execution of a {@link OldDoFn} caused by modifying an element * after it is added to an output {@link PCollection}. */ class ImmutabilityCheckingBundleFactory implements BundleFactory { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index dd1cf37..6ef0ffe 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -21,7 +21,7 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.DoFnRunner; import org.apache.beam.sdk.util.DoFnRunners; import org.apache.beam.sdk.util.DoFnRunners.OutputManager; @@ -48,7 +48,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> { DirectStepContext stepContext, CommittedBundle<InputT> inputBundle, AppliedPTransform<PCollection<InputT>, ?, ?> application, - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java index eda3db4..ce770ca 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.values.PCollection; @@ -38,7 +38,7 @@ import java.util.Map; * {@link BoundMulti} primitive {@link PTransform}. */ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { - private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?, ?>>> + private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<OldDoFn<?, ?>>> fnClones; public ParDoMultiEvaluatorFactory() { @@ -46,9 +46,10 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { CacheBuilder.newBuilder() .build( new CacheLoader< - AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?, ?>>>() { + AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>() { @Override - public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?, ?, BoundMulti<?, ?>> key) + public ThreadLocal<OldDoFn<?, ?>> load( + AppliedPTransform<?, ?, BoundMulti<?, ?>> key) throws Exception { @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal threadLocal = @@ -76,7 +77,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll(); @SuppressWarnings({"unchecked", "rawtypes"}) - ThreadLocal<DoFn<InT, OuT>> fnLocal = + ThreadLocal<OldDoFn<InT, OuT>> fnLocal = (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application); String stepName = evaluationContext.getStepName(application); DirectStepContext stepContext = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java index 044abdc..53af6af 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.values.PCollection; @@ -38,16 +38,17 @@ import java.util.Collections; * {@link Bound ParDo.Bound} primitive {@link PTransform}. */ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { - private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?, ?>>> + private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<OldDoFn<?, ?>>> fnClones; public ParDoSingleEvaluatorFactory() { fnClones = CacheBuilder.newBuilder() .build( - new CacheLoader<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?, ?>>>() { + new CacheLoader< + AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>() { @Override - public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?, ?, Bound<?, ?>> key) + public ThreadLocal<OldDoFn<?, ?>> load(AppliedPTransform<?, ?, Bound<?, ?>> key) throws Exception { @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal threadLocal = @@ -80,7 +81,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { .getOrCreateStepContext(stepName, stepName); @SuppressWarnings({"unchecked", "rawtypes"}) - ThreadLocal<DoFn<InputT, OutputT>> fnLocal = + ThreadLocal<OldDoFn<InputT, OutputT>> fnLocal = (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application); try { ParDoEvaluator<InputT> parDoEvaluator = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java index 7fac1e3..d021b43 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java @@ -21,7 +21,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import javax.annotation.Nullable; @@ -38,8 +38,8 @@ public interface TransformEvaluatorFactory { * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}. * * <p>Any work that must be done before input elements are processed (such as calling - * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is - * made available to the caller. + * {@link OldDoFn#startBundle(OldDoFn.Context)}) must be done before the + * {@link TransformEvaluator} is made available to the caller. * * <p>May return null if the application cannot produce an evaluator (for example, it is a * {@link Read} {@link PTransform} where all evaluators are in-use). http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index d6ee6ea..cee4001 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -23,9 +23,9 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.io.Write.Bound; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; @@ -101,7 +101,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory { } @VisibleForTesting - static class KeyBasedOnCountFn<T> extends DoFn<T, KV<Integer, T>> { + static class KeyBasedOnCountFn<T> extends OldDoFn<T, KV<Integer, T>> { @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java index 353eef6..529316c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java @@ -24,8 +24,8 @@ import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -62,9 +62,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { p.apply("listCreate", Create.of("foo", "bar")) .apply( ParDo.of( - new DoFn<String, String>() { + new OldDoFn<String, String>() { @Override - public void processElement(DoFn<String, String>.ProcessContext c) + public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -109,9 +109,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { PCollection<String> transformed = created.apply( ParDo.of( - new DoFn<String, String>() { + new OldDoFn<String, String>() { @Override - public void processElement(DoFn<String, String>.ProcessContext c) + public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -140,9 +140,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { PCollection<String> transformed = created.apply( ParDo.of( - new DoFn<String, String>() { + new OldDoFn<String, String>() { @Override - public void processElement(DoFn<String, String>.ProcessContext c) + public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -157,9 +157,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { p.apply(Create.of("1", "2", "3")) .apply( ParDo.of( - new DoFn<String, String>() { + new OldDoFn<String, String>() { @Override - public void processElement(DoFn<String, String>.ProcessContext c) + public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } @@ -182,9 +182,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { PCollection<String> transformed = created.apply( ParDo.of( - new DoFn<String, String>() { + new OldDoFn<String, String>() { @Override - public void processElement(DoFn<String, String>.ProcessContext c) + public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception { c.output(Integer.toString(c.element().length())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 09707bd..29dea32 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -32,9 +32,9 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -159,7 +159,7 @@ public class DirectRunnerTest implements Serializable { @Test public void transformDisplayDataExceptionShouldFail() { - DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() { + OldDoFn<Integer, Integer> brokenDoFn = new OldDoFn<Integer, Integer>() { @Override public void processElement(ProcessContext c) throws Exception {} @@ -211,7 +211,7 @@ public class DirectRunnerTest implements Serializable { /** - * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the + * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -220,7 +220,7 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { + .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() { @Override public void processElement(ProcessContext c) { List<Integer> outputList = Arrays.asList(1, 2, 3, 4); c.output(outputList); @@ -236,7 +236,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the + * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -245,7 +245,7 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new DoFn<Integer, List<Integer>>() { + .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() { @Override public void processElement(ProcessContext c) { List<Integer> outputList = Arrays.asList(1, 2, 3, 4); c.output(outputList); @@ -260,7 +260,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails + * Tests that a {@link OldDoFn} that mutates an output with a bad equals() still fails * in the {@link DirectRunner}. */ @Test @@ -269,7 +269,7 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(42)) - .apply(ParDo.of(new DoFn<Integer, byte[]>() { + .apply(ParDo.of(new OldDoFn<Integer, byte[]>() { @Override public void processElement(ProcessContext c) { byte[] outputArray = new byte[]{0x1, 0x2, 0x3}; c.output(outputArray); @@ -285,7 +285,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the + * Tests that a {@link OldDoFn} that mutates its input with a good equals() fails in the * {@link DirectRunner}. */ @Test @@ -295,7 +295,7 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)) .withCoder(ListCoder.of(VarIntCoder.of()))) - .apply(ParDo.of(new DoFn<List<Integer>, Integer>() { + .apply(ParDo.of(new OldDoFn<List<Integer>, Integer>() { @Override public void processElement(ProcessContext c) { List<Integer> inputList = c.element(); inputList.set(0, 37); @@ -310,7 +310,7 @@ public class DirectRunnerTest implements Serializable { } /** - * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails + * Tests that a {@link OldDoFn} that mutates an input with a bad equals() still fails * in the {@link DirectRunner}. */ @Test @@ -319,7 +319,7 @@ public class DirectRunnerTest implements Serializable { pipeline .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6})) - .apply(ParDo.of(new DoFn<byte[], Integer>() { + .apply(ParDo.of(new OldDoFn<byte[], Integer>() { @Override public void processElement(ProcessContext c) { byte[] inputArray = c.element(); inputArray[0] = 0xa; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index d40cf93..db934e5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -213,9 +213,9 @@ public class ImmutabilityCheckingBundleFactoryTest { CommittedBundle<byte[]> committed = intermediate.commit(Instant.now()); } - private static class IdentityDoFn<T> extends DoFn<T, T> { + private static class IdentityDoFn<T> extends OldDoFn<T, T> { @Override - public void processElement(DoFn<T, T>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception { c.output(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index 890e06d..e1be120 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.WindowedValue; @@ -59,9 +59,9 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes())) .apply( ParDo.of( - new DoFn<byte[], byte[]>() { + new OldDoFn<byte[], byte[]>() { @Override - public void processElement(DoFn<byte[], byte[]>.ProcessContext c) + public void processElement(OldDoFn<byte[], byte[]>.ProcessContext c) throws Exception { c.element()[0] = 'b'; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java index aa0d976..9e273ad 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java @@ -28,9 +28,9 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Keys; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -180,9 +180,9 @@ public class KeyedPValueTrackingVisitorTest { } } - private static class IdentityFn<K> extends DoFn<K, K> { + private static class IdentityFn<K> extends OldDoFn<K, K> { @Override - public void processElement(DoFn<K, K>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<K, K>.ProcessContext c) throws Exception { c.output(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 07f478d..3208841 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -30,7 +30,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -169,7 +169,7 @@ public class ParDoEvaluatorTest { ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output)); } - private static class RecorderFn extends DoFn<Integer, Integer> { + private static class RecorderFn extends OldDoFn<Integer, Integer> { private Collection<Integer> processed; private final PCollectionView<Integer> view; @@ -179,7 +179,7 @@ public class ParDoEvaluatorTest { } @Override - public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception { processed.add(c.element()); c.output(c.element() + c.sideInput(view)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index c0ab4df..19094cb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -31,7 +31,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -80,7 +80,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { BoundMulti<String, KV<String, Integer>> pardo = ParDo.of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.output(KV.<String, Integer>of(c.element(), c.element().length())); @@ -170,7 +170,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { BoundMulti<String, KV<String, Integer>> pardo = ParDo.of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.output(KV.<String, Integer>of(c.element(), c.element().length())); @@ -254,7 +254,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); BoundMulti<String, KV<String, Integer>> pardo = ParDo.of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.windowingInternals() @@ -354,7 +354,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { BoundMulti<String, KV<String, Integer>> pardo = ParDo.of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.windowingInternals().stateInternals(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java index d778da6..a4fd570 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java @@ -31,7 +31,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -73,7 +73,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection<Integer> collection = input.apply( ParDo.of( - new DoFn<String, Integer>() { + new OldDoFn<String, Integer>() { @Override public void processElement(ProcessContext c) { c.output(c.element().length()); @@ -127,7 +127,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection<Integer> collection = input.apply( ParDo.of( - new DoFn<String, Integer>() { + new OldDoFn<String, Integer>() { @Override public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element().length()); @@ -179,7 +179,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); ParDo.Bound<String, KV<String, Integer>> pardo = ParDo.of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.windowingInternals() @@ -265,7 +265,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { ParDo.Bound<String, KV<String, Integer>> pardo = ParDo.of( - new DoFn<String, KV<String, Integer>>() { + new OldDoFn<String, KV<String, Integer>>() { @Override public void processElement(ProcessContext c) { c.windowingInternals().stateInternals(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 7c7005c..22f148a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -38,9 +38,9 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -76,7 +76,6 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; - import javax.annotation.Nullable; /** @@ -105,9 +104,9 @@ public class WatermarkManagerTest implements Serializable { createdInts = p.apply("createdInts", Create.of(1, 2, 3)); filtered = createdInts.apply("filtered", Filter.greaterThan(1)); - filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, Integer>() { + filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new OldDoFn<Integer, Integer>() { @Override - public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception { c.output(c.element() * 2); } })); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java index 56737a4..716c8ad 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Keys; import org.apache.beam.sdk.transforms.PTransform; @@ -230,7 +230,7 @@ public class TFIDF { // Create a collection of pairs mapping a URI to each // of the words in the document associated with that that URI. PCollection<KV<URI, String>> uriToWords = uriToContent - .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() { + .apply("SplitWords", ParDo.of(new OldDoFn<KV<URI, String>, KV<URI, String>>() { private static final long serialVersionUID = 0; @Override @@ -275,7 +275,7 @@ public class TFIDF { // by the URI key. PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount .apply("ShiftKeys", ParDo.of( - new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { + new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { private static final long serialVersionUID = 0; @Override @@ -316,7 +316,7 @@ public class TFIDF { // divided by the total number of words in the document. PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal .apply("ComputeTermFrequencies", ParDo.of( - new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { + new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { private static final long serialVersionUID = 0; @Override @@ -339,11 +339,11 @@ public class TFIDF { // documents in which the word appears divided by the total // number of documents in the corpus. Note how the total number of // documents is passed as a side input; the same value is - // presented to each invocation of the DoFn. + // presented to each invocation of the OldDoFn. PCollection<KV<String, Double>> wordToDf = wordToDocCount .apply("ComputeDocFrequencies", ParDo .withSideInputs(totalDocuments) - .of(new DoFn<KV<String, Long>, KV<String, Double>>() { + .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() { private static final long serialVersionUID = 0; @Override @@ -375,7 +375,7 @@ public class TFIDF { return wordToUriAndTfAndDf .apply("ComputeTfIdf", ParDo.of( - new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { + new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { private static final long serialVersionUID = 0; @Override @@ -416,7 +416,7 @@ public class TFIDF { @Override public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { return wordToUriAndTfIdf - .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() { + .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() { private static final long serialVersionUID = 0; @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java index c54229d..080cdc9 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.PCollection; public class WordCount { - public static class ExtractWordsFn extends DoFn<String, String> { + public static class ExtractWordsFn extends OldDoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java index c0ff85d..068404a 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java @@ -29,7 +29,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; @@ -92,7 +92,7 @@ public class AutoComplete { // Map the KV outputs of Count into our own CompletionCandiate class. .apply("CreateCompletionCandidates", ParDo.of( - new DoFn<KV<String, Long>, CompletionCandidate>() { + new OldDoFn<KV<String, Long>, CompletionCandidate>() { private static final long serialVersionUID = 0; @Override @@ -182,7 +182,7 @@ public class AutoComplete { } private static class FlattenTops - extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { + extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { private static final long serialVersionUID = 0; @Override @@ -236,10 +236,10 @@ public class AutoComplete { } /** - * A DoFn that keys each candidate by all its prefixes. + * A OldDoFn that keys each candidate by all its prefixes. */ private static class AllPrefixes - extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> { + extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> { private static final long serialVersionUID = 0; private final int minPrefix; @@ -314,7 +314,7 @@ public class AutoComplete { } } - static class ExtractWordsFn extends DoFn<String, String> { + static class ExtractWordsFn extends OldDoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); @@ -340,8 +340,8 @@ public class AutoComplete { * Takes as input a the top candidates per prefix, and emits an entity * suitable for writing to Datastore. */ - static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> - implements DoFn.RequiresWindowAccess{ + static class FormatForPerTaskLocalFile extends OldDoFn<KV<String, List<CompletionCandidate>>, String> + implements OldDoFn.RequiresWindowAccess{ private static final long serialVersionUID = 0; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java index f456b27..7d7c0c7 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -76,7 +76,7 @@ public class JoinExamples { // country code 'key' -> string of <event info>, <country name> PCollection<KV<String, String>> finalResultCollection = kvpCollection.apply("Process", ParDo.of( - new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { + new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() { private static final long serialVersionUID = 0; @Override @@ -98,7 +98,7 @@ public class JoinExamples { })); return finalResultCollection - .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() { + .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() { private static final long serialVersionUID = 0; @Override @@ -110,7 +110,7 @@ public class JoinExamples { })); } - static class ExtractEventDataFn extends DoFn<String, KV<String, String>> { + static class ExtractEventDataFn extends OldDoFn<String, KV<String, String>> { private static final long serialVersionUID = 0; @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java index 8756abe..395b409 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -326,7 +326,7 @@ public class KafkaIOExamples { * Print contents to stdout * @param <T> type of the input */ - private static class PrintFn<T> extends DoFn<T, T> { + private static class PrintFn<T> extends OldDoFn<T, T> { @Override public void processElement(ProcessContext c) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java index 4e81420..8c31783 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; @@ -49,7 +49,7 @@ public class KafkaWindowedWordCountExample { static final String GROUP_ID = "myGroup"; // Default groupId static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka - public static class ExtractWordsFn extends DoFn<String, String> { + public static class ExtractWordsFn extends OldDoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); @@ -71,7 +71,7 @@ public class KafkaWindowedWordCountExample { } } - public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + public static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> { @Override public void processElement(ProcessContext c) { String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java index 1b532a7..d149e4e 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; @@ -59,7 +59,7 @@ public class WindowedWordCount { static final long WINDOW_SIZE = 10; // Default window duration in seconds static final long SLIDE_SIZE = 5; // Default window slide in seconds - static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> { @Override public void processElement(ProcessContext c) { String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); @@ -67,7 +67,7 @@ public class WindowedWordCount { } } - static class ExtractWordsFn extends DoFn<String, String> { + static class ExtractWordsFn extends OldDoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index 0bba0d0..01a3ab2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -39,7 +39,7 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; @@ -391,7 +391,7 @@ class FlinkBatchTransformTranslators { inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder())); // construct a map from side input to WindowingStrategy so that - // the DoFn runner can map main-input windows to side input windows + // the OldDoFn runner can map main-input windows to side input windows Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); for (PCollectionView<?> sideInput: transform.getSideInputs()) { sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); @@ -494,7 +494,7 @@ class FlinkBatchTransformTranslators { DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - final DoFn<InputT, OutputT> doFn = transform.getFn(); + final OldDoFn<InputT, OutputT> doFn = transform.getFn(); TypeInformation<WindowedValue<OutputT>> typeInformation = context.getTypeInfo(context.getOutput(transform)); @@ -502,7 +502,7 @@ class FlinkBatchTransformTranslators { List<PCollectionView<?>> sideInputs = transform.getSideInputs(); // construct a map from side input to WindowingStrategy so that - // the DoFn runner can map main-input windows to side input windows + // the OldDoFn runner can map main-input windows to side input windows Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); for (PCollectionView<?> sideInput: sideInputs) { sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); @@ -539,7 +539,7 @@ class FlinkBatchTransformTranslators { DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - final DoFn<InputT, OutputT> doFn = transform.getFn(); + final OldDoFn<InputT, OutputT> doFn = transform.getFn(); Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll(); @@ -578,7 +578,7 @@ class FlinkBatchTransformTranslators { List<PCollectionView<?>> sideInputs = transform.getSideInputs(); // construct a map from side input to WindowingStrategy so that - // the DoFn runner can map main-input windows to side input windows + // the OldDoFn runner can map main-input windows to side input windows Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); for (PCollectionView<?> sideInput: sideInputs) { sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index fa6b387..5b55d42 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -35,11 +35,10 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; @@ -71,8 +70,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.util.Collector; import org.joda.time.Instant; import org.slf4j.Logger; @@ -346,8 +343,8 @@ public class FlinkStreamingTransformTranslators { context.setOutputDataStream(context.getOutput(transform), windowedStream); } - private static <T, W extends BoundedWindow> DoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) { - return new DoFn<T, T>() { + private static <T, W extends BoundedWindow> OldDoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) { + return new OldDoFn<T, T>() { @Override public void processElement(final ProcessContext c) throws Exception {
