http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java index 8a83e44..b27163a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java @@ -24,7 +24,7 @@ import static com.google.common.base.Preconditions.checkState; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn.DelegatingAggregator; +import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -127,7 +127,7 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, * * <p>If invoked from {@link ProcessElement}), the timestamp * must not be older than the input element's timestamp minus - * {@link DoFn#getAllowedTimestampSkew}. The output element will + * {@link OldDoFn#getAllowedTimestampSkew}. The output element will * be in the same windows as the input element. * * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, @@ -176,7 +176,7 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, * * <p>If invoked from {@link ProcessElement}), the timestamp * must not be older than the input element's timestamp minus - * {@link DoFn#getAllowedTimestampSkew}. The output element will + * {@link OldDoFn#getAllowedTimestampSkew}. The output element will * be in the same windows as the input element. * * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, @@ -194,7 +194,7 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, } /** - * Information accessible when running {@link DoFn#processElement}. + * Information accessible when running {@link OldDoFn#processElement}. */ public abstract class ProcessContext extends Context { @@ -358,13 +358,13 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, /** * Returns an {@link Aggregator} with aggregation logic specified by the * {@link CombineFn} argument. The name provided must be unique across - * {@link Aggregator}s created within the DoFn. Aggregators can only be created + * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created * during pipeline construction. * * @param name the name of the aggregator * @param combiner the {@link CombineFn} to use in the aggregator * @return an aggregator for the provided name and combiner in the scope of - * this DoFn + * this OldDoFn * @throws NullPointerException if the name or combiner is null * @throws IllegalArgumentException if the given name collides with another * aggregator in this scope @@ -391,13 +391,13 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, /** * Returns an {@link Aggregator} with the aggregation logic specified by the * {@link SerializableFunction} argument. The name provided must be unique - * across {@link Aggregator}s created within the DoFn. Aggregators can only be + * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be * created during pipeline construction. * * @param name the name of the aggregator * @param combiner the {@link SerializableFunction} to use in the aggregator * @return an aggregator for the provided name and combiner in the scope of - * this DoFn + * this OldDoFn * @throws NullPointerException if the name or combiner is null * @throws IllegalArgumentException if the given name collides with another * aggregator in this scope
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java index a31799e..4466874 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java @@ -202,7 +202,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { @Override public PCollection<T> apply(PCollection<T> input) { - PCollection<T> output = input.apply(ParDo.of(new DoFn<T, T>() { + PCollection<T> output = input.apply(ParDo.of(new OldDoFn<T, T>() { @Override public void processElement(ProcessContext c) { if (predicate.apply(c.element()) == true) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java index 4f270a7..b48da38 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java @@ -133,7 +133,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> { @Override public PCollection<OutputT> apply(PCollection<InputT> input) { - return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() { + return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() { private static final long serialVersionUID = 0L; @Override public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java index 0b83fb6..53e898e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java @@ -174,7 +174,7 @@ public class Flatten { Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder(); return in.apply("FlattenIterables", ParDo.of( - new DoFn<Iterable<T>, T>() { + new OldDoFn<Iterable<T>, T>() { @Override public void processElement(ProcessContext c) { for (T i : c.element()) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index 8ad57d2..ed7f411 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -68,7 +68,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * PCollection<KV<String, Iterable<Doc>>> urlToDocs = * urlDocPairs.apply(GroupByKey.<String, Doc>create()); * PCollection<R> results = - * urlToDocs.apply(ParDo.of(new DoFn<KV<String, Iterable<Doc>>, R>() { + * urlToDocs.apply(ParDo.of(new OldDoFn<KV<String, Iterable<Doc>>, R>() { * public void processElement(ProcessContext c) { * String url = c.element().getKey(); * Iterable<Doc> docsWithThatUrl = c.element().getValue(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java index ef1e3c6..b5fe60f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java @@ -40,7 +40,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; /** - * Provides multi-threading of {@link DoFn}s, using threaded execution to + * Provides multi-threading of {@link OldDoFn}s, using threaded execution to * process multiple elements concurrently within a bundle. * * <p>Note, that each Dataflow worker will already process multiple bundles @@ -57,7 +57,7 @@ import java.util.concurrent.atomic.AtomicReference; * share of the maximum write rate) will take at least 6 seconds to complete (there is additional * overhead in the extra parallelization). * - * <p>To parallelize a {@link DoFn} to 10 threads: + * <p>To parallelize a {@link OldDoFn} to 10 threads: * <pre>{@code * PCollection<T> data = ...; * data.apply( @@ -65,18 +65,18 @@ import java.util.concurrent.atomic.AtomicReference; * .withMaxParallelism(10))); * }</pre> * - * <p>An uncaught exception from the wrapped {@link DoFn} will result in the exception + * <p>An uncaught exception from the wrapped {@link OldDoFn} will result in the exception * being rethrown in later calls to {@link MultiThreadedIntraBundleProcessingDoFn#processElement} * or a call to {@link MultiThreadedIntraBundleProcessingDoFn#finishBundle}. */ public class IntraBundleParallelization { /** * Creates a {@link IntraBundleParallelization} {@link PTransform} for the given - * {@link DoFn} that processes elements using multiple threads. + * {@link OldDoFn} that processes elements using multiple threads. * * <p>Note that the specified {@code doFn} needs to be thread safe. */ - public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) { + public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> doFn) { return new Unbound().of(doFn); } @@ -92,7 +92,7 @@ public class IntraBundleParallelization { * An incomplete {@code IntraBundleParallelization} transform, with unbound input/output types. * * <p>Before being applied, {@link IntraBundleParallelization.Unbound#of} must be - * invoked to specify the {@link DoFn} to invoke, which will also + * invoked to specify the {@link OldDoFn} to invoke, which will also * bind the input/output types of this {@code PTransform}. */ public static class Unbound { @@ -118,18 +118,18 @@ public class IntraBundleParallelization { /** * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one - * with the specified {@link DoFn}. + * with the specified {@link OldDoFn}. * * <p>Note that the specified {@code doFn} needs to be thread safe. */ - public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) { + public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> doFn) { return new Bound<>(doFn, maxParallelism); } } /** * A {@code PTransform} that, when applied to a {@code PCollection<InputT>}, - * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements, + * invokes a user-specified {@code OldDoFn<InputT, OutputT>} on all its elements, * with all its outputs collected into an output * {@code PCollection<OutputT>}. * @@ -140,10 +140,10 @@ public class IntraBundleParallelization { */ public static class Bound<InputT, OutputT> extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { - private final DoFn<InputT, OutputT> doFn; + private final OldDoFn<InputT, OutputT> doFn; private final int maxParallelism; - Bound(DoFn<InputT, OutputT> doFn, int maxParallelism) { + Bound(OldDoFn<InputT, OutputT> doFn, int maxParallelism) { checkArgument(maxParallelism > 0, "Expected parallelism factor greater than zero, received %s.", maxParallelism); this.doFn = doFn; @@ -160,12 +160,12 @@ public class IntraBundleParallelization { /** * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one - * with the specified {@link DoFn}. + * with the specified {@link OldDoFn}. * * <p>Note that the specified {@code doFn} needs to be thread safe. */ public <NewInputT, NewOutputT> Bound<NewInputT, NewOutputT> - of(DoFn<NewInputT, NewOutputT> doFn) { + of(OldDoFn<NewInputT, NewOutputT> doFn) { return new Bound<>(doFn, maxParallelism); } @@ -188,17 +188,19 @@ public class IntraBundleParallelization { } /** - * A multi-threaded {@code DoFn} wrapper. + * A multi-threaded {@code OldDoFn} wrapper. * - * @see IntraBundleParallelization#of(DoFn) + * @see IntraBundleParallelization#of(OldDoFn) * * @param <InputT> the type of the (main) input elements * @param <OutputT> the type of the (main) output elements */ public static class MultiThreadedIntraBundleProcessingDoFn<InputT, OutputT> - extends DoFn<InputT, OutputT> { + extends OldDoFn<InputT, OutputT> { - public MultiThreadedIntraBundleProcessingDoFn(DoFn<InputT, OutputT> doFn, int maxParallelism) { + public MultiThreadedIntraBundleProcessingDoFn( + OldDoFn<InputT, OutputT> doFn, + int maxParallelism) { checkArgument(maxParallelism > 0, "Expected parallelism factor greater than zero, received %s.", maxParallelism); this.doFn = doFn; @@ -267,7 +269,7 @@ public class IntraBundleParallelization { ///////////////////////////////////////////////////////////////////////////// /** - * Wraps a DoFn context, forcing single-thread output so that threads don't + * Wraps a OldDoFn context, forcing single-thread output so that threads don't * propagate through to downstream functions. */ private class WrappedContext extends ProcessContext { @@ -347,7 +349,7 @@ public class IntraBundleParallelization { } } - private final DoFn<InputT, OutputT> doFn; + private final OldDoFn<InputT, OutputT> doFn; private int maxParallelism; private transient ExecutorService executor; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java index 636e306..c8cbce8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java @@ -58,7 +58,7 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>, @Override public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) { return - in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() { + in.apply("Keys", ParDo.of(new OldDoFn<KV<K, ?>, K>() { @Override public void processElement(ProcessContext c) { c.output(c.element().getKey()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java index 9597c92..430d37b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java @@ -62,7 +62,7 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>, @Override public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) { return - in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() { + in.apply("KvSwap", ParDo.of(new OldDoFn<KV<K, V>, KV<V, K>>() { @Override public void processElement(ProcessContext c) { KV<K, V> e = c.element(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index f535111..c83c39f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -104,7 +104,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> { @Override public PCollection<OutputT> apply(PCollection<InputT> input) { - return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() { + return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() { @Override public void processElement(ProcessContext c) { c.output(fn.apply(c.element())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java new file mode 100644 index 0000000..48c6033 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -0,0 +1,565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptor; + +import com.google.common.base.MoreObjects; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +/** + * The argument to {@link ParDo} providing the code to use to process + * elements of the input + * {@link org.apache.beam.sdk.values.PCollection}. + * + * <p>See {@link ParDo} for more explanation, examples of use, and + * discussion of constraints on {@code OldDoFn}s, including their + * serializability, lack of access to global shared mutable state, + * requirements for failure tolerance, and benefits of optimization. + * + * <p>{@code OldDoFn}s can be tested in the context of a particular + * {@code Pipeline} by running that {@code Pipeline} on sample input + * and then checking its output. Unit testing of a {@code OldDoFn}, + * separately from any {@code ParDo} transform or {@code Pipeline}, + * can be done via the {@link DoFnTester} harness. + * + * <p>{@link DoFnWithContext} (currently experimental) offers an alternative + * mechanism for accessing {@link ProcessContext#window()} without the need + * to implement {@link RequiresWindowAccess}. + * + * <p>See also {@link #processElement} for details on implementing the transformation + * from {@code InputT} to {@code OutputT}. + * + * @param <InputT> the type of the (main) input elements + * @param <OutputT> the type of the (main) output elements + */ +public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData { + + /** + * Information accessible to all methods in this {@code OldDoFn}. + * Used primarily to output elements. + */ + public abstract class Context { + + /** + * Returns the {@code PipelineOptions} specified with the + * {@link org.apache.beam.sdk.runners.PipelineRunner} + * invoking this {@code OldDoFn}. The {@code PipelineOptions} will + * be the default running via {@link DoFnTester}. + */ + public abstract PipelineOptions getPipelineOptions(); + + /** + * Adds the given element to the main output {@code PCollection}. + * + * <p>Once passed to {@code output} the element should be considered + * immutable and not be modified in any way. It may be cached or retained + * by the Dataflow runtime or later steps in the pipeline, or used in + * other unspecified ways. + * + * <p>If invoked from {@link OldDoFn#processElement processElement}, the output + * element will have the same timestamp and be in the same windows + * as the input element passed to {@link OldDoFn#processElement processElement}. + * + * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element. The output element + * will have a timestamp of negative infinity. + */ + public abstract void output(OutputT output); + + /** + * Adds the given element to the main output {@code PCollection}, + * with the given timestamp. + * + * <p>Once passed to {@code outputWithTimestamp} the element should not be + * modified in any way. + * + * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp + * must not be older than the input element's timestamp minus + * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will + * be in the same windows as the input element. + * + * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element except for the + * timestamp. + */ + public abstract void outputWithTimestamp(OutputT output, Instant timestamp); + + /** + * Adds the given element to the side output {@code PCollection} with the + * given tag. + * + * <p>Once passed to {@code sideOutput} the element should not be modified + * in any way. + * + * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to + * specify the tags of side outputs that it consumes. Non-consumed side + * outputs, e.g., outputs for monitoring purposes only, don't necessarily + * need to be specified. + * + * <p>The output element will have the same timestamp and be in the same + * windows as the input element passed to {@link OldDoFn#processElement processElement}. + * + * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element. The output element + * will have a timestamp of negative infinity. + * + * @see ParDo#withOutputTags + */ + public abstract <T> void sideOutput(TupleTag<T> tag, T output); + + /** + * Adds the given element to the specified side output {@code PCollection}, + * with the given timestamp. + * + * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be + * modified in any way. + * + * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp + * must not be older than the input element's timestamp minus + * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will + * be in the same windows as the input element. + * + * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element except for the + * timestamp. + * + * @see ParDo#withOutputTags + */ + public abstract <T> void sideOutputWithTimestamp( + TupleTag<T> tag, T output, Instant timestamp); + + /** + * Creates an {@link Aggregator} in the {@link OldDoFn} context with the + * specified name and aggregation logic specified by {@link CombineFn}. + * + * <p>For internal use only. + * + * @param name the name of the aggregator + * @param combiner the {@link CombineFn} to use in the aggregator + * @return an aggregator for the provided name and {@link CombineFn} in this + * context + */ + @Experimental(Kind.AGGREGATOR) + protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner); + + /** + * Sets up {@link Aggregator}s created by the {@link OldDoFn} so they are + * usable within this context. + * + * <p>This method should be called by runners before {@link OldDoFn#startBundle} + * is executed. + */ + @Experimental(Kind.AGGREGATOR) + protected final void setupDelegateAggregators() { + for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) { + setupDelegateAggregator(aggregator); + } + + aggregatorsAreFinal = true; + } + + private final <AggInputT, AggOutputT> void setupDelegateAggregator( + DelegatingAggregator<AggInputT, AggOutputT> aggregator) { + + Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal( + aggregator.getName(), aggregator.getCombineFn()); + + aggregator.setDelegate(delegate); + } + } + + /** + * Information accessible when running {@link OldDoFn#processElement}. + */ + public abstract class ProcessContext extends Context { + + /** + * Returns the input element to be processed. + * + * <p>The element should be considered immutable. The Dataflow runtime will not mutate the + * element, so it is safe to cache, etc. The element should not be mutated by any of the + * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the Dataflow + * runtime, or used in other unspecified ways. + */ + public abstract InputT element(); + + /** + * Returns the value of the side input for the window corresponding to the + * window of the main input element. + * + * <p>See + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow} + * for how this corresponding window is determined. + * + * @throws IllegalArgumentException if this is not a side input + * @see ParDo#withSideInputs + */ + public abstract <T> T sideInput(PCollectionView<T> view); + + /** + * Returns the timestamp of the input element. + * + * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} + * for more information. + */ + public abstract Instant timestamp(); + + /** + * Returns the window into which the input element has been assigned. + * + * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} + * for more information. + * + * @throws UnsupportedOperationException if this {@link OldDoFn} does + * not implement {@link RequiresWindowAccess}. + */ + public abstract BoundedWindow window(); + + /** + * Returns information about the pane within this window into which the + * input element has been assigned. + * + * <p>Generally all data is in a single, uninteresting pane unless custom + * triggering and/or late data has been explicitly requested. + * See {@link org.apache.beam.sdk.transforms.windowing.Window} + * for more information. + */ + public abstract PaneInfo pane(); + + /** + * Returns the process context to use for implementing windowing. + */ + @Experimental + public abstract WindowingInternals<InputT, OutputT> windowingInternals(); + } + + /** + * Returns the allowed timestamp skew duration, which is the maximum + * duration that timestamps can be shifted backward in + * {@link OldDoFn.Context#outputWithTimestamp}. + * + * <p>The default value is {@code Duration.ZERO}, in which case + * timestamps can only be shifted forward to future. For infinite + * skew, return {@code Duration.millis(Long.MAX_VALUE)}. + * + * <p> Note that producing an element whose timestamp is less than the + * current timestamp may result in late data, i.e. returning a non-zero + * value here does not impact watermark calculations used for firing + * windows. + * + * @deprecated does not interact well with the watermark. + */ + @Deprecated + public Duration getAllowedTimestampSkew() { + return Duration.ZERO; + } + + /** + * Interface for signaling that a {@link OldDoFn} needs to access the window the + * element is being processed in, via {@link OldDoFn.ProcessContext#window}. + */ + @Experimental + public interface RequiresWindowAccess {} + + public OldDoFn() { + this(new HashMap<String, DelegatingAggregator<?, ?>>()); + } + + OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) { + this.aggregators = aggregators; + } + + ///////////////////////////////////////////////////////////////////////////// + + private final Map<String, DelegatingAggregator<?, ?>> aggregators; + + /** + * Protects aggregators from being created after initialization. + */ + private boolean aggregatorsAreFinal; + + /** + * Prepares this {@code OldDoFn} instance for processing a batch of elements. + * + * <p>By default, does nothing. + */ + public void startBundle(Context c) throws Exception { + } + + /** + * Processes one input element. + * + * <p>The current element of the input {@code PCollection} is returned by + * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Dataflow + * runtime will not mutate the element, so it is safe to cache, etc. The element should not be + * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by + * the Dataflow runtime, or used in other unspecified ways. + * + * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}. + * Once passed to {@code output} the element should be considered immutable and not be modified in + * any way. It may be cached elsewhere, retained by the Dataflow runtime, or used in other + * unspecified ways. + * + * @see ProcessContext + */ + public abstract void processElement(ProcessContext c) throws Exception; + + /** + * Finishes processing this batch of elements. + * + * <p>By default, does nothing. + */ + public void finishBundle(Context c) throws Exception { + } + + /** + * {@inheritDoc} + * + * <p>By default, does not register any display data. Implementors may override this method + * to provide their own display data. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } + + ///////////////////////////////////////////////////////////////////////////// + + /** + * Returns a {@link TypeDescriptor} capturing what is known statically + * about the input type of this {@code OldDoFn} instance's most-derived + * class. + * + * <p>See {@link #getOutputTypeDescriptor} for more discussion. + */ + protected TypeDescriptor<InputT> getInputTypeDescriptor() { + return new TypeDescriptor<InputT>(getClass()) {}; + } + + /** + * Returns a {@link TypeDescriptor} capturing what is known statically + * about the output type of this {@code OldDoFn} instance's + * most-derived class. + * + * <p>In the normal case of a concrete {@code OldDoFn} subclass with + * no generic type parameters of its own (including anonymous inner + * classes), this will be a complete non-generic type, which is good + * for choosing a default output {@code Coder<OutputT>} for the output + * {@code PCollection<OutputT>}. + */ + protected TypeDescriptor<OutputT> getOutputTypeDescriptor() { + return new TypeDescriptor<OutputT>(getClass()) {}; + } + + /** + * Returns an {@link Aggregator} with aggregation logic specified by the + * {@link CombineFn} argument. The name provided must be unique across + * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created + * during pipeline construction. + * + * @param name the name of the aggregator + * @param combiner the {@link CombineFn} to use in the aggregator + * @return an aggregator for the provided name and combiner in the scope of + * this OldDoFn + * @throws NullPointerException if the name or combiner is null + * @throws IllegalArgumentException if the given name collides with another + * aggregator in this scope + * @throws IllegalStateException if called during pipeline processing. + */ + protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) { + checkNotNull(name, "name cannot be null"); + checkNotNull(combiner, "combiner cannot be null"); + checkArgument(!aggregators.containsKey(name), + "Cannot create aggregator with name %s." + + " An Aggregator with that name already exists within this scope.", + name); + + checkState(!aggregatorsAreFinal, "Cannot create an aggregator during OldDoFn processing." + + " Aggregators should be registered during pipeline construction."); + + DelegatingAggregator<AggInputT, AggOutputT> aggregator = + new DelegatingAggregator<>(name, combiner); + aggregators.put(name, aggregator); + return aggregator; + } + + /** + * Returns an {@link Aggregator} with the aggregation logic specified by the + * {@link SerializableFunction} argument. The name provided must be unique + * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be + * created during pipeline construction. + * + * @param name the name of the aggregator + * @param combiner the {@link SerializableFunction} to use in the aggregator + * @return an aggregator for the provided name and combiner in the scope of + * this OldDoFn + * @throws NullPointerException if the name or combiner is null + * @throws IllegalArgumentException if the given name collides with another + * aggregator in this scope + * @throws IllegalStateException if called during pipeline processing. + */ + protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name, + SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) { + checkNotNull(combiner, "combiner cannot be null."); + return createAggregator(name, Combine.IterableCombineFn.of(combiner)); + } + + /** + * Returns the {@link Aggregator Aggregators} created by this {@code OldDoFn}. + */ + Collection<Aggregator<?, ?>> getAggregators() { + return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values()); + } + + /** + * An {@link Aggregator} that delegates calls to addValue to another + * aggregator. + * + * @param <AggInputT> the type of input element + * @param <AggOutputT> the type of output element + */ + static class DelegatingAggregator<AggInputT, AggOutputT> implements + Aggregator<AggInputT, AggOutputT>, Serializable { + private final UUID id; + + private final String name; + + private final CombineFn<AggInputT, ?, AggOutputT> combineFn; + + private Aggregator<AggInputT, ?> delegate; + + public DelegatingAggregator(String name, + CombineFn<? super AggInputT, ?, AggOutputT> combiner) { + this.id = UUID.randomUUID(); + this.name = checkNotNull(name, "name cannot be null"); + // Safe contravariant cast + @SuppressWarnings("unchecked") + CombineFn<AggInputT, ?, AggOutputT> specificCombiner = + (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null"); + this.combineFn = specificCombiner; + } + + @Override + public void addValue(AggInputT value) { + if (delegate == null) { + throw new IllegalStateException( + "addValue cannot be called on Aggregator outside of the execution of a OldDoFn."); + } else { + delegate.addValue(value); + } + } + + @Override + public String getName() { + return name; + } + + @Override + public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() { + return combineFn; + } + + /** + * Sets the current delegate of the Aggregator. + * + * @param delegate the delegate to set in this aggregator + */ + public void setDelegate(Aggregator<AggInputT, ?> delegate) { + this.delegate = delegate; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("name", name) + .add("combineFn", combineFn) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(id, name, combineFn.getClass()); + } + + /** + * Indicates whether some other object is "equal to" this one. + * + * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their + * CombineFns are the same class, and they have identical IDs. + */ + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o == null) { + return false; + } + if (o instanceof DelegatingAggregator) { + DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o; + return Objects.equals(this.id, that.id) + && Objects.equals(this.name, that.name) + && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass()); + } + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java index fe6e8ad..12ab54d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java @@ -147,7 +147,7 @@ import java.io.Serializable; * implementing {@code Serializable}. * * <p>{@code PTransform} is marked {@code Serializable} solely - * because it is common for an anonymous {@code DoFn}, + * because it is common for an anonymous {@code OldDoFn}, * instance to be created within an * {@code apply()} method of a composite {@code PTransform}. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 16dfcac..36d8101 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -52,13 +52,13 @@ import java.util.List; * <p>The {@link ParDo} processing style is similar to what happens inside * the "Mapper" or "Reducer" class of a MapReduce-style algorithm. * - * <h2>{@link DoFn DoFns}</h2> + * <h2>{@link OldDoFn DoFns}</h2> * * <p>The function to use to process each element is specified by a - * {@link DoFn DoFn<InputT, OutputT>}, primarily via its - * {@link DoFn#processElement processElement} method. The {@link DoFn} may also - * override the default implementations of {@link DoFn#startBundle startBundle} - * and {@link DoFn#finishBundle finishBundle}. + * {@link OldDoFn OldDoFn<InputT, OutputT>}, primarily via its + * {@link OldDoFn#processElement processElement} method. The {@link OldDoFn} may also + * override the default implementations of {@link OldDoFn#startBundle startBundle} + * and {@link OldDoFn#finishBundle finishBundle}. * * <p>Conceptually, when a {@link ParDo} transform is executed, the * elements of the input {@link PCollection} are first divided up @@ -67,26 +67,27 @@ import java.util.List; * For each bundle of input elements processing proceeds as follows: * * <ol> - * <li>If required, a fresh instance of the argument {@link DoFn} is created + * <li>If required, a fresh instance of the argument {@link OldDoFn} is created * on a worker. This may be through deserialization or other means. A - * {@link PipelineRunner} may reuse {@link DoFn} instances for multiple bundles. - * A {@link DoFn} that has terminated abnormally (by throwing an {@link Exception} + * {@link PipelineRunner} may reuse {@link OldDoFn} instances for multiple bundles. + * A {@link OldDoFn} that has terminated abnormally (by throwing an {@link Exception} * will never be reused.</li> - * <li>The {@link DoFn DoFn's} {@link DoFn#startBundle} method is called to + * <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#startBundle} method is called to * initialize it. If this method is not overridden, the call may be optimized * away.</li> - * <li>The {@link DoFn DoFn's} {@link DoFn#processElement} method + * <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#processElement} method * is called on each of the input elements in the bundle.</li> - * <li>The {@link DoFn DoFn's} {@link DoFn#finishBundle} method is called - * to complete its work. After {@link DoFn#finishBundle} is called, the - * framework will not again invoke {@link DoFn#processElement} or {@link DoFn#finishBundle} - * until a new call to {@link DoFn#startBundle} has occurred. + * <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#finishBundle} method is called + * to complete its work. After {@link OldDoFn#finishBundle} is called, the + * framework will not again invoke {@link OldDoFn#processElement} or + * {@link OldDoFn#finishBundle} + * until a new call to {@link OldDoFn#startBundle} has occurred. * If this method is not overridden, this call may be optimized away.</li> * </ol> * - * Each of the calls to any of the {@link DoFn DoFn's} processing + * Each of the calls to any of the {@link OldDoFn OldDoFn's} processing * methods can produce zero or more output elements. All of the - * of output elements from all of the {@link DoFn} instances + * of output elements from all of the {@link OldDoFn} instances * are included in the output {@link PCollection}. * * <p>For example: @@ -94,7 +95,7 @@ import java.util.List; * <pre> {@code * PCollection<String> lines = ...; * PCollection<String> words = - * lines.apply(ParDo.of(new DoFn<String, String>() { + * lines.apply(ParDo.of(new OldDoFn<String, String>() { * public void processElement(ProcessContext c) { * String line = c.element(); * for (String word : line.split("[^a-zA-Z']+")) { @@ -102,7 +103,7 @@ import java.util.List; * } * }})); * PCollection<Integer> wordLengths = - * words.apply(ParDo.of(new DoFn<String, Integer>() { + * words.apply(ParDo.of(new OldDoFn<String, Integer>() { * public void processElement(ProcessContext c) { * String word = c.element(); * Integer length = word.length(); @@ -127,9 +128,9 @@ import java.util.List; * * <pre> {@code * PCollection<String> words = - * lines.apply("ExtractWords", ParDo.of(new DoFn<String, String>() { ... })); + * lines.apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() { ... })); * PCollection<Integer> wordLengths = - * words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { ... })); + * words.apply("ComputeWordLengths", ParDo.of(new OldDoFn<String, Integer>() { ... })); * } </pre> * * <h2>Side Inputs</h2> @@ -141,7 +142,7 @@ import java.util.List; * {@link PCollection PCollections} computed by earlier pipeline operations, * passed in to the {@link ParDo} transform using * {@link #withSideInputs}, and their contents accessible to each of - * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}. + * the {@link OldDoFn} operations via {@link OldDoFn.ProcessContext#sideInput sideInput}. * For example: * * <pre> {@code @@ -151,7 +152,7 @@ import java.util.List; * maxWordLengthCutOff.apply(View.<Integer>asSingleton()); * PCollection<String> wordsBelowCutOff = * words.apply(ParDo.withSideInputs(maxWordLengthCutOffView) - * .of(new DoFn<String, String>() { + * .of(new OldDoFn<String, String>() { * public void processElement(ProcessContext c) { * String word = c.element(); * int lengthCutOff = c.sideInput(maxWordLengthCutOffView); @@ -170,11 +171,11 @@ import java.util.List; * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags} * to be used for the output {@link PCollectionTuple} are specified by * invoking {@link #withOutputTags}. Unconsumed side outputs do not - * necessarily need to be explicitly specified, even if the {@link DoFn} - * generates them. Within the {@link DoFn}, an element is added to the + * necessarily need to be explicitly specified, even if the {@link OldDoFn} + * generates them. Within the {@link OldDoFn}, an element is added to the * main output {@link PCollection} as normal, using - * {@link DoFn.Context#output}, while an element is added to a side output - * {@link PCollection} using {@link DoFn.Context#sideOutput}. For example: + * {@link OldDoFn.Context#output}, while an element is added to a side output + * {@link PCollection} using {@link OldDoFn.Context#sideOutput}. For example: * * <pre> {@code * PCollection<String> words = ...; @@ -197,7 +198,7 @@ import java.util.List; * .withOutputTags(wordsBelowCutOffTag, * TupleTagList.of(wordLengthsAboveCutOffTag) * .and(markedWordsTag)) - * .of(new DoFn<String, String>() { + * .of(new OldDoFn<String, String>() { * // Create a tag for the unconsumed side output. * final TupleTag<String> specialWordsTag = * new TupleTag<String>(){}; @@ -232,7 +233,7 @@ import java.util.List; * * <p>Several properties can be specified for a {@link ParDo} * {@link PTransform}, including name, side inputs, side output tags, - * and {@link DoFn} to invoke. Only the {@link DoFn} is required; the + * and {@link OldDoFn} to invoke. Only the {@link OldDoFn} is required; the * name is encouraged but not required, and side inputs and side * output tags are only specified when they're needed. These * properties can be specified in any order, as long as they're @@ -246,23 +247,23 @@ import java.util.List; * {@link ParDo.Bound} nested classes, each of which offer * property setter instance methods to enable setting additional * properties. {@link ParDo.Bound} is used for {@link ParDo} - * transforms whose {@link DoFn} is specified and whose input and + * transforms whose {@link OldDoFn} is specified and whose input and * output static types have been bound. {@link ParDo.Unbound ParDo.Unbound} is used * for {@link ParDo} transforms that have not yet had their - * {@link DoFn} specified. Only {@link ParDo.Bound} instances can be + * {@link OldDoFn} specified. Only {@link ParDo.Bound} instances can be * applied. * * <p>Another benefit of this approach is that it reduces the number * of type parameters that need to be specified manually. In * particular, the input and output types of the {@link ParDo} * {@link PTransform} are inferred automatically from the type - * parameters of the {@link DoFn} argument passed to {@link ParDo#of}. + * parameters of the {@link OldDoFn} argument passed to {@link ParDo#of}. * * <h2>Output Coders</h2> * * <p>By default, the {@link Coder Coder<OutputT>} for the * elements of the main output {@link PCollection PCollection<OutputT>} is - * inferred from the concrete type of the {@link DoFn DoFn<InputT, OutputT>}. + * inferred from the concrete type of the {@link OldDoFn OldDoFn<InputT, OutputT>}. * * <p>By default, the {@link Coder Coder<SideOutputT>} for the elements of * a side output {@link PCollection PCollection<SideOutputT>} is inferred @@ -282,74 +283,74 @@ import java.util.List; * This style of {@code TupleTag} instantiation is used in the example of * multiple side outputs, above. * - * <h2>Serializability of {@link DoFn DoFns}</h2> + * <h2>Serializability of {@link OldDoFn DoFns}</h2> * - * <p>A {@link DoFn} passed to a {@link ParDo} transform must be - * {@link Serializable}. This allows the {@link DoFn} instance + * <p>A {@link OldDoFn} passed to a {@link ParDo} transform must be + * {@link Serializable}. This allows the {@link OldDoFn} instance * created in this "main program" to be sent (in serialized form) to * remote worker machines and reconstituted for bundles of elements - * of the input {@link PCollection} being processed. A {@link DoFn} + * of the input {@link PCollection} being processed. A {@link OldDoFn} * can have instance variable state, and non-transient instance * variable state will be serialized in the main program and then * deserialized on remote worker machines for some number of bundles * of elements to process. * - * <p>{@link DoFn DoFns} expressed as anonymous inner classes can be + * <p>{@link OldDoFn DoFns} expressed as anonymous inner classes can be * convenient, but due to a quirk in Java's rules for serializability, * non-static inner or nested classes (including anonymous inner * classes) automatically capture their enclosing class's instance in * their serialized state. This can lead to including much more than - * intended in the serialized state of a {@link DoFn}, or even things + * intended in the serialized state of a {@link OldDoFn}, or even things * that aren't {@link Serializable}. * * <p>There are two ways to avoid unintended serialized state in a - * {@link DoFn}: + * {@link OldDoFn}: * * <ul> * - * <li>Define the {@link DoFn} as a named, static class. + * <li>Define the {@link OldDoFn} as a named, static class. * - * <li>Define the {@link DoFn} as an anonymous inner class inside of + * <li>Define the {@link OldDoFn} as an anonymous inner class inside of * a static method. * * </ul> * * <p>Both of these approaches ensure that there is no implicit enclosing - * instance serialized along with the {@link DoFn} instance. + * instance serialized along with the {@link OldDoFn} instance. * * <p>Prior to Java 8, any local variables of the enclosing * method referenced from within an anonymous inner class need to be - * marked as {@code final}. If defining the {@link DoFn} as a named + * marked as {@code final}. If defining the {@link OldDoFn} as a named * static class, such variables would be passed as explicit * constructor arguments and stored in explicit instance variables. * * <p>There are three main ways to initialize the state of a - * {@link DoFn} instance processing a bundle: + * {@link OldDoFn} instance processing a bundle: * * <ul> * * <li>Define instance variable state (including implicit instance * variables holding final variables captured by an anonymous inner - * class), initialized by the {@link DoFn}'s constructor (which is + * class), initialized by the {@link OldDoFn}'s constructor (which is * implicit for an anonymous inner class). This state will be - * automatically serialized and then deserialized in the {@code DoFn} + * automatically serialized and then deserialized in the {@code OldDoFn} * instances created for bundles. This method is good for state - * known when the original {@code DoFn} is created in the main + * known when the original {@code OldDoFn} is created in the main * program, if it's not overly large. This is not suitable for any - * state which must only be used for a single bundle, as {@link DoFn DoFn's} + * state which must only be used for a single bundle, as {@link OldDoFn OldDoFn's} * may be used to process multiple bundles. * * <li>Compute the state as a singleton {@link PCollection} and pass it - * in as a side input to the {@link DoFn}. This is good if the state + * in as a side input to the {@link OldDoFn}. This is good if the state * needs to be computed by the pipeline, or if the state is very large * and so is best read from file(s) rather than sent as part of the - * {@code DoFn}'s serialized state. + * {@code OldDoFn}'s serialized state. * - * <li>Initialize the state in each {@link DoFn} instance, in - * {@link DoFn#startBundle}. This is good if the initialization + * <li>Initialize the state in each {@link OldDoFn} instance, in + * {@link OldDoFn#startBundle}. This is good if the initialization * doesn't depend on any information known only by the main program or * computed by earlier pipeline operations, but is the same for all - * instances of this {@link DoFn} for all program executions, say + * instances of this {@link OldDoFn} for all program executions, say * setting up empty caches or initializing constant data. * * </ul> @@ -362,13 +363,13 @@ import java.util.List; * no support in the Google Cloud Dataflow system for communicating * and synchronizing updates to shared state across worker machines, * so programs should not access any mutable static variable state in - * their {@link DoFn}, without understanding that the Java processes + * their {@link OldDoFn}, without understanding that the Java processes * for the main program and workers will each have its own independent * copy of such state, and there won't be any automatic copying of * that state across Java processes. All information should be - * communicated to {@link DoFn} instances via main and side inputs and + * communicated to {@link OldDoFn} instances via main and side inputs and * serialized state, and all output should be communicated from a - * {@link DoFn} instance via main and side outputs, in the absence of + * {@link OldDoFn} instance via main and side outputs, in the absence of * external communication mechanisms written by user code. * * <h2>Fault Tolerance</h2> @@ -378,23 +379,23 @@ import java.util.List; * While individual failures are rare, the larger the job, the greater * the chance that something, somewhere, will fail. The Google Cloud * Dataflow service strives to mask such failures automatically, - * principally by retrying failed {@link DoFn} bundle. This means - * that a {@code DoFn} instance might process a bundle partially, then + * principally by retrying failed {@link OldDoFn} bundle. This means + * that a {@code OldDoFn} instance might process a bundle partially, then * crash for some reason, then be rerun (often on a different worker * machine) on that same bundle and on the same elements as before. - * Sometimes two or more {@link DoFn} instances will be running on the + * Sometimes two or more {@link OldDoFn} instances will be running on the * same bundle simultaneously, with the system taking the results of * the first instance to complete successfully. Consequently, the - * code in a {@link DoFn} needs to be written such that these + * code in a {@link OldDoFn} needs to be written such that these * duplicate (sequential or concurrent) executions do not cause - * problems. If the outputs of a {@link DoFn} are a pure function of + * problems. If the outputs of a {@link OldDoFn} are a pure function of * its inputs, then this requirement is satisfied. However, if a - * {@link DoFn DoFn's} execution has external side-effects, such as performing - * updates to external HTTP services, then the {@link DoFn DoFn's} code + * {@link OldDoFn OldDoFn's} execution has external side-effects, such as performing + * updates to external HTTP services, then the {@link OldDoFn OldDoFn's} code * needs to take care to ensure that those updates are idempotent and * that concurrent updates are acceptable. This property can be * difficult to achieve, so it is advisable to strive to keep - * {@link DoFn DoFns} as pure functions as much as possible. + * {@link OldDoFn DoFns} as pure functions as much as possible. * * <h2>Optimization</h2> * @@ -439,15 +440,15 @@ public class ParDo { * * <p>Side inputs are {@link PCollectionView PCollectionViews}, whose contents are * computed during pipeline execution and then made accessible to - * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. Each - * invocation of the {@link DoFn} receives the same values for these + * {@link OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}. Each + * invocation of the {@link OldDoFn} receives the same values for these * side inputs. * * <p>See the discussion of Side Inputs above for more explanation. * * <p>The resulting {@link PTransform} is incomplete, and its * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link DoFn} to + * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to * invoke, which will also bind the input/output types of this * {@link PTransform}. */ @@ -460,13 +461,13 @@ public class ParDo { * * <p>Side inputs are {@link PCollectionView}s, whose contents are * computed during pipeline execution and then made accessible to - * {@code DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. + * {@code OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}. * * <p>See the discussion of Side Inputs above for more explanation. * * <p>The resulting {@link PTransform} is incomplete, and its * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link DoFn} to + * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to * invoke, which will also bind the input/output types of this * {@link PTransform}. */ @@ -482,11 +483,11 @@ public class ParDo { * * <p>{@link TupleTag TupleTags} are used to name (with its static element * type {@code T}) each main and side output {@code PCollection<T>}. - * This {@link PTransform PTransform's} {@link DoFn} emits elements to the main + * This {@link PTransform PTransform's} {@link OldDoFn} emits elements to the main * output {@link PCollection} as normal, using - * {@link DoFn.Context#output}. The {@link DoFn} emits elements to + * {@link OldDoFn.Context#output}. The {@link OldDoFn} emits elements to * a side output {@code PCollection} using - * {@link DoFn.Context#sideOutput}, passing that side output's tag + * {@link OldDoFn.Context#sideOutput}, passing that side output's tag * as an argument. The result of invoking this {@link PTransform} * will be a {@link PCollectionTuple}, and any of the the main and * side output {@code PCollection}s can be retrieved from it via @@ -497,7 +498,7 @@ public class ParDo { * * <p>The resulting {@link PTransform} is incomplete, and its input * type is not yet bound. Use {@link ParDo.UnboundMulti#of} - * to specify the {@link DoFn} to invoke, which will also bind the + * to specify the {@link OldDoFn} to invoke, which will also bind the * input type of this {@link PTransform}. */ public static <OutputT> UnboundMulti<OutputT> withOutputTags( @@ -508,24 +509,24 @@ public class ParDo { /** * Creates a {@link ParDo} {@link PTransform} that will invoke the - * given {@link DoFn} function. + * given {@link OldDoFn} function. * * <p>The resulting {@link PTransform PTransform's} types have been bound, with the * input being a {@code PCollection<InputT>} and the output a * {@code PCollection<OutputT>}, inferred from the types of the argument - * {@code DoFn<InputT, OutputT>}. It is ready to be applied, or further + * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further * properties can be set on it first. */ - public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { return of(fn, fn.getClass()); } private static <InputT, OutputT> Bound<InputT, OutputT> of( - DoFn<InputT, OutputT> fn, Class<?> fnClass) { + OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { return new Unbound().of(fn, fnClass); } - private static <InputT, OutputT> DoFn<InputT, OutputT> + private static <InputT, OutputT> OldDoFn<InputT, OutputT> adapt(DoFnWithContext<InputT, OutputT> fn) { return DoFnReflector.of(fn.getClass()).toDoFn(fn); } @@ -537,11 +538,11 @@ public class ParDo { * <p>The resulting {@link PTransform PTransform's} types have been bound, with the * input being a {@code PCollection<InputT>} and the output a * {@code PCollection<OutputT>}, inferred from the types of the argument - * {@code DoFn<InputT, OutputT>}. It is ready to be applied, or further + * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further * properties can be set on it first. * * <p>{@link DoFnWithContext} is an experimental alternative to - * {@link DoFn} which simplifies accessing the window of the element. + * {@link OldDoFn} which simplifies accessing the window of the element. */ @Experimental public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) { @@ -552,7 +553,7 @@ public class ParDo { * An incomplete {@link ParDo} transform, with unbound input/output types. * * <p>Before being applied, {@link ParDo.Unbound#of} must be - * invoked to specify the {@link DoFn} to invoke, which will also + * invoked to specify the {@link OldDoFn} to invoke, which will also * bind the input/output types of this {@link PTransform}. */ public static class Unbound { @@ -614,18 +615,18 @@ public class ParDo { /** * Returns a new {@link ParDo} {@link PTransform} that's like this - * transform but that will invoke the given {@link DoFn} + * transform but that will invoke the given {@link OldDoFn} * function, and that has its input and output types bound. Does * not modify this transform. The resulting {@link PTransform} is * sufficiently specified to be applied, but more properties can * still be specified. */ - public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { return of(fn, fn.getClass()); } private <InputT, OutputT> Bound<InputT, OutputT> of( - DoFn<InputT, OutputT> fn, Class<?> fnClass) { + OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { return new Bound<>(name, sideInputs, fn, fnClass); } @@ -645,7 +646,7 @@ public class ParDo { /** * A {@link PTransform} that, when applied to a {@code PCollection<InputT>}, - * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements, + * invokes a user-specified {@code OldDoFn<InputT, OutputT>} on all its elements, * with all its outputs collected into an output * {@code PCollection<OutputT>}. * @@ -659,12 +660,12 @@ public class ParDo { extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { // Inherits name. private final List<PCollectionView<?>> sideInputs; - private final DoFn<InputT, OutputT> fn; + private final OldDoFn<InputT, OutputT> fn; private final Class<?> fnClass; Bound(String name, List<PCollectionView<?>> sideInputs, - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { super(name); this.sideInputs = sideInputs; @@ -746,9 +747,9 @@ public class ParDo { /** * {@inheritDoc} * - * <p>{@link ParDo} registers its internal {@link DoFn} as a subcomponent for display data. - * {@link DoFn} implementations can register display data by overriding - * {@link DoFn#populateDisplayData}. + * <p>{@link ParDo} registers its internal {@link OldDoFn} as a subcomponent for display data. + * {@link OldDoFn} implementations can register display data by overriding + * {@link OldDoFn#populateDisplayData}. */ @Override public void populateDisplayData(Builder builder) { @@ -756,7 +757,7 @@ public class ParDo { ParDo.populateDisplayData(builder, fn, fnClass); } - public DoFn<InputT, OutputT> getFn() { + public OldDoFn<InputT, OutputT> getFn() { return fn; } @@ -770,7 +771,7 @@ public class ParDo { * input type. * * <p>Before being applied, {@link ParDo.UnboundMulti#of} must be - * invoked to specify the {@link DoFn} to invoke, which will also + * invoked to specify the {@link OldDoFn} to invoke, which will also * bind the input type of this {@link PTransform}. * * @param <OutputT> the type of the main output {@code PCollection} elements @@ -827,16 +828,16 @@ public class ParDo { /** * Returns a new multi-output {@link ParDo} {@link PTransform} * that's like this transform but that will invoke the given - * {@link DoFn} function, and that has its input type bound. + * {@link OldDoFn} function, and that has its input type bound. * Does not modify this transform. The resulting * {@link PTransform} is sufficiently specified to be applied, but * more properties can still be specified. */ - public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { return of(fn, fn.getClass()); } - public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn, Class<?> fnClass) { + public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { return new BoundMulti<>( name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); } @@ -857,7 +858,7 @@ public class ParDo { /** * A {@link PTransform} that, when applied to a * {@code PCollection<InputT>}, invokes a user-specified - * {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements + * {@code OldDoFn<InputT, OutputT>} on all its elements, which can emit elements * to any of the {@link PTransform}'s main and side output * {@code PCollection}s, which are bundled into a result * {@code PCollectionTuple}. @@ -871,14 +872,14 @@ public class ParDo { private final List<PCollectionView<?>> sideInputs; private final TupleTag<OutputT> mainOutputTag; private final TupleTagList sideOutputTags; - private final DoFn<InputT, OutputT> fn; + private final OldDoFn<InputT, OutputT> fn; private final Class<?> fnClass; BoundMulti(String name, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags, - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { super(name); this.sideInputs = sideInputs; @@ -929,7 +930,7 @@ public class ParDo { input.isBounded()); // The fn will likely be an instance of an anonymous subclass - // such as DoFn<Integer, String> { }, thus will have a high-fidelity + // such as OldDoFn<Integer, String> { }, thus will have a high-fidelity // TypeDescriptor for the output type. outputs.get(mainOutputTag).setTypeDescriptorInternal(fn.getOutputTypeDescriptor()); @@ -970,7 +971,7 @@ public class ParDo { ParDo.populateDisplayData(builder, fn, fnClass); } - public DoFn<InputT, OutputT> getFn() { + public OldDoFn<InputT, OutputT> getFn() { return fn; } @@ -988,7 +989,7 @@ public class ParDo { } private static void populateDisplayData( - DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) { + DisplayData.Builder builder, OldDoFn<?, ?> fn, Class<?> fnClass) { builder .include(fn) .add(DisplayData.item("fn", fnClass) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java index 6281b30..2ddcc29 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java @@ -134,7 +134,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>> this.partitionDoFn = partitionDoFn; } - private static class PartitionDoFn<X> extends DoFn<X, Void> { + private static class PartitionDoFn<X> extends OldDoFn<X, Void> { private final int numPartitions; private final PartitionFn<? super X> partitionFn; private final TupleTagList outputTags; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java index b82744d..d82c457 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java @@ -85,7 +85,7 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>, @Override public PCollection<T> apply(PCollection<T> in) { return in - .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() { + .apply("CreateIndex", ParDo.of(new OldDoFn<T, KV<T, Void>>() { @Override public void processElement(ProcessContext c) { c.output(KV.of(c.element(), (Void) null)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java index 4fcd17e..724b252 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java @@ -164,9 +164,9 @@ public class Sample { } /** - * A {@link DoFn} that returns up to limit elements from the side input PCollection. + * A {@link OldDoFn} that returns up to limit elements from the side input PCollection. */ - private static class SampleAnyDoFn<T> extends DoFn<Void, T> { + private static class SampleAnyDoFn<T> extends OldDoFn<Void, T> { long limit; final PCollectionView<Iterable<T>> iterableView; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java index a879925..6623c6a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java @@ -29,7 +29,7 @@ public abstract class SimpleFunction<InputT, OutputT> /** * Returns a {@link TypeDescriptor} capturing what is known statically - * about the input type of this {@code DoFn} instance's most-derived + * about the input type of this {@code OldDoFn} instance's most-derived * class. * * <p>See {@link #getOutputTypeDescriptor} for more discussion. @@ -40,10 +40,10 @@ public abstract class SimpleFunction<InputT, OutputT> /** * Returns a {@link TypeDescriptor} capturing what is known statically - * about the output type of this {@code DoFn} instance's + * about the output type of this {@code OldDoFn} instance's * most-derived class. * - * <p>In the normal case of a concrete {@code DoFn} subclass with + * <p>In the normal case of a concrete {@code OldDoFn} subclass with * no generic type parameters of its own (including anonymous inner * classes), this will be a complete non-generic type, which is good * for choosing a default output {@code Coder<OutputT>} for the output http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java index 5212261..856e32a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java @@ -58,7 +58,7 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>, @Override public PCollection<V> apply(PCollection<? extends KV<?, V>> in) { return - in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() { + in.apply("Values", ParDo.of(new OldDoFn<KV<?, V>, V>() { @Override public void processElement(ProcessContext c) { c.output(c.element().getValue()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 7a97c13..8a61637 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -38,7 +38,7 @@ import java.util.Map; * * <p>When a {@link ParDo} tranform is processing a main input * element in a window {@code w} and a {@link PCollectionView} is read via - * {@link DoFn.ProcessContext#sideInput}, the value of the view for {@code w} is + * {@link OldDoFn.ProcessContext#sideInput}, the value of the view for {@code w} is * returned. * * <p>The SDK supports viewing a {@link PCollection}, per window, as a single value, @@ -118,7 +118,7 @@ import java.util.Map; * * PCollection PageVisits = urlVisits * .apply(ParDo.withSideInputs(urlToPage) - * .of(new DoFn<UrlVisit, PageVisit>() { + * .of(new OldDoFn<UrlVisit, PageVisit>() { * {@literal @}Override * void processElement(ProcessContext context) { * UrlVisit urlVisit = context.element(); @@ -154,11 +154,11 @@ public class View { * * <p>If the input {@link PCollection} is empty, * throws {@link java.util.NoSuchElementException} in the consuming - * {@link DoFn}. + * {@link OldDoFn}. * * <p>If the input {@link PCollection} contains more than one * element, throws {@link IllegalArgumentException} in the - * consuming {@link DoFn}. + * consuming {@link OldDoFn}. */ public static <T> AsSingleton<T> asSingleton() { return new AsSingleton<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java index 25116d8..37d45aa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java @@ -113,7 +113,7 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>, @Override public PCollection<KV<K, V>> apply(PCollection<V> in) { PCollection<KV<K, V>> result = - in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() { + in.apply("AddKeys", ParDo.of(new OldDoFn<V, KV<K, V>>() { @Override public void processElement(ProcessContext c) { c.output(KV.of(fn.apply(c.element()), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java index ef4b269..41b549b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java @@ -92,7 +92,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> * Returns the allowed timestamp skew duration, which is the maximum * duration that timestamps can be shifted backwards from the timestamp of the input element. * - * @see DoFn#getAllowedTimestampSkew() + * @see OldDoFn#getAllowedTimestampSkew() */ public Duration getAllowedTimestampSkew() { return allowedTimestampSkew; @@ -105,7 +105,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> .setTypeDescriptorInternal(input.getTypeDescriptor()); } - private static class AddTimestampsDoFn<T> extends DoFn<T, T> { + private static class AddTimestampsDoFn<T> extends OldDoFn<T, T> { private final SerializableFunction<T, Instant> fn; private final Duration allowedTimestampSkew; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index ee7323b..5dcaec8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -30,7 +30,6 @@ import com.google.common.collect.Sets; import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonValue; - import org.apache.avro.reflect.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 5e4cb52..aa26cbb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -37,7 +37,6 @@ import com.google.common.collect.PeekingIterator; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java index ba4a4a7..1bd9f4a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java @@ -19,9 +19,9 @@ package org.apache.beam.sdk.transforms.join; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder; @@ -57,7 +57,7 @@ import java.util.List; * * PCollection<T> finalResultCollection = * coGbkResultCollection.apply(ParDo.of( - * new DoFn<KV<K, CoGbkResult>, T>() { + * new OldDoFn<KV<K, CoGbkResult>, T>() { * @Override * public void processElement(ProcessContext c) { * KV<K, CoGbkResult> e = c.element(); @@ -167,12 +167,12 @@ public class CoGroupByKey<K> extends } /** - * A DoFn to construct a UnionTable (i.e., a + * A OldDoFn to construct a UnionTable (i.e., a * {@code PCollection<KV<K, RawUnionValue>>} from a * {@code PCollection<KV<K, V>>}. */ private static class ConstructUnionTableFn<K, V> extends - DoFn<KV<K, V>, KV<K, RawUnionValue>> { + OldDoFn<KV<K, V>, KV<K, RawUnionValue>> { private final int index; @@ -188,12 +188,12 @@ public class CoGroupByKey<K> extends } /** - * A DoFn to construct a CoGbkResult from an input grouped union + * A OldDoFn to construct a CoGbkResult from an input grouped union * table. */ private static class ConstructCoGbkResultFn<K> - extends DoFn<KV<K, Iterable<RawUnionValue>>, - KV<K, CoGbkResult>> { + extends OldDoFn<KV<K, Iterable<RawUnionValue>>, + KV<K, CoGbkResult>> { private final CoGbkResultSchema schema; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index bd57339..dc1e74b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.util.ExecutableTrigger; import com.google.common.base.Joiner; + import org.joda.time.Instant; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index 563455b..324ab08 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -25,6 +25,7 @@ import org.joda.time.Instant; import java.util.List; import java.util.Objects; + import javax.annotation.Nullable; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index 6f9c717..45898e0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.coders.DurationCoder; import org.apache.beam.sdk.coders.InstantCoder; import com.fasterxml.jackson.annotation.JsonCreator; - import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.ReadableDuration; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 40f3496..7267d00 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.windowing; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; + import org.joda.time.Instant; import java.util.List;
