http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 89243a3..a4af1b0 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.functions; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; @@ -30,13 +30,13 @@ import org.apache.flink.util.Collector; import java.util.Map; /** - * Encapsulates a {@link org.apache.beam.sdk.transforms.DoFn} + * Encapsulates a {@link OldDoFn} * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}. */ public class FlinkDoFnFunction<InputT, OutputT> extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> { - private final DoFn<InputT, OutputT> doFn; + private final OldDoFn<InputT, OutputT> doFn; private final SerializedPipelineOptions serializedOptions; private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; @@ -47,7 +47,7 @@ public class FlinkDoFnFunction<InputT, OutputT> private final WindowingStrategy<?, ?> windowingStrategy; public FlinkDoFnFunction( - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions options) { @@ -56,7 +56,7 @@ public class FlinkDoFnFunction<InputT, OutputT> this.serializedOptions = new SerializedPipelineOptions(options); this.windowingStrategy = windowingStrategy; - this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess; + this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; this.hasSideInputs = !sideInputs.isEmpty(); }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 9074d72..2d36043 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.functions; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; @@ -60,7 +60,7 @@ public class FlinkMergingNonShuffleReduceFunction< private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn; - private final DoFn<KV<K, InputT>, KV<K, OutputT>> doFn; + private final OldDoFn<KV<K, InputT>, KV<K, OutputT>> doFn; private final WindowingStrategy<?, W> windowingStrategy; @@ -81,8 +81,8 @@ public class FlinkMergingNonShuffleReduceFunction< this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - // dummy DoFn because we need one for ProcessContext - this.doFn = new DoFn<KV<K, InputT>, KV<K, OutputT>>() { + // dummy OldDoFn because we need one for ProcessContext + this.doFn = new OldDoFn<KV<K, InputT>, KV<K, OutputT>>() { @Override public void processElement(ProcessContext c) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index f92e76f..6e673fc 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.functions; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -32,7 +32,7 @@ import org.apache.flink.util.Collector; import java.util.Map; /** - * Encapsulates a {@link org.apache.beam.sdk.transforms.DoFn} that uses side outputs + * Encapsulates a {@link OldDoFn} that uses side outputs * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}. * * We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index @@ -42,7 +42,7 @@ import java.util.Map; public class FlinkMultiOutputDoFnFunction<InputT, OutputT> extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<RawUnionValue>> { - private final DoFn<InputT, OutputT> doFn; + private final OldDoFn<InputT, OutputT> doFn; private final SerializedPipelineOptions serializedOptions; private final Map<TupleTag<?>, Integer> outputMap; @@ -55,7 +55,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT> private final WindowingStrategy<?, ?> windowingStrategy; public FlinkMultiOutputDoFnFunction( - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions options, @@ -64,7 +64,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT> this.serializedOptions = new SerializedPipelineOptions(options); this.outputMap = outputMap; - this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess; + this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; this.hasSideInputs = !sideInputs.isEmpty(); this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java index 71b6d27..fab3c85 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.flink.translation.functions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -35,7 +35,7 @@ import java.util.Collection; import java.util.Map; /** - * {@link DoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports + * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports * side outputs. */ class FlinkMultiOutputProcessContext<InputT, OutputT> @@ -50,7 +50,7 @@ class FlinkMultiOutputProcessContext<InputT, OutputT> FlinkMultiOutputProcessContext( PipelineOptions pipelineOptions, RuntimeContext runtimeContext, - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Collector<WindowedValue<RawUnionValue>> collector, Map<TupleTag<?>, Integer> outputMap, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java index d49821b..98446f9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java @@ -17,18 +17,16 @@ */ package org.apache.beam.runners.flink.translation.functions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Instant; -import java.util.Collection; - /** * {@link WindowFn.AssignContext} for calling a {@link WindowFn} for elements emitted from - * {@link org.apache.beam.sdk.transforms.DoFn#startBundle(DoFn.Context)} - * or {@link DoFn#finishBundle(DoFn.Context)}. + * {@link OldDoFn#startBundle(OldDoFn.Context)} + * or {@link OldDoFn#finishBundle(OldDoFn.Context)}. * * <p>In those cases the {@code WindowFn} is not allowed to access any element information. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index c29e1df..2db4b7b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.functions; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -58,7 +58,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn; - protected final DoFn<KV<K, InputT>, KV<K, AccumT>> doFn; + protected final OldDoFn<KV<K, InputT>, KV<K, AccumT>> doFn; protected final WindowingStrategy<?, W> windowingStrategy; @@ -77,8 +77,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - // dummy DoFn because we need one for ProcessContext - this.doFn = new DoFn<KV<K, InputT>, KV<K, AccumT>>() { + // dummy OldDoFn because we need one for ProcessContext + this.doFn = new OldDoFn<KV<K, InputT>, KV<K, AccumT>>() { @Override public void processElement(ProcessContext c) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java index 235a803..3954d1f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimerInternals; @@ -48,10 +48,10 @@ import java.util.Iterator; import java.util.Map; /** - * {@link org.apache.beam.sdk.transforms.DoFn.ProcessContext} for our Flink Wrappers. + * {@link OldDoFn.ProcessContext} for our Flink Wrappers. */ class FlinkProcessContext<InputT, OutputT> - extends DoFn<InputT, OutputT>.ProcessContext { + extends OldDoFn<InputT, OutputT>.ProcessContext { private final PipelineOptions pipelineOptions; private final RuntimeContext runtimeContext; @@ -67,7 +67,7 @@ class FlinkProcessContext<InputT, OutputT> FlinkProcessContext( PipelineOptions pipelineOptions, RuntimeContext runtimeContext, - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Collector<WindowedValue<OutputT>> collector, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) { @@ -80,7 +80,7 @@ class FlinkProcessContext<InputT, OutputT> this.pipelineOptions = pipelineOptions; this.runtimeContext = runtimeContext; this.collector = collector; - this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess; + this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; @@ -90,7 +90,7 @@ class FlinkProcessContext<InputT, OutputT> FlinkProcessContext( PipelineOptions pipelineOptions, RuntimeContext runtimeContext, - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) { doFn.super(); @@ -101,7 +101,7 @@ class FlinkProcessContext<InputT, OutputT> this.pipelineOptions = pipelineOptions; this.runtimeContext = runtimeContext; this.collector = null; - this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess; + this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; @@ -141,7 +141,7 @@ class FlinkProcessContext<InputT, OutputT> public BoundedWindow window() { if (!requiresWindowAccess) { throw new UnsupportedOperationException( - "window() is only available in the context of a DoFn marked as RequiresWindowAccess."); + "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess."); } return Iterables.getOnlyElement(windowedValue.getWindows()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index 9cbc6b9..b1729a4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.functions; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -60,7 +60,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn; - protected final DoFn<KV<K, AccumT>, KV<K, OutputT>> doFn; + protected final OldDoFn<KV<K, AccumT>, KV<K, OutputT>> doFn; protected final WindowingStrategy<?, W> windowingStrategy; @@ -81,8 +81,8 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - // dummy DoFn because we need one for ProcessContext - this.doFn = new DoFn<KV<K, AccumT>, KV<K, OutputT>>() { + // dummy OldDoFn because we need one for ProcessContext + this.doFn = new OldDoFn<KV<K, AccumT>, KV<K, OutputT>>() { @Override public void processElement(ProcessContext c) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index e40d6e3..74ec66a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -24,7 +24,7 @@ import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregat import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -52,13 +52,13 @@ import java.util.Collection; * */ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> { - private final DoFn<IN, OUTDF> doFn; + private final OldDoFn<IN, OUTDF> doFn; private final WindowingStrategy<?, ?> windowingStrategy; private final SerializedPipelineOptions serializedPipelineOptions; private DoFnProcessContext context; - public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) { + public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUTDF> doFn) { checkNotNull(options); checkNotNull(windowingStrategy); checkNotNull(doFn); @@ -104,15 +104,15 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl doFn.processElement(this.context); } - private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext { + private class DoFnProcessContext extends OldDoFn<IN, OUTDF>.ProcessContext { - private final DoFn<IN, OUTDF> fn; + private final OldDoFn<IN, OUTDF> fn; protected final Collector<WindowedValue<OUTFL>> collector; private WindowedValue<IN> element; - private DoFnProcessContext(DoFn<IN, OUTDF> function, + private DoFnProcessContext(OldDoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { function.super(); super.setupDelegateAggregators(); @@ -137,9 +137,9 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl @Override public BoundedWindow window() { - if (!(fn instanceof DoFn.RequiresWindowAccess)) { + if (!(fn instanceof OldDoFn.RequiresWindowAccess)) { throw new UnsupportedOperationException( - "window() is only available in the context of a DoFn marked as RequiresWindowAccess."); + "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess."); } Collection<? extends BoundedWindow> windows = this.element.getWindows(); @@ -211,7 +211,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl throw new IllegalArgumentException(String.format( "Cannot output with timestamp %s. Output timestamps must be no earlier than the " + "timestamp of the current input (%s) minus the allowed skew (%s). See the " - + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.", + + "OldDoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.", timestamp, ref.getTimestamp(), PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod()))); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index 0e977db..103a12b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -112,7 +112,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> private transient CoderRegistry coderRegistry; - private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator; + private OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator; private ProcessContext context; @@ -263,7 +263,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> * a function with that combiner is created, so that elements are combined as they arrive. This is * done for speed and (in most of the cases) for reduction of the per-window state. */ - private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() { + private <W extends BoundedWindow> OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() { if (this.operator == null) { StateInternalsFactory<K> stateInternalsFactory = new GroupAlsoByWindowWrapperStateInternalsFactory(); @@ -272,7 +272,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> // Thus VOUT == Iterable<VIN> Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); - this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create( + this.operator = (OldDoFn) GroupAlsoByWindowViaWindowSetDoFn.create( (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder)); } else { Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder(); @@ -446,7 +446,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> private KeyedWorkItem<K, VIN> element; - public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function, + public ProcessContext(OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function, TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector, FlinkTimerInternals timerInternals) { function.super(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java index 619b887..0ea0cab 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; @@ -40,7 +40,7 @@ public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrap private final TupleTag<?> mainTag; private final Map<TupleTag<?>, Integer> outputLabels; - public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) { + public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) { super(options, windowingStrategy, doFn); this.mainTag = checkNotNull(mainTag); this.outputLabels = checkNotNull(tagsToLabels); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java index 4def0c6..6be94b2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimerInternals; @@ -41,7 +41,7 @@ import java.util.Collection; * */ public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> { - public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) { + public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUT> doFn) { super(options, windowingStrategy, doFn); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java index 9e55002..a0b33f8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.state; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimerInternals; @@ -106,7 +106,7 @@ public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerIntern } } - public void encodeTimerInternals(DoFn.ProcessContext context, + public void encodeTimerInternals(OldDoFn.ProcessContext context, StateCheckpointWriter writer, KvCoder<K, VIN> kvCoder, Coder<? extends BoundedWindow> windowCoder) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 61e219c..c24d91d 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -114,7 +114,7 @@ public class PipelineOptionsTest { } - private static class TestDoFn extends DoFn<Object, Object> { + private static class TestDoFn extends OldDoFn<Object, Object> { @Override public void processElement(ProcessContext c) throws Exception { @@ -126,7 +126,7 @@ public class PipelineOptionsTest { } private static class TestParDoWrapper extends FlinkAbstractParDoWrapper { - public TestParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, DoFn doFn) { + public TestParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, OldDoFn doFn) { super(options, windowingStrategy, doFn); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java index bb79b27..ca70096 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.flink; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; @@ -72,7 +72,7 @@ public class ReadSourceITCase extends JavaProgramTestBase { PCollection<String> result = p .apply(CountingInput.upTo(10)) - .apply(ParDo.of(new DoFn<Long, String>() { + .apply(ParDo.of(new OldDoFn<Long, String>() { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element().toString()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java index fe71802..bc69f34 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.flink; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import com.google.common.base.Joiner; import org.apache.flink.streaming.util.StreamingProgramTestBase; @@ -59,7 +59,7 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase { p .apply(CountingInput.upTo(10)) - .apply(ParDo.of(new DoFn<Long, String>() { + .apply(ParDo.of(new OldDoFn<Long, String>() { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element().toString()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java index 1b55c61..ca183a8 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java @@ -21,7 +21,7 @@ import org.apache.beam.runners.flink.FlinkTestPipeline; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; @@ -61,7 +61,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); } - public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> { + public static class ExtractUserAndTimestamp extends OldDoFn<KV<Integer, String>, String> { private static final long serialVersionUID = 0; @Override @@ -97,7 +97,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri .withAllowedLateness(Duration.ZERO) .discardingFiredPanes()) - .apply(ParDo.of(new DoFn<String, KV<Void, String>>() { + .apply(ParDo.of(new OldDoFn<String, KV<Void, String>>() { @Override public void processElement(ProcessContext c) throws Exception { String elem = c.element(); @@ -105,7 +105,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri } })) .apply(GroupByKey.<Void, String>create()) - .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() { + .apply(ParDo.of(new OldDoFn<KV<Void, Iterable<String>>, String>() { @Override public void processElement(ProcessContext c) throws Exception { KV<Void, Iterable<String>> elem = c.element(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java index 1efb42f..7912aee 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; @@ -103,7 +103,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme - .apply(ParDo.of(new DoFn<TableRow, String>() { + .apply(ParDo.of(new OldDoFn<TableRow, String>() { @Override public void processElement(ProcessContext c) throws Exception { TableRow row = c.element(); @@ -120,7 +120,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme .apply(Count.<String>perElement()); - PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { + PCollection<String> format = output.apply(ParDo.of(new OldDoFn<KV<String, Long>, String>() { @Override public void processElement(ProcessContext c) throws Exception { KV<String, Long> el = c.element(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 7fd203f..ac06b52 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -47,9 +47,9 @@ import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -82,7 +82,6 @@ import com.google.api.services.dataflow.model.WorkerPool; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,7 +93,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; - import javax.annotation.Nullable; /** @@ -1021,7 +1019,7 @@ public class DataflowPipelineTranslator { } private static void translateFn( - DoFn fn, + OldDoFn fn, WindowingStrategy windowingStrategy, Iterable<PCollectionView<?>> sideInputs, Coder inputCoder, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index e7cc20e..d762d50 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -78,9 +78,9 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -143,7 +143,6 @@ import com.google.common.collect.Multimap; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -173,7 +172,6 @@ import java.util.Random; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; - import javax.annotation.Nullable; /** @@ -762,13 +760,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>>> { /** - * A {@link DoFn} that for each element outputs a {@code KV} structure suitable for + * A {@link OldDoFn} that for each element outputs a {@code KV} structure suitable for * grouping by the hash of the window's byte representation and sorting the grouped values * using the window's byte representation. */ @SystemDoFnInternal private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends BoundedWindow> - extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements DoFn.RequiresWindowAccess { + extends OldDoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements + OldDoFn.RequiresWindowAccess { private final IsmRecordCoder<?> ismCoderForHash; private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder<?> ismCoderForHash) { @@ -828,15 +827,15 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { extends PTransform<PCollection<T>, PCollectionView<T>> { /** - * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows: + * A {@link OldDoFn} that outputs {@link IsmRecord}s. These records are structured as follows: * <ul> * <li>Key 1: Window * <li>Value: Windowed value * </ul> */ static class IsmRecordForSingularValuePerWindowDoFn<T, W extends BoundedWindow> - extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, - IsmRecord<WindowedValue<T>>> { + extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, + IsmRecord<WindowedValue<T>>> { private final Coder<W> windowCoder; IsmRecordForSingularValuePerWindowDoFn(Coder<W> windowCoder) { @@ -902,8 +901,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { applyForSingleton( DataflowRunner runner, PCollection<T> input, - DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, - IsmRecord<WindowedValue<FinalT>>> doFn, + OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, + IsmRecord<WindowedValue<FinalT>>> doFn, boolean hasDefault, FinalT defaultValue, Coder<FinalT> defaultValueCoder) { @@ -998,7 +997,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { static class BatchViewAsList<T> extends PTransform<PCollection<T>, PCollectionView<List<T>>> { /** - * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the + * A {@link OldDoFn} which creates {@link IsmRecord}s assuming that each element is within the * global window. Each {@link IsmRecord} has * <ul> * <li>Key 1: Global window</li> @@ -1008,7 +1007,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { */ @SystemDoFnInternal static class ToIsmRecordForGlobalWindowDoFn<T> - extends DoFn<T, IsmRecord<WindowedValue<T>>> { + extends OldDoFn<T, IsmRecord<WindowedValue<T>>> { long indexInBundle; @Override @@ -1030,7 +1029,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows + * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows * to locate the window boundaries. The {@link IsmRecord} has: * <ul> * <li>Key 1: Window</li> @@ -1040,8 +1039,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { */ @SystemDoFnInternal static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow> - extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, - IsmRecord<WindowedValue<T>>> { + extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, + IsmRecord<WindowedValue<T>>> { private final Coder<W> windowCoder; ToIsmRecordForNonGlobalWindowDoFn(Coder<W> windowCoder) { @@ -1174,7 +1173,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { /** - * A {@link DoFn} which groups elements by window boundaries. For each group, + * A {@link OldDoFn} which groups elements by window boundaries. For each group, * the group of elements is transformed into a {@link TransformedMap}. * The transformed {@code Map<K, V>} is backed by a {@code Map<K, WindowedValue<V>>} * and contains a function {@code WindowedValue<V> -> V}. @@ -1188,10 +1187,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * </ul> */ static class ToMapDoFn<K, V, W extends BoundedWindow> - extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, - IsmRecord<WindowedValue<TransformedMap<K, - WindowedValue<V>, - V>>>> { + extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, + IsmRecord<WindowedValue<TransformedMap<K, + WindowedValue<V>, + V>>>> { private final Coder<W> windowCoder; ToMapDoFn(Coder<W> windowCoder) { @@ -1358,8 +1357,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @SystemDoFnInternal private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W> - extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> - implements DoFn.RequiresWindowAccess { + extends OldDoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> + implements OldDoFn.RequiresWindowAccess { private final IsmRecordCoder<?> coder; private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder<?> coder) { @@ -1412,7 +1411,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows + * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows * and keys to locate window and key boundaries. The main output {@link IsmRecord}s have: * <ul> * <li>Key 1: Window</li> @@ -1424,12 +1423,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * <p>Additionally, we output all the unique keys per window seen to {@code outputForEntrySet} * and the unique key count per window to {@code outputForSize}. * - * <p>Finally, if this DoFn has been requested to perform unique key checking, it will + * <p>Finally, if this OldDoFn has been requested to perform unique key checking, it will * throw an {@link IllegalStateException} if more than one key per window is found. */ static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow> - extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>, - IsmRecord<WindowedValue<V>>> { + extends OldDoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>, + IsmRecord<WindowedValue<V>>> { private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize; private final TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet; @@ -1557,7 +1556,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of: + * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window of: * <ul> * <li>Key 1: META key</li> * <li>Key 2: window</li> @@ -1565,11 +1564,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * <li>Value: sum of values for window</li> * </ul> * - * <p>This {@link DoFn} is meant to be used to compute the number of unique keys + * <p>This {@link OldDoFn} is meant to be used to compute the number of unique keys * per window for map and multimap side inputs. */ static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow> - extends DoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> { + extends OldDoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> { private final Coder<W> windowCoder; ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) { this.windowCoder = windowCoder; @@ -1606,7 +1605,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window and key pair of: + * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window and key pair of: * <ul> * <li>Key 1: META key</li> * <li>Key 2: window</li> @@ -1614,11 +1613,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * <li>Value: key</li> * </ul> * - * <p>This {@link DoFn} is meant to be used to output index to key records + * <p>This {@link OldDoFn} is meant to be used to output index to key records * per window for map and multimap side inputs. */ static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow> - extends DoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> { + extends OldDoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> { private final Coder<K> keyCoder; private final Coder<W> windowCoder; @@ -1658,7 +1657,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link DoFn} which partitions sets of elements by window boundaries. Within each + * A {@link OldDoFn} which partitions sets of elements by window boundaries. Within each * partition, the set of elements is transformed into a {@link TransformedMap}. * The transformed {@code Map<K, Iterable<V>>} is backed by a * {@code Map<K, Iterable<WindowedValue<V>>>} and contains a function @@ -1673,10 +1672,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * </ul> */ static class ToMultimapDoFn<K, V, W extends BoundedWindow> - extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, - IsmRecord<WindowedValue<TransformedMap<K, - Iterable<WindowedValue<V>>, - Iterable<V>>>>> { + extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, + IsmRecord<WindowedValue<TransformedMap<K, + Iterable<WindowedValue<V>>, + Iterable<V>>>>> { private final Coder<W> windowCoder; ToMultimapDoFn(Coder<W> windowCoder) { @@ -2335,7 +2334,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // WindmillSink. .apply(Reshuffle.<Integer, ValueWithRecordId<T>>of()) .apply("StripIds", ParDo.of( - new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() { + new OldDoFn<KV<Integer, ValueWithRecordId<T>>, T>() { @Override public void processElement(ProcessContext c) { c.output(c.element().getValue().getValue()); @@ -2372,11 +2371,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A specialized {@link DoFn} for writing the contents of a {@link PCollection} + * A specialized {@link OldDoFn} for writing the contents of a {@link PCollection} * to a streaming {@link PCollectionView} backend implementation. */ private static class StreamingPCollectionViewWriterFn<T> - extends DoFn<Iterable<T>, T> implements DoFn.RequiresWindowAccess { + extends OldDoFn<Iterable<T>, T> implements OldDoFn.RequiresWindowAccess { private final PCollectionView<?> view; private final Coder<T> dataCoder; @@ -2553,7 +2552,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } - private static class WrapAsList<T> extends DoFn<T, List<T>> { + private static class WrapAsList<T> extends OldDoFn<T, List<T>> { @Override public void processElement(ProcessContext c) { c.output(Arrays.asList(c.element())); @@ -2716,7 +2715,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @Nullable private PTransform<?, ?> transform; @Nullable - private DoFn<?, ?> doFn; + private OldDoFn<?, ?> doFn; /** * Builds an instance of this class from the overridden transform. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java index 5f808a5..d4f9a90 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.dataflow.internal; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.Window; @@ -63,9 +63,9 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> } else { // If the windowFn didn't change, we just run a pass-through transform and then set the // new windowing strategy. - return input.apply("Identity", ParDo.of(new DoFn<T, T>() { + return input.apply("Identity", ParDo.of(new OldDoFn<T, T>() { @Override - public void processElement(DoFn<T, T>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception { c.output(c.element()); } })).setWindowingStrategyInternal(outputStrategy); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index f83acbc..2017313 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -18,32 +18,32 @@ package org.apache.beam.runners.dataflow.util; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import java.io.Serializable; /** - * Wrapper class holding the necessary information to serialize a DoFn. + * Wrapper class holding the necessary information to serialize a OldDoFn. * - * @param <InputT> the type of the (main) input elements of the DoFn - * @param <OutputT> the type of the (main) output elements of the DoFn + * @param <InputT> the type of the (main) input elements of the OldDoFn + * @param <OutputT> the type of the (main) output elements of the OldDoFn */ public class DoFnInfo<InputT, OutputT> implements Serializable { - private final DoFn<InputT, OutputT> doFn; + private final OldDoFn<InputT, OutputT> doFn; private final WindowingStrategy<?, ?> windowingStrategy; private final Iterable<PCollectionView<?>> sideInputViews; private final Coder<InputT> inputCoder; - public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) { + public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) { this.doFn = doFn; this.windowingStrategy = windowingStrategy; this.sideInputViews = null; this.inputCoder = null; } - public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, + public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) { this.doFn = doFn; this.windowingStrategy = windowingStrategy; @@ -51,7 +51,7 @@ public class DoFnInfo<InputT, OutputT> implements Serializable { this.inputCoder = inputCoder; } - public DoFn<InputT, OutputT> getDoFn() { + public OldDoFn<InputT, OutputT> getDoFn() { return doFn; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 7d89735..2a01c03 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -49,7 +49,7 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -506,7 +506,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { } /** - * Returns a Step for a DoFn by creating and translating a pipeline. + * Returns a Step for a OldDoFn by creating and translating a pipeline. */ private static Step createPredefinedStep() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); @@ -530,7 +530,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { return step; } - private static class NoOpFn extends DoFn<String, String> { + private static class NoOpFn extends OldDoFn<String, String> { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } @@ -864,7 +864,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); Pipeline pipeline = Pipeline.create(options); - DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() { + OldDoFn<Integer, Integer> fn1 = new OldDoFn<Integer, Integer>() { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element()); @@ -880,7 +880,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { } }; - DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() { + OldDoFn<Integer, Integer> fn2 = new OldDoFn<Integer, Integer>() { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java index 4951043..0677030 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java @@ -25,8 +25,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -44,7 +44,7 @@ public class WordCount { * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the * pipeline. */ - static class ExtractWordsFn extends DoFn<String, String> { + static class ExtractWordsFn extends OldDoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index b5888bd..f4ce516 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.spark.translation; import org.apache.beam.runners.spark.util.BroadcastHelper; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -39,7 +39,7 @@ import java.util.Map; public class DoFnFunction<InputT, OutputT> implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> { - private final DoFn<InputT, OutputT> mFunction; + private final OldDoFn<InputT, OutputT> mFunction; private final SparkRuntimeContext mRuntimeContext; private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs; @@ -48,7 +48,7 @@ public class DoFnFunction<InputT, OutputT> * @param runtime Runtime to apply function in. * @param sideInputs Side inputs used in DoFunction. */ - public DoFnFunction(DoFn<InputT, OutputT> fn, + public DoFnFunction(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtime, Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { this.mFunction = fn; @@ -69,7 +69,7 @@ public class DoFnFunction<InputT, OutputT> private final List<WindowedValue<OutputT>> outputs = new LinkedList<>(); - ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, + ProcCtxt(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { super(fn, runtimeContext, sideInputs); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index daa767d..e33578d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.spark.translation; import org.apache.beam.runners.spark.util.BroadcastHelper; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -45,13 +45,13 @@ import scala.Tuple2; */ class MultiDoFnFunction<InputT, OutputT> implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> { - private final DoFn<InputT, OutputT> mFunction; + private final OldDoFn<InputT, OutputT> mFunction; private final SparkRuntimeContext mRuntimeContext; private final TupleTag<OutputT> mMainOutputTag; private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs; MultiDoFnFunction( - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, TupleTag<OutputT> mainOutputTag, Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { @@ -75,7 +75,7 @@ class MultiDoFnFunction<InputT, OutputT> private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create(); - ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, + ProcCtxt(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { super(fn, runtimeContext, sideInputs); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index cad2a8e..58ac03c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimerInternals; @@ -50,17 +50,17 @@ import java.util.Map; * Spark runner process context. */ public abstract class SparkProcessContext<InputT, OutputT, ValueT> - extends DoFn<InputT, OutputT>.ProcessContext { + extends OldDoFn<InputT, OutputT>.ProcessContext { private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class); - private final DoFn<InputT, OutputT> fn; + private final OldDoFn<InputT, OutputT> fn; private final SparkRuntimeContext mRuntimeContext; private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs; protected WindowedValue<InputT> windowedValue; - SparkProcessContext(DoFn<InputT, OutputT> fn, + SparkProcessContext(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtime, Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) { fn.super(); @@ -135,9 +135,9 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> @Override public BoundedWindow window() { - if (!(fn instanceof DoFn.RequiresWindowAccess)) { + if (!(fn instanceof OldDoFn.RequiresWindowAccess)) { throw new UnsupportedOperationException( - "window() is only available in the context of a DoFn marked as RequiresWindowAccess."); + "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess."); } return Iterables.getOnlyElement(windowedValue.getWindows()); } @@ -200,7 +200,7 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> protected abstract Iterator<ValueT> getOutputIterator(); protected Iterable<ValueT> getOutputIterable(final Iterator<WindowedValue<InputT>> iter, - final DoFn<InputT, OutputT> doFn) { + final OldDoFn<InputT, OutputT> doFn) { return new Iterable<ValueT>() { @Override public Iterator<ValueT> iterator() { @@ -212,11 +212,11 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> private class ProcCtxtIterator extends AbstractIterator<ValueT> { private final Iterator<WindowedValue<InputT>> inputIterator; - private final DoFn<InputT, OutputT> doFn; + private final OldDoFn<InputT, OutputT> doFn; private Iterator<ValueT> outputIterator; private boolean calledFinish; - ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, DoFn<InputT, OutputT> doFn) { + ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, OldDoFn<InputT, OutputT> doFn) { this.inputIterator = iterator; this.doFn = doFn; this.outputIterator = getOutputIterator(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index c5d5802..c51a500 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -39,8 +39,8 @@ import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -94,6 +94,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; + import scala.Tuple2; /** @@ -203,7 +204,7 @@ public final class TransformTranslator { WindowingStrategy<?, W> windowingStrategy = (WindowingStrategy<?, W>) transform.getWindowingStrategy(); - DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn = + OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn = new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( windowingStrategy, new InMemoryStateInternalsFactory<K>(), @@ -768,7 +769,7 @@ public final class TransformTranslator { && windowFn instanceof GlobalWindows)) { context.setOutputRDD(transform, inRDD); } else { - DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); + OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null); context.setOutputRDD(transform, inRDD.mapPartitions(dofn)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 8154cd7..b0fb931 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -32,8 +32,8 @@ import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -315,7 +315,7 @@ public final class StreamingTransformTranslator { sec.setStream(transform, dStream.window(windowDuration, slideDuration)); } //--- then we apply windowing to the elements - DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); + OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn, ((StreamingEvaluationContext) context).getRuntimeContext(), null); @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java index d1f8d12..e4a293f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java @@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Keys; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; @@ -101,7 +101,7 @@ public class TfIdfTest { // of the words in the document associated with that that URI. PCollection<KV<URI, String>> uriToWords = uriToContent .apply("SplitWords", ParDo.of( - new DoFn<KV<URI, String>, KV<URI, String>>() { + new OldDoFn<KV<URI, String>, KV<URI, String>>() { @Override public void processElement(ProcessContext c) { URI uri = c.element().getKey(); @@ -144,7 +144,7 @@ public class TfIdfTest { // by the URI key. PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount .apply("ShiftKeys", ParDo.of( - new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { + new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { @Override public void processElement(ProcessContext c) { URI uri = c.element().getKey().getKey(); @@ -183,7 +183,7 @@ public class TfIdfTest { // divided by the total number of words in the document. PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal .apply("ComputeTermFrequencies", ParDo.of( - new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { + new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { @Override public void processElement(ProcessContext c) { URI uri = c.element().getKey(); @@ -208,7 +208,7 @@ public class TfIdfTest { PCollection<KV<String, Double>> wordToDf = wordToDocCount .apply("ComputeDocFrequencies", ParDo .withSideInputs(totalDocuments) - .of(new DoFn<KV<String, Long>, KV<String, Double>>() { + .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() { @Override public void processElement(ProcessContext c) { String word = c.element().getKey(); @@ -237,7 +237,7 @@ public class TfIdfTest { // divided by the log of the document frequency. return wordToUriAndTfAndDf .apply("ComputeTfIdf", ParDo.of( - new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { + new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { @Override public void processElement(ProcessContext c) { String word = c.element().getKey(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java index 600217d..2e477e9 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -70,7 +70,7 @@ public class CombinePerKeyTest { private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> { @Override public PCollection<KV<T, Long>> apply(PCollection<T> pcol) { - PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() { + PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new OldDoFn<T, KV<T, Long>>() { @Override public void processElement(ProcessContext processContext) throws Exception { processContext.output(KV.of(processContext.element(), 1L)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java index 0f60271..263ce99 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; @@ -46,7 +46,7 @@ public class DoFnOutputTest implements Serializable { PCollection<String> strings = pipeline.apply(Create.of("a")); // Test that values written from startBundle() and finishBundle() are written to // the output - PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() { + PCollection<String> output = strings.apply(ParDo.of(new OldDoFn<String, String>() { @Override public void startBundle(Context c) throws Exception { c.output("start"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index ded3eb2..739eec3 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -30,9 +30,9 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.ApproximateUnique; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -103,9 +103,9 @@ public class MultiOutputWordCountTest { } /** - * A DoFn that tokenizes lines of text into individual words. + * A OldDoFn that tokenizes lines of text into individual words. */ - static class ExtractWordsFn extends DoFn<String, String> { + static class ExtractWordsFn extends OldDoFn<String, String> { private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords", new Sum.SumIntegerFn()); @@ -170,7 +170,7 @@ public class MultiOutputWordCountTest { } } - private static class FormatCountsFn extends DoFn<KV<String, Long>, String> { + private static class FormatCountsFn extends OldDoFn<KV<String, Long>, String> { @Override public void processElement(ProcessContext c) { c.output(c.element().getKey() + ": " + c.element().getValue());