Repository: incubator-beam Updated Branches: refs/heads/master a69a0ea90 -> edcb5eff3
Update DoFn javadocs to remove references to OldDoFn and Dataflow Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4e6230cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4e6230cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4e6230cc Branch: refs/heads/master Commit: 4e6230cc734ab3dba081e04d135a285b73008270 Parents: f7384e1 Author: Scott Wegner <[email protected]> Authored: Wed Aug 17 14:38:36 2016 -0700 Committer: Scott Wegner <[email protected]> Committed: Thu Aug 25 09:04:59 2016 -0700 ---------------------------------------------------------------------- .../examples/common/PubsubFileInjector.java | 2 +- .../apache/beam/sdk/util/DoFnRunnerBase.java | 16 +- .../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 2 +- .../apache/beam/sdk/util/ReduceFnRunner.java | 5 +- .../apache/beam/sdk/util/SimpleDoFnRunner.java | 4 +- .../ImmutabilityCheckingBundleFactory.java | 4 +- .../direct/TransformEvaluatorFactory.java | 3 +- .../beam/runners/dataflow/util/DoFnInfo.java | 7 +- .../translation/MultiOutputWordCountTest.java | 2 +- .../spark/translation/SerializationTest.java | 4 +- .../org/apache/beam/sdk/AggregatorValues.java | 4 +- .../apache/beam/sdk/transforms/Aggregator.java | 14 +- .../apache/beam/sdk/transforms/CombineFns.java | 18 +- .../org/apache/beam/sdk/transforms/DoFn.java | 23 +- .../apache/beam/sdk/transforms/DoFnTester.java | 62 ++-- .../apache/beam/sdk/transforms/GroupByKey.java | 7 +- .../apache/beam/sdk/transforms/PTransform.java | 2 +- .../org/apache/beam/sdk/transforms/ParDo.java | 306 +++++++++---------- .../beam/sdk/transforms/SimpleFunction.java | 6 +- .../beam/sdk/transforms/windowing/PaneInfo.java | 10 +- .../beam/sdk/util/BaseExecutionContext.java | 4 +- .../sdk/util/ReifyTimestampAndWindowsDoFn.java | 4 +- .../apache/beam/sdk/util/SerializableUtils.java | 2 +- .../beam/sdk/util/SystemDoFnInternal.java | 7 +- 
.../beam/sdk/util/WindowingInternals.java | 3 +- .../DoFnDelegatingAggregatorTest.java | 2 +- .../beam/sdk/transforms/DoFnTesterTest.java | 3 +- .../apache/beam/sdk/transforms/NoOpOldDoFn.java | 2 +- 28 files changed, 263 insertions(+), 265 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java index e6a1495..4634159 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java @@ -69,7 +69,7 @@ public class PubsubFileInjector { } } - /** A OldDoFn that publishes non-empty lines to Google Cloud PubSub. */ + /** A {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. 
*/ public static class Bound extends OldDoFn<String, Void> { private final String outputTopic; private final String timestampLabelKey; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java index 8a0f6bf..04a0978 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java @@ -58,10 +58,10 @@ import org.joda.time.format.PeriodFormat; */ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { - /** The OldDoFn being run. */ + /** The {@link OldDoFn} being run. */ public final OldDoFn<InputT, OutputT> fn; - /** The context used for running the OldDoFn. */ + /** The context used for running the {@link OldDoFn}. */ public final DoFnContext<InputT, OutputT> context; protected DoFnRunnerBase( @@ -164,8 +164,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu /** * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}. * - * @param <InputT> the type of the OldDoFn's (main) input elements - * @param <OutputT> the type of the OldDoFn's (main) output elements + * @param <InputT> the type of the {@link OldDoFn} (main) input elements + * @param <OutputT> the type of the {@link OldDoFn} (main) output elements */ private static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context { @@ -350,7 +350,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu } /** - * Returns a new {@code OldDoFn.ProcessContext} for the given element. 
+ * Returns a new {@link OldDoFn.ProcessContext} for the given element. */ protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext( WindowedValue<InputT> elem) { @@ -366,11 +366,11 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu } /** - * A concrete implementation of {@code OldDoFn.ProcessContext} used for + * A concrete implementation of {@link OldDoFn.ProcessContext} used for * running a {@link OldDoFn} over a single element. * - * @param <InputT> the type of the OldDoFn's (main) input elements - * @param <OutputT> the type of the OldDoFn's (main) output elements + * @param <InputT> the type of the {@link OldDoFn} (main) input elements + * @param <OutputT> the type of the {@link OldDoFn} (main) output elements */ static class DoFnProcessContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.ProcessContext { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java index f82e5df..f386dfb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; /** - * OldDoFn that merges windows and groups elements in those windows, optionally + * {@link OldDoFn} that merges windows and groups elements in those windows, optionally * combining values. 
* * @param <K> key type http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java index 61e5b21..7c3e4d7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java @@ -33,7 +33,6 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -177,8 +176,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { * Store the previously emitted pane (if any) for each window. * * <ul> - * <li>State: The previous {@link PaneInfo} passed to the user's {@link OldDoFn#processElement}, - * if any. + * <li>State: The previous {@link PaneInfo} passed to the user's {@code DoFn.ProcessElement} + * method, if any. * <li>Style style: DIRECT * <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}. * Cleared when window is merged away. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java index 6c1cf45..1ebe5a8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java @@ -28,8 +28,8 @@ import org.apache.beam.sdk.values.TupleTag; /** * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in. * - * @param <InputT> the type of the OldDoFn's (main) input elements - * @param <OutputT> the type of the OldDoFn's (main) output elements + * @param <InputT> the type of the {@link OldDoFn} (main) input elements + * @param <OutputT> the type of the {@link OldDoFn} (main) output elements */ public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT>{ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java index d5c0f0c..71bd8b4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java @@ -25,7 +25,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; 
import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.IllegalMutationException; import org.apache.beam.sdk.util.MutationDetector; import org.apache.beam.sdk.util.MutationDetectors; @@ -40,7 +40,7 @@ import org.joda.time.Instant; * elements added to the bundle will be encoded by the {@link Coder} of the underlying * {@link PCollection}. * - * <p>This catches errors during the execution of a {@link OldDoFn} caused by modifying an element + * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element * after it is added to an output {@link PCollection}. */ class ImmutabilityCheckingBundleFactory implements BundleFactory { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java index e9fa06b..ecf2da8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java @@ -22,7 +22,6 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; /** @@ -37,7 +36,7 @@ public interface TransformEvaluatorFactory { * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}. 
* * <p>Any work that must be done before input elements are processed (such as calling - * {@link OldDoFn#startBundle(OldDoFn.Context)}) must be done before the + * {@code DoFn.StartBundle}) must be done before the * {@link TransformEvaluator} is made available to the caller. * * <p>May return null if the application cannot produce an evaluator (for example, it is a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 139db9d..949c381 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -24,10 +24,10 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; /** - * Wrapper class holding the necessary information to serialize a OldDoFn. + * Wrapper class holding the necessary information to serialize a {@link OldDoFn}. 
* - * @param <InputT> the type of the (main) input elements of the OldDoFn - * @param <OutputT> the type of the (main) output elements of the OldDoFn + * @param <InputT> the type of the (main) input elements of the {@link OldDoFn} + * @param <OutputT> the type of the (main) output elements of the {@link OldDoFn} */ public class DoFnInfo<InputT, OutputT> implements Serializable { private final OldDoFn<InputT, OutputT> doFn; @@ -66,3 +66,4 @@ public class DoFnInfo<InputT, OutputT> implements Serializable { return inputCoder; } } + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 517596a..acfa3df 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -100,7 +100,7 @@ public class MultiOutputWordCountTest { } /** - * A OldDoFn that tokenizes lines of text into individual words. + * A {@link DoFn} that tokenizes lines of text into individual words. 
*/ static class ExtractWordsFn extends DoFn<String, String> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index 0e9121c..22a40cd 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -142,7 +142,7 @@ public class SerializationTest { } /** - * A OldDoFn that tokenizes lines of text into individual words. + * A {@link DoFn} that tokenizes lines of text into individual words. */ static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> { private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+"); @@ -170,7 +170,7 @@ public class SerializationTest { } /** - * A OldDoFn that converts a Word and Count into a printable string. + * A {@link DoFn} that converts a Word and Count into a printable string. 
*/ private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> { @ProcessElement http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java index 6297085..1fd034a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java @@ -21,11 +21,11 @@ import java.util.Collection; import java.util.Map; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; /** * A collection of values associated with an {@link Aggregator}. Aggregators declared in a - * {@link OldDoFn} are emitted on a per-{@code OldDoFn}-application basis. + * {@link DoFn} are emitted on a per-{@link DoFn}-application basis. * * @param <T> the output type of the aggregator */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java index db4ab33..67d399f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java @@ -25,8 +25,8 @@ import org.apache.beam.sdk.util.ExecutionContext; * to be combined across all bundles. 
* * <p>Aggregators are created by calling - * {@link OldDoFn#createAggregator OldDoFn.createAggregatorForDoFn}, - * typically from the {@link OldDoFn} constructor. Elements can be added to the + * {@link DoFn#createAggregator DoFn.createAggregatorForDoFn}, + * typically from the {@link DoFn} constructor. Elements can be added to the * {@code Aggregator} by calling {@link Aggregator#addValue}. * * <p>Aggregators are visible in the monitoring UI, when the pipeline is run @@ -37,14 +37,14 @@ import org.apache.beam.sdk.util.ExecutionContext; * * <p>Example: * <pre> {@code - * class MyDoFn extends OldDoFn<String, String> { + * class MyDoFn extends DoFn<String, String> { * private Aggregator<Integer, Integer> myAggregator; * * public MyDoFn() { * myAggregator = createAggregatorForDoFn("myAggregator", new Sum.SumIntegerFn()); * } * - * @Override + * @ProcessElement * public void processElement(ProcessContext c) { * myAggregator.addValue(1); * } @@ -79,8 +79,8 @@ public interface Aggregator<InputT, OutputT> { /** * Create an aggregator with the given {@code name} and {@link CombineFn}. * - * <p>This method is called to create an aggregator for a {@link OldDoFn}. It receives the - * class of the {@link OldDoFn} being executed and the context of the step it is being + * <p>This method is called to create an aggregator for a {@link DoFn}. It receives the + * class of the {@link DoFn} being executed and the context of the step it is being * executed in. */ <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( @@ -90,7 +90,7 @@ public interface Aggregator<InputT, OutputT> { // TODO: Consider the following additional API conveniences: // - In addition to createAggregatorForDoFn(), consider adding getAggregator() to - // avoid the need to store the aggregator locally in a OldDoFn, i.e., create + // avoid the need to store the aggregator locally in a DoFn, i.e., create // if not already present. 
// - Add a shortcut for the most common aggregator: // c.createAggregatorForDoFn("name", new Sum.SumIntegerFn()). http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index 9fa8ded..6f05993 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -67,7 +67,7 @@ public class CombineFns { * <p>The same {@link TupleTag} cannot be used in a composition multiple times. * * <p>Example: - * <pre>{ @code + * <pre><code> * PCollection<KV<K, Integer>> latencies = ...; * * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>(); @@ -75,7 +75,7 @@ public class CombineFns { * * SimpleFunction<Integer, Integer> identityFn = * new SimpleFunction<Integer, Integer>() { - * @Override + * {@literal @}Override * public Integer apply(Integer input) { * return input; * }}; @@ -87,8 +87,8 @@ public class CombineFns { * * PCollection<T> finalResultCollection = maxAndMean * .apply(ParDo.of( - * new OldDoFn<KV<K, CoCombineResult>, T>() { - * @Override + * new DoFn<KV<K, CoCombineResult>, T>() { + * {@literal @}ProcessElement * public void processElement(ProcessContext c) throws Exception { * KV<K, CoCombineResult> e = c.element(); * Integer maxLatency = e.getValue().get(maxLatencyTag); @@ -97,7 +97,7 @@ public class CombineFns { * c.output(...some T...); * } * })); - * } </pre> + * </code></pre> */ public static ComposeKeyedCombineFnBuilder composeKeyed() { return new ComposeKeyedCombineFnBuilder(); @@ -110,7 +110,7 @@ public class CombineFns { * <p>The same {@link TupleTag} cannot be used in a composition multiple times. 
* * <p>Example: - * <pre>{ @code + * <pre><code> * PCollection<Integer> globalLatencies = ...; * * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>(); @@ -130,8 +130,8 @@ public class CombineFns { * * PCollection<T> finalResultCollection = maxAndMean * .apply(ParDo.of( - * new OldDoFn<CoCombineResult, T>() { - * @Override + * new DoFn<CoCombineResult, T>() { + * {@literal @}ProcessElement * public void processElement(ProcessContext c) throws Exception { * CoCombineResult e = c.element(); * Integer maxLatency = e.get(maxLatencyTag); @@ -140,7 +140,7 @@ public class CombineFns { * c.output(...some T...); * } * })); - * } </pre> + * </code></pre> */ public static ComposeCombineFnBuilder compose() { return new ComposeCombineFnBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 9f89826..59c8323 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -63,8 +63,6 @@ import org.joda.time.Instant; * that satisfies the requirements described there. See the {@link ProcessElement} * for details. * - * <p>This functionality is experimental and likely to change. - * * <p>Example usage: * * <pre> {@code @@ -123,7 +121,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * * <p>If invoked from {@link ProcessElement}), the timestamp * must not be older than the input element's timestamp minus - * {@link OldDoFn#getAllowedTimestampSkew}. The output element will + * {@link DoFn#getAllowedTimestampSkew}. The output element will * be in the same windows as the input element. 
* * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, @@ -172,7 +170,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * * <p>If invoked from {@link ProcessElement}), the timestamp * must not be older than the input element's timestamp minus - * {@link OldDoFn#getAllowedTimestampSkew}. The output element will + * {@link DoFn#getAllowedTimestampSkew}. The output element will * be in the same windows as the input element. * * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, @@ -190,7 +188,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD } /** - * Information accessible when running {@link OldDoFn#processElement}. + * Information accessible when running a {@link DoFn.ProcessElement} method. */ public abstract class ProcessContext extends Context { @@ -359,9 +357,14 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * Annotation for the method to use to prepare an instance for processing a batch of elements. * The method annotated with this must satisfy the following constraints: * <ul> - * <li>It must have at least one argument. + * <li>It must have exactly one argument. * <li>Its first (and only) argument must be a {@link DoFn.Context}. * </ul> + * + * <p>A simple method declaration would look like: + * <code> + * public void setup(DoFn.Context c) { .. } + * </code> */ @Documented @Retention(RetentionPolicy.RUNTIME) @@ -414,13 +417,13 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD /** * Returns an {@link Aggregator} with aggregation logic specified by the * {@link CombineFn} argument. The name provided must be unique across - * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created + * {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created * during pipeline construction. 
* * @param name the name of the aggregator * @param combiner the {@link CombineFn} to use in the aggregator * @return an aggregator for the provided name and combiner in the scope of - * this OldDoFn + * this {@link DoFn} * @throws NullPointerException if the name or combiner is null * @throws IllegalArgumentException if the given name collides with another * aggregator in this scope @@ -447,13 +450,13 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD /** * Returns an {@link Aggregator} with the aggregation logic specified by the * {@link SerializableFunction} argument. The name provided must be unique - * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be + * across {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be * created during pipeline construction. * * @param name the name of the aggregator * @param combiner the {@link SerializableFunction} to use in the aggregator * @return an aggregator for the provided name and combiner in the scope of - * this OldDoFn + * this {@link DoFn} * @throws NullPointerException if the name or combiner is null * @throws IllegalArgumentException if the given name collides with another * aggregator in this scope http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 82c1293..6801768 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -50,12 +50,12 @@ import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; /** - * A harness for unit-testing a {@link OldDoFn}. 
+ * A harness for unit-testing a {@link DoFn}. * * <p>For example: * * <pre> {@code - * OldDoFn<InputT, OutputT> fn = ...; + * DoFn<InputT, OutputT> fn = ...; * * DoFnTester<InputT, OutputT> fnTester = DoFnTester.of(fn); * @@ -72,17 +72,17 @@ import org.joda.time.Instant; * Assert.assertThat(fnTester.processBundle(i1, i2, ...), Matchers.hasItems(...)); * } </pre> * - * @param <InputT> the type of the {@code OldDoFn}'s (main) input elements - * @param <OutputT> the type of the {@code OldDoFn}'s (main) output elements + * @param <InputT> the type of the {@link DoFn}'s (main) input elements + * @param <OutputT> the type of the {@link DoFn}'s (main) output elements */ public class DoFnTester<InputT, OutputT> { /** * Returns a {@code DoFnTester} supporting unit-testing of the given - * {@link OldDoFn}. + * {@link DoFn}. */ @SuppressWarnings("unchecked") - public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { - return new DoFnTester<InputT, OutputT>(fn); + public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + return new DoFnTester<>(DoFnAdapters.toOldDoFn(fn)); } /** @@ -90,19 +90,19 @@ public class DoFnTester<InputT, OutputT> { * {@link OldDoFn}. */ @SuppressWarnings("unchecked") - public static <InputT, OutputT> DoFnTester<InputT, OutputT> - of(DoFn<InputT, OutputT> fn) { - return new DoFnTester<InputT, OutputT>(DoFnAdapters.toOldDoFn(fn)); + public static <InputT, OutputT> DoFnTester<InputT, OutputT> + of(OldDoFn<InputT, OutputT> fn) { + return new DoFnTester<>(fn); } /** * Registers the tuple of values of the side input {@link PCollectionView}s to - * pass to the {@link OldDoFn} under test. + * pass to the {@link DoFn} under test. * * <p>Resets the state of this {@link DoFnTester}. * * <p>If this isn't called, {@code DoFnTester} assumes the - * {@link OldDoFn} takes no side inputs. + * {@link DoFn} takes no side inputs. 
*/ public void setSideInputs(Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) { this.sideInputs = sideInputs; @@ -110,7 +110,7 @@ public class DoFnTester<InputT, OutputT> { } /** - * Registers the values of a side input {@link PCollectionView} to pass to the {@link OldDoFn} + * Registers the values of a side input {@link PCollectionView} to pass to the {@link DoFn} * under test. * * <p>The provided value is the final value of the side input in the specified window, not @@ -129,7 +129,7 @@ public class DoFnTester<InputT, OutputT> { } /** - * Whether or not a {@link DoFnTester} should clone the {@link OldDoFn} under test. + * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test. */ public enum CloningBehavior { CLONE, @@ -137,14 +137,14 @@ public class DoFnTester<InputT, OutputT> { } /** - * Instruct this {@link DoFnTester} whether or not to clone the {@link OldDoFn} under test. + * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test. */ public void setCloningBehavior(CloningBehavior newValue) { this.cloningBehavior = newValue; } /** - * Indicates whether this {@link DoFnTester} will clone the {@link OldDoFn} under test. + * Indicates whether this {@link DoFnTester} will clone the {@link DoFn} under test. */ public CloningBehavior getCloningBehavior() { return cloningBehavior; @@ -166,7 +166,7 @@ public class DoFnTester<InputT, OutputT> { } /** - * A convenience method for testing {@link OldDoFn DoFns} with bundles of elements. + * A convenience method for testing {@link DoFn DoFns} with bundles of elements. * Logic proceeds as follows: * * <ol> @@ -182,9 +182,9 @@ public class DoFnTester<InputT, OutputT> { } /** - * Calls {@link OldDoFn#startBundle} on the {@code OldDoFn} under test. + * Calls the {@link DoFn.StartBundle} method on the {@link DoFn} under test. * - * <p>If needed, first creates a fresh instance of the OldDoFn under test. 
+ * <p>If needed, first creates a fresh instance of the {@link DoFn} under test. */ public void startBundle() throws Exception { resetState(); @@ -210,14 +210,14 @@ public class DoFnTester<InputT, OutputT> { } /** - * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a - * context where {@link OldDoFn.ProcessContext#element} returns the + * Calls the {@link DoFn.ProcessElement} method on the {@link DoFn} under test, in a + * context where {@link DoFn.ProcessContext#element} returns the * given element. * * <p>Will call {@link #startBundle} automatically, if it hasn't * already been called. * - * @throws IllegalStateException if the {@code OldDoFn} under test has already + * @throws IllegalStateException if the {@code DoFn} under test has already * been finished */ public void processElement(InputT element) throws Exception { @@ -235,12 +235,12 @@ public class DoFnTester<InputT, OutputT> { } /** - * Calls {@link OldDoFn#finishBundle} of the {@code OldDoFn} under test. + * Calls the {@link DoFn.FinishBundle} method of the {@link DoFn} under test. * * <p>Will call {@link #startBundle} automatically, if it hasn't * already been called. * - * @throws IllegalStateException if the {@code OldDoFn} under test has already + * @throws IllegalStateException if the {@link DoFn} under test has already * been finished */ public void finishBundle() throws Exception { @@ -674,7 +674,7 @@ public class DoFnTester<InputT, OutputT> { ///////////////////////////////////////////////////////////////////////////// - /** The possible states of processing a OldDoFn. */ + /** The possible states of processing a {@link DoFn}. */ enum State { UNSTARTED, STARTED, @@ -683,23 +683,23 @@ public class DoFnTester<InputT, OutputT> { private final PipelineOptions options = PipelineOptionsFactory.create(); - /** The original OldDoFn under test. */ + /** The original {@link OldDoFn} under test. 
*/ private final OldDoFn<InputT, OutputT> origFn; /** - * Whether to clone the original {@link OldDoFn} or just use it as-is. + * Whether to clone the original {@link DoFn} or just use it as-is. * - * <p></p>Worker-side {@link OldDoFn DoFns} may not be serializable, and are not required to be. + * <p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be. */ private CloningBehavior cloningBehavior = CloningBehavior.CLONE; - /** The side input values to provide to the OldDoFn under test. */ + /** The side input values to provide to the {@link DoFn} under test. */ private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs = new HashMap<>(); private Map<String, Object> accumulators; - /** The output tags used by the OldDoFn under test. */ + /** The output tags used by the {@link DoFn} under test. */ private TupleTag<OutputT> mainOutputTag = new TupleTag<>(); /** The original OldDoFn under test, if started. */ @@ -708,7 +708,7 @@ public class DoFnTester<InputT, OutputT> { /** The ListOutputManager to examine the outputs. */ private Map<TupleTag<?>, List<WindowedValue<?>>> outputs; - /** The state of processing of the OldDoFn under test. */ + /** The state of processing of the {@link DoFn} under test. 
*/ private State state; private DoFnTester(OldDoFn<InputT, OutputT> origFn) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index ed7f411..3a3da65 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -63,18 +63,19 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * {@code Coder} of the values of the input. * * <p>Example of use: - * <pre> {@code + * <pre><code> * PCollection<KV<String, Doc>> urlDocPairs = ...; * PCollection<KV<String, Iterable<Doc>>> urlToDocs = * urlDocPairs.apply(GroupByKey.<String, Doc>create()); * PCollection<R> results = - * urlToDocs.apply(ParDo.of(new OldDoFn<KV<String, Iterable<Doc>>, R>() { + * urlToDocs.apply(ParDo.of(new DoFn<KV<String, Iterable<Doc>>, R>() { + * {@literal @}ProcessElement * public void processElement(ProcessContext c) { * String url = c.element().getKey(); * Iterable<Doc> docsWithThatUrl = c.element().getValue(); * ... process all docs having that url ... 
* }})); - * } </pre> + * </code></pre> * * <p>{@code GroupByKey} is a key primitive in data-parallel * processing, since it is the main way to efficiently bring http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java index 19abef9..4a58141 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java @@ -146,7 +146,7 @@ import org.apache.beam.sdk.values.TypedPValue; * implementing {@code Serializable}. * * <p>{@code PTransform} is marked {@code Serializable} solely - * because it is common for an anonymous {@code OldDoFn}, + * because it is common for an anonymous {@link DoFn}, * instance to be created within an * {@code apply()} method of a composite {@code PTransform}. 
* http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 5efbe9f..f9cb557 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.PipelineRunner; @@ -50,13 +49,12 @@ import org.apache.beam.sdk.values.TypedPValue; * <p>The {@link ParDo} processing style is similar to what happens inside * the "Mapper" or "Reducer" class of a MapReduce-style algorithm. * - * <h2>{@link OldDoFn DoFns}</h2> + * <h2>{@link DoFn DoFns}</h2> * * <p>The function to use to process each element is specified by a - * {@link OldDoFn OldDoFn<InputT, OutputT>}, primarily via its - * {@link OldDoFn#processElement processElement} method. The {@link OldDoFn} may also - * override the default implementations of {@link OldDoFn#startBundle startBundle} - * and {@link OldDoFn#finishBundle finishBundle}. + * {@link DoFn DoFn<InputT, OutputT>}, primarily via its + * {@link DoFn.ProcessElement ProcessElement} method. The {@link DoFn} may also + * provide {@link DoFn.StartBundle StartBundle} and {@link DoFn.FinishBundle FinishBundle} methods.
* * <p>Conceptually, when a {@link ParDo} transform is executed, the * elements of the input {@link PCollection} are first divided up @@ -66,39 +64,38 @@ import org.apache.beam.sdk.values.TypedPValue; * * <ol> * <li>If required, a fresh instance of the argument {@link DoFn} is created - * on a worker, and {@link DoFn#setup()} is called on this instance. This may be through - * deserialization or other means. A {@link PipelineRunner} may reuse {@link DoFn} instances for - * multiple bundles. A {@link DoFn} that has terminated abnormally (by throwing an + * on a worker, and the {@link DoFn.Setup} method is called on this instance. This may be + * through deserialization or other means. A {@link PipelineRunner} may reuse {@link DoFn} + * instances for multiple bundles. A {@link DoFn} that has terminated abnormally (by throwing an * {@link Exception}) will never be reused.</li> - * <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#startBundle} method is called to - * initialize it. If this method is not overridden, the call may be optimized - * away.</li> - * <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#processElement} method + * <li>The {@link DoFn DoFn's} {@link DoFn.StartBundle} method, if provided, is called to + * initialize it.</li> + * <li>The {@link DoFn DoFn's} {@link DoFn.ProcessElement} method * is called on each of the input elements in the bundle.</li> - * <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#finishBundle} method is called - * to complete its work. After {@link OldDoFn#finishBundle} is called, the - * framework will not again invoke {@link OldDoFn#processElement} or - * {@link OldDoFn#finishBundle} - * until a new call to {@link OldDoFn#startBundle} has occurred. 
- * If this method is not overridden, this call may be optimized away.</li> - * <li>If any of {@link DoFn#setup}, {@link DoFn#startBundle}, {@link DoFn#processElement} or - * {@link DoFn#finishBundle} throw an exception, {@link DoFn#teardown} will be called on the - * {@link DoFn} instance.</li> - * <li>If a runner will no longer use a {@link DoFn}, {@link DoFn#teardown()} will be called on - * the discarded instance.</li> + * <li>The {@link DoFn DoFn's} {@link DoFn.FinishBundle} method, if provided, is called + * to complete its work. After {@link DoFn.FinishBundle} is called, the + * framework will not again invoke {@link DoFn.ProcessElement} or + * {@link DoFn.FinishBundle} + * until a new call to {@link DoFn.StartBundle} has occurred.</li> + * <li>If any of {@link DoFn.Setup}, {@link DoFn.StartBundle}, {@link DoFn.ProcessElement} or + * {@link DoFn.FinishBundle} methods throw an exception, the {@link DoFn.Teardown} method, if + * provided, will be called on the {@link DoFn} instance.</li> + * <li>If a runner will no longer use a {@link DoFn}, the {@link DoFn.Teardown} method, if + * provided, will be called on the discarded instance.</li> * </ol> * - * Each of the calls to any of the {@link OldDoFn OldDoFn's} processing + * Each of the calls to any of the {@link DoFn DoFn's} processing * methods can produce zero or more output elements. All of the - * of output elements from all of the {@link OldDoFn} instances + * output elements from all of the {@link DoFn} instances * are included in the output {@link PCollection}.
* * <p>For example: * - * <pre> {@code + * <pre><code> * PCollection<String> lines = ...; * PCollection<String> words = - * lines.apply(ParDo.of(new OldDoFn<String, String>() { + * lines.apply(ParDo.of(new DoFn<String, String>() { + * {@literal @}ProcessElement * public void processElement(ProcessContext c) { * String line = c.element(); * for (String word : line.split("[^a-zA-Z']+")) { @@ -106,13 +103,14 @@ import org.apache.beam.sdk.values.TypedPValue; * } * }})); * PCollection<Integer> wordLengths = - * words.apply(ParDo.of(new OldDoFn<String, Integer>() { + * words.apply(ParDo.of(new DoFn<String, Integer>() { + * {@literal @}ProcessElement * public void processElement(ProcessContext c) { * String word = c.element(); * Integer length = word.length(); * c.output(length); * }})); - * } </pre> + * </code></pre> * * <p>Each output element has the same timestamp and is in the same windows * as its corresponding input element, and the output {@code PCollection} @@ -131,9 +129,9 @@ import org.apache.beam.sdk.values.TypedPValue; * * <pre> {@code * PCollection<String> words = - * lines.apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() { ... })); + * lines.apply("ExtractWords", ParDo.of(new DoFn<String, String>() { ... })); * PCollection<Integer> wordLengths = - * words.apply("ComputeWordLengths", ParDo.of(new OldDoFn<String, Integer>() { ... })); + * words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { ... })); * } </pre> * * <h2>Side Inputs</h2> @@ -145,17 +143,18 @@ import org.apache.beam.sdk.values.TypedPValue; * {@link PCollection PCollections} computed by earlier pipeline operations, * passed in to the {@link ParDo} transform using * {@link #withSideInputs}, and their contents accessible to each of - * the {@link OldDoFn} operations via {@link OldDoFn.ProcessContext#sideInput sideInput}. + * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}. 
* For example: * - * <pre> {@code + * <pre><code> * PCollection<String> words = ...; * PCollection<Integer> maxWordLengthCutOff = ...; // Singleton PCollection * final PCollectionView<Integer> maxWordLengthCutOffView = * maxWordLengthCutOff.apply(View.<Integer>asSingleton()); * PCollection<String> wordsBelowCutOff = * words.apply(ParDo.withSideInputs(maxWordLengthCutOffView) - * .of(new OldDoFn<String, String>() { + * .of(new DoFn<String, String>() { + * {@literal @}ProcessElement * public void processElement(ProcessContext c) { * String word = c.element(); * int lengthCutOff = c.sideInput(maxWordLengthCutOffView); @@ -163,7 +162,7 @@ import org.apache.beam.sdk.values.TypedPValue; * c.output(word); * } * }})); - * } </pre> + * </code></pre> * * <h2>Side Outputs</h2> * @@ -174,13 +173,13 @@ import org.apache.beam.sdk.values.TypedPValue; * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags} * to be used for the output {@link PCollectionTuple} are specified by * invoking {@link #withOutputTags}. Unconsumed side outputs do not - * necessarily need to be explicitly specified, even if the {@link OldDoFn} - * generates them. Within the {@link OldDoFn}, an element is added to the + * necessarily need to be explicitly specified, even if the {@link DoFn} + * generates them. Within the {@link DoFn}, an element is added to the * main output {@link PCollection} as normal, using - * {@link OldDoFn.Context#output}, while an element is added to a side output - * {@link PCollection} using {@link OldDoFn.Context#sideOutput}. For example: + * {@link DoFn.Context#output}, while an element is added to a side output + * {@link PCollection} using {@link DoFn.Context#sideOutput}. For example: * - * <pre> {@code + * <pre><code> * PCollection<String> words = ...; * // Select words whose length is below a cut off, * // plus the lengths of words that are above the cut off. 
@@ -201,10 +200,11 @@ import org.apache.beam.sdk.values.TypedPValue; * .withOutputTags(wordsBelowCutOffTag, * TupleTagList.of(wordLengthsAboveCutOffTag) * .and(markedWordsTag)) - * .of(new OldDoFn<String, String>() { + * .of(new DoFn<String, String>() { * // Create a tag for the unconsumed side output. * final TupleTag<String> specialWordsTag = * new TupleTag<String>(){}; + * {@literal @}ProcessElement * public void processElement(ProcessContext c) { * String word = c.element(); * if (word.length() <= wordLengthCutOff) { @@ -230,14 +230,13 @@ import org.apache.beam.sdk.values.TypedPValue; * results.get(wordLengthsAboveCutOffTag); * PCollection<String> markedWords = * results.get(markedWordsTag); - * } </pre> + * </code></pre> * * <h2>Properties May Be Specified In Any Order</h2> * * <p>Several properties can be specified for a {@link ParDo} - * {@link PTransform}, including name, side inputs, side output tags, - * and {@link OldDoFn} to invoke. Only the {@link OldDoFn} is required; the - * name is encouraged but not required, and side inputs and side + * {@link PTransform}, including side inputs, side output tags, + * and {@link DoFn} to invoke. Only the {@link DoFn} is required; side inputs and side * output tags are only specified when they're needed. These * properties can be specified in any order, as long as they're * specified before the {@link ParDo} {@link PTransform} is applied. @@ -250,23 +249,23 @@ import org.apache.beam.sdk.values.TypedPValue; * {@link ParDo.Bound} nested classes, each of which offer * property setter instance methods to enable setting additional * properties. {@link ParDo.Bound} is used for {@link ParDo} - * transforms whose {@link OldDoFn} is specified and whose input and + * transforms whose {@link DoFn} is specified and whose input and * output static types have been bound. {@link ParDo.Unbound ParDo.Unbound} is used * for {@link ParDo} transforms that have not yet had their - * {@link OldDoFn} specified. 
Only {@link ParDo.Bound} instances can be + * {@link DoFn} specified. Only {@link ParDo.Bound} instances can be * applied. * * <p>Another benefit of this approach is that it reduces the number * of type parameters that need to be specified manually. In * particular, the input and output types of the {@link ParDo} * {@link PTransform} are inferred automatically from the type - * parameters of the {@link OldDoFn} argument passed to {@link ParDo#of}. + * parameters of the {@link DoFn} argument passed to {@link ParDo#of}. * * <h2>Output Coders</h2> * * <p>By default, the {@link Coder Coder<OutputT>} for the * elements of the main output {@link PCollection PCollection<OutputT>} is - * inferred from the concrete type of the {@link OldDoFn OldDoFn<InputT, OutputT>}. + * inferred from the concrete type of the {@link DoFn DoFn<InputT, OutputT>}. * * <p>By default, the {@link Coder Coder<SideOutputT>} for the elements of * a side output {@link PCollection PCollection<SideOutputT>} is inferred @@ -286,74 +285,74 @@ import org.apache.beam.sdk.values.TypedPValue; * This style of {@code TupleTag} instantiation is used in the example of * multiple side outputs, above. * - * <h2>Serializability of {@link OldDoFn DoFns}</h2> + * <h2>Serializability of {@link DoFn DoFns}</h2> * - * <p>A {@link OldDoFn} passed to a {@link ParDo} transform must be - * {@link Serializable}. This allows the {@link OldDoFn} instance + * <p>A {@link DoFn} passed to a {@link ParDo} transform must be + * {@link Serializable}. This allows the {@link DoFn} instance * created in this "main program" to be sent (in serialized form) to * remote worker machines and reconstituted for bundles of elements - * of the input {@link PCollection} being processed. A {@link OldDoFn} + * of the input {@link PCollection} being processed. 
A {@link DoFn} * can have instance variable state, and non-transient instance * variable state will be serialized in the main program and then * deserialized on remote worker machines for some number of bundles * of elements to process. * - * <p>{@link OldDoFn DoFns} expressed as anonymous inner classes can be + * <p>{@link DoFn DoFns} expressed as anonymous inner classes can be * convenient, but due to a quirk in Java's rules for serializability, * non-static inner or nested classes (including anonymous inner * classes) automatically capture their enclosing class's instance in * their serialized state. This can lead to including much more than - * intended in the serialized state of a {@link OldDoFn}, or even things + * intended in the serialized state of a {@link DoFn}, or even things * that aren't {@link Serializable}. * * <p>There are two ways to avoid unintended serialized state in a - * {@link OldDoFn}: + * {@link DoFn}: * * <ul> * - * <li>Define the {@link OldDoFn} as a named, static class. + * <li>Define the {@link DoFn} as a named, static class. * - * <li>Define the {@link OldDoFn} as an anonymous inner class inside of + * <li>Define the {@link DoFn} as an anonymous inner class inside of * a static method. * * </ul> * * <p>Both of these approaches ensure that there is no implicit enclosing - * instance serialized along with the {@link OldDoFn} instance. + * instance serialized along with the {@link DoFn} instance. * * <p>Prior to Java 8, any local variables of the enclosing * method referenced from within an anonymous inner class need to be - * marked as {@code final}. If defining the {@link OldDoFn} as a named + * marked as {@code final}. If defining the {@link DoFn} as a named * static class, such variables would be passed as explicit * constructor arguments and stored in explicit instance variables. 
* * <p>There are three main ways to initialize the state of a - * {@link OldDoFn} instance processing a bundle: + * {@link DoFn} instance processing a bundle: * * <ul> * * <li>Define instance variable state (including implicit instance * variables holding final variables captured by an anonymous inner - * class), initialized by the {@link OldDoFn}'s constructor (which is + * class), initialized by the {@link DoFn}'s constructor (which is * implicit for an anonymous inner class). This state will be - * automatically serialized and then deserialized in the {@code OldDoFn} + * automatically serialized and then deserialized in the {@link DoFn} * instances created for bundles. This method is good for state - * known when the original {@code OldDoFn} is created in the main + * known when the original {@link DoFn} is created in the main * program, if it's not overly large. This is not suitable for any - * state which must only be used for a single bundle, as {@link OldDoFn OldDoFn's} + * state which must only be used for a single bundle, as {@link DoFn DoFn's} * may be used to process multiple bundles. * * <li>Compute the state as a singleton {@link PCollection} and pass it - * in as a side input to the {@link OldDoFn}. This is good if the state + * in as a side input to the {@link DoFn}. This is good if the state * needs to be computed by the pipeline, or if the state is very large * and so is best read from file(s) rather than sent as part of the - * {@code OldDoFn}'s serialized state. + * {@link DoFn DoFn's} serialized state. * - * <li>Initialize the state in each {@link OldDoFn} instance, in - * {@link OldDoFn#startBundle}. This is good if the initialization + * <li>Initialize the state in each {@link DoFn} instance, in a + * {@link DoFn.StartBundle} method. 
This is good if the initialization * doesn't depend on any information known only by the main program or * computed by earlier pipeline operations, but is the same for all - * instances of this {@link OldDoFn} for all program executions, say + * instances of this {@link DoFn} for all program executions, say * setting up empty caches or initializing constant data. * * </ul> @@ -363,16 +362,16 @@ import org.apache.beam.sdk.values.TypedPValue; * <p>{@link ParDo} operations are intended to be able to run in * parallel across multiple worker machines. This precludes easy * sharing and updating mutable state across those machines. There is - * no support in the Google Cloud Dataflow system for communicating + * no support in the Beam model for communicating * and synchronizing updates to shared state across worker machines, * so programs should not access any mutable static variable state in - * their {@link OldDoFn}, without understanding that the Java processes + * their {@link DoFn}, without understanding that the Java processes * for the main program and workers will each have its own independent * copy of such state, and there won't be any automatic copying of * that state across Java processes. All information should be - * communicated to {@link OldDoFn} instances via main and side inputs and + * communicated to {@link DoFn} instances via main and side inputs and * serialized state, and all output should be communicated from a - * {@link OldDoFn} instance via main and side outputs, in the absence of + * {@link DoFn} instance via main and side outputs, in the absence of * external communication mechanisms written by user code. * * <h2>Fault Tolerance</h2> @@ -380,29 +379,28 @@ import org.apache.beam.sdk.values.TypedPValue; * <p>In a distributed system, things can fail: machines can crash, * machines can be unable to communicate across the network, etc. 
* While individual failures are rare, the larger the job, the greater - * the chance that something, somewhere, will fail. The Google Cloud - * Dataflow service strives to mask such failures automatically, - * principally by retrying failed {@link OldDoFn} bundle. This means - * that a {@code OldDoFn} instance might process a bundle partially, then - * crash for some reason, then be rerun (often on a different worker - * machine) on that same bundle and on the same elements as before. - * Sometimes two or more {@link OldDoFn} instances will be running on the + * the chance that something, somewhere, will fail. Beam runners may strive + * to mask such failures by retrying failed {@link DoFn} bundles. This means + * that a {@link DoFn} instance might process a bundle partially, then + * crash for some reason, then be rerun (often in a new JVM) on that + * same bundle and on the same elements as before. + * Sometimes two or more {@link DoFn} instances will be running on the * same bundle simultaneously, with the system taking the results of * the first instance to complete successfully. Consequently, the - * code in a {@link OldDoFn} needs to be written such that these + * code in a {@link DoFn} needs to be written such that these * duplicate (sequential or concurrent) executions do not cause - * problems. If the outputs of a {@link OldDoFn} are a pure function of + * problems. If the outputs of a {@link DoFn} are a pure function of * its inputs, then this requirement is satisfied. However, if a - * {@link OldDoFn OldDoFn's} execution has external side-effects, such as performing - * updates to external HTTP services, then the {@link OldDoFn OldDoFn's} code + * {@link DoFn DoFn's} execution has external side-effects, such as performing + * updates to external HTTP services, then the {@link DoFn DoFn's} code * needs to take care to ensure that those updates are idempotent and * that concurrent updates are acceptable.
This property can be * difficult to achieve, so it is advisable to strive to keep - * {@link OldDoFn DoFns} as pure functions as much as possible. + * {@link DoFn DoFns} as pure functions as much as possible. * * <h2>Optimization</h2> * - * <p>The Google Cloud Dataflow service automatically optimizes a + * <p>Beam runners may choose to apply optimizations to a * pipeline before it is executed. A key optimization, <i>fusion</i>, * relates to {@link ParDo} operations. If one {@link ParDo} operation produces a * {@link PCollection} that is then consumed as the main input of another @@ -419,18 +417,16 @@ import org.apache.beam.sdk.values.TypedPValue; * written to disk, saving all the I/O and space expense of * constructing it. * - * <p>The Google Cloud Dataflow service applies fusion as much as - * possible, greatly reducing the cost of executing pipelines. As a - * result, it is essentially "free" to write {@link ParDo} operations in a + * <p>When Beam runners apply fusion optimization, it is essentially "free" + * to write {@link ParDo} operations in a * very modular, composable style, each {@link ParDo} operation doing one * clear task, and stringing together sequences of {@link ParDo} operations to * get the desired overall effect. Such programs can be easier to * understand, easier to unit-test, easier to extend and evolve, and * easier to reuse in new programs. The predefined library of - * PTransforms that come with Google Cloud Dataflow makes heavy use of - * this modular, composable style, trusting to the Google Cloud - * Dataflow service's optimizer to "flatten out" all the compositions - * into highly optimized stages. + * PTransforms that come with Beam makes heavy use of + * this modular, composable style, trusting to the runner to + * "flatten out" all the compositions into highly optimized stages. 
* * @see <a href="https://cloud.google.com/dataflow/model/par-do">the web * documentation for ParDo</a> @@ -443,15 +439,15 @@ public class ParDo { * * <p>Side inputs are {@link PCollectionView PCollectionViews}, whose contents are * computed during pipeline execution and then made accessible to - * {@link OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}. Each - * invocation of the {@link OldDoFn} receives the same values for these + * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. Each + * invocation of the {@link DoFn} receives the same values for these * side inputs. * * <p>See the discussion of Side Inputs above for more explanation. * * <p>The resulting {@link PTransform} is incomplete, and its * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to + * {@link ParDo.Unbound#of} to specify the {@link DoFn} to * invoke, which will also bind the input/output types of this * {@link PTransform}. */ @@ -464,13 +460,13 @@ public class ParDo { * * <p>Side inputs are {@link PCollectionView}s, whose contents are * computed during pipeline execution and then made accessible to - * {@code OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}. + * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. * * <p>See the discussion of Side Inputs above for more explanation. * * <p>The resulting {@link PTransform} is incomplete, and its * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to + * {@link ParDo.Unbound#of} to specify the {@link DoFn} to * invoke, which will also bind the input/output types of this * {@link PTransform}. */ @@ -486,11 +482,11 @@ public class ParDo { * * <p>{@link TupleTag TupleTags} are used to name (with its static element * type {@code T}) each main and side output {@code PCollection<T>}. 
- * This {@link PTransform PTransform's} {@link OldDoFn} emits elements to the main + * This {@link PTransform PTransform's} {@link DoFn} emits elements to the main * output {@link PCollection} as normal, using - * {@link OldDoFn.Context#output}. The {@link OldDoFn} emits elements to + * {@link DoFn.Context#output}. The {@link DoFn} emits elements to * a side output {@code PCollection} using - * {@link OldDoFn.Context#sideOutput}, passing that side output's tag + * {@link DoFn.Context#sideOutput}, passing that side output's tag * as an argument. The result of invoking this {@link PTransform} * will be a {@link PCollectionTuple}, and any of the the main and * side output {@code PCollection}s can be retrieved from it via @@ -501,7 +497,7 @@ public class ParDo { * * <p>The resulting {@link PTransform} is incomplete, and its input * type is not yet bound. Use {@link ParDo.UnboundMulti#of} - * to specify the {@link OldDoFn} to invoke, which will also bind the + * to specify the {@link DoFn} to invoke, which will also bind the * input type of this {@link PTransform}. */ public static <OutputT> UnboundMulti<OutputT> withOutputTags( @@ -512,6 +508,20 @@ public class ParDo { /** * Creates a {@link ParDo} {@link PTransform} that will invoke the + * given {@link DoFn} function. + * + * <p>The resulting {@link PTransform PTransform's} types have been bound, with the + * input being a {@code PCollection<InputT>} and the output a + * {@code PCollection<OutputT>}, inferred from the types of the argument + * {@code DoFn<InputT, OutputT>}. It is ready to be applied, or further + * properties can be set on it first. + */ + public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + return of(adapt(fn), fn.getClass()); + } + + /** + * Creates a {@link ParDo} {@link PTransform} that will invoke the * given {@link OldDoFn} function. 
* * <p>The resulting {@link PTransform PTransform's} types have been bound, with the @@ -538,28 +548,10 @@ public class ParDo { } /** - * Creates a {@link ParDo} {@link PTransform} that will invoke the - * given {@link DoFn} function. - * - * <p>The resulting {@link PTransform PTransform's} types have been bound, with the - * input being a {@code PCollection<InputT>} and the output a - * {@code PCollection<OutputT>}, inferred from the types of the argument - * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further - * properties can be set on it first. - * - * <p>{@link DoFn} is an experimental alternative to - * {@link OldDoFn} which simplifies accessing the window of the element. - */ - @Experimental - public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { - return of(adapt(fn), fn.getClass()); - } - - /** * An incomplete {@link ParDo} transform, with unbound input/output types. * * <p>Before being applied, {@link ParDo.Unbound#of} must be - * invoked to specify the {@link OldDoFn} to invoke, which will also + * invoked to specify the {@link DoFn} to invoke, which will also * bind the input/output types of this {@link PTransform}. */ public static class Unbound { @@ -621,6 +613,18 @@ public class ParDo { /** * Returns a new {@link ParDo} {@link PTransform} that's like this + * transform but which will invoke the given {@link DoFn} + * function, and which has its input and output types bound. Does + * not modify this transform. The resulting {@link PTransform} is + * sufficiently specified to be applied, but more properties can + * still be specified. + */ + public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + return of(adapt(fn), fn.getClass()); + } + + /** + * Returns a new {@link ParDo} {@link PTransform} that's like this * transform but that will invoke the given {@link OldDoFn} * function, and that has its input and output types bound. Does * not modify this transform. 
The resulting {@link PTransform} is @@ -638,24 +642,11 @@ public class ParDo { OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { return new Bound<>(name, sideInputs, fn, fnClass); } - - - /** - * Returns a new {@link ParDo} {@link PTransform} that's like this - * transform but which will invoke the given {@link DoFn} - * function, and which has its input and output types bound. Does - * not modify this transform. The resulting {@link PTransform} is - * sufficiently specified to be applied, but more properties can - * still be specified. - */ - public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { - return of(adapt(fn), fn.getClass()); - } } /** * A {@link PTransform} that, when applied to a {@code PCollection<InputT>}, - * invokes a user-specified {@code OldDoFn<InputT, OutputT>} on all its elements, + * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements, * with all its outputs collected into an output * {@code PCollection<OutputT>}. * @@ -756,9 +747,9 @@ public class ParDo { /** * {@inheritDoc} * - * <p>{@link ParDo} registers its internal {@link OldDoFn} as a subcomponent for display data. - * {@link OldDoFn} implementations can register display data by overriding - * {@link OldDoFn#populateDisplayData}. + * <p>{@link ParDo} registers its internal {@link DoFn} as a subcomponent for display data. + * {@link DoFn} implementations can register display data by overriding + * {@link DoFn#populateDisplayData}. */ @Override public void populateDisplayData(Builder builder) { @@ -780,7 +771,7 @@ public class ParDo { * input type. * * <p>Before being applied, {@link ParDo.UnboundMulti#of} must be - * invoked to specify the {@link OldDoFn} to invoke, which will also + * invoked to specify the {@link DoFn} to invoke, which will also * bind the input type of this {@link PTransform}. 
* * @param <OutputT> the type of the main output {@code PCollection} elements @@ -836,38 +827,41 @@ public class ParDo { /** * Returns a new multi-output {@link ParDo} {@link PTransform} - * that's like this transform but that will invoke the given - * {@link OldDoFn} function, and that has its input type bound. + * that's like this transform but which will invoke the given + * {@link DoFn} function, and which has its input type bound. * Does not modify this transform. The resulting * {@link PTransform} is sufficiently specified to be applied, but * more properties can still be specified. */ - public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { - return of(fn, fn.getClass()); - } - - public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { - return new BoundMulti<>( - name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); + public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + return of(adapt(fn), fn.getClass()); } /** * Returns a new multi-output {@link ParDo} {@link PTransform} - * that's like this transform but which will invoke the given - * {@link DoFn} function, and which has its input type bound. + * that's like this transform but that will invoke the given + * {@link OldDoFn} function, and that has its input type bound. * Does not modify this transform. The resulting * {@link PTransform} is sufficiently specified to be applied, but * more properties can still be specified. 
+ * + * @deprecated please port your {@link OldDoFn} to a {@link DoFn} */ - public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { - return of(adapt(fn), fn.getClass()); + @Deprecated + public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { + return of(fn, fn.getClass()); + } + + private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { + return new BoundMulti<>( + name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); } } /** * A {@link PTransform} that, when applied to a * {@code PCollection<InputT>}, invokes a user-specified - * {@code OldDoFn<InputT, OutputT>} on all its elements, which can emit elements + * {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements * to any of the {@link PTransform}'s main and side output * {@code PCollection}s, which are bundled into a result * {@code PCollectionTuple}. @@ -939,7 +933,7 @@ public class ParDo { input.isBounded()); // The fn will likely be an instance of an anonymous subclass - // such as OldDoFn<Integer, String> { }, thus will have a high-fidelity + // such as DoFn<Integer, String> { }, thus will have a high-fidelity // TypeDescriptor for the output type. 
outputs.get(mainOutputTag).setTypeDescriptorInternal(fn.getOutputTypeDescriptor()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java index bf075f8..8604659 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java @@ -37,7 +37,7 @@ public abstract class SimpleFunction<InputT, OutputT> /** * Returns a {@link TypeDescriptor} capturing what is known statically - * about the input type of this {@code OldDoFn} instance's most-derived + * about the input type of this {@link SimpleFunction} instance's most-derived * class. * * <p>See {@link #getOutputTypeDescriptor} for more discussion. @@ -48,10 +48,10 @@ public abstract class SimpleFunction<InputT, OutputT> /** * Returns a {@link TypeDescriptor} capturing what is known statically - * about the output type of this {@code OldDoFn} instance's + * about the output type of this {@link SimpleFunction} instance's * most-derived class. 
* - * <p>In the normal case of a concrete {@code OldDoFn} subclass with + * <p>In the normal case of a concrete {@link SimpleFunction} subclass with * no generic type parameters of its own (including anonymous inner * classes), this will be a complete non-generic type, which is good * for choosing a default output {@code Coder<OutputT>} for the output http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index 0c87e22..727a492 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -29,15 +29,15 @@ import java.util.Objects; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.VarInt; /** * Provides information about the pane an element belongs to. Every pane is implicitly associated * with a window. Panes are observable only via the - * {@link OldDoFn.ProcessContext#pane} method of the context - * passed to a {@link OldDoFn#processElement} overridden method. + * {@link DoFn.ProcessContext#pane} method of the context + * passed to a {@link DoFn.ProcessElement} method. * * <p>Note: This does not uniquely identify a pane, and should not be used for comparisons. 
*/ @@ -72,8 +72,8 @@ public final class PaneInfo { * definitions: * <ol> * <li>We'll call a pipeline 'simple' if it does not use - * {@link OldDoFn.Context#outputWithTimestamp} in - * any {@code OldDoFn}, and it uses the same + * {@link DoFn.Context#outputWithTimestamp} in + * any {@link DoFn}, and it uses the same * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound#withAllowedLateness} * argument value on all windows (or uses the default of {@link org.joda.time.Duration#ZERO}). * <li>We'll call an element 'locally late', from the point of view of a computation on a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java index dead76e..9ee55ad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java @@ -106,7 +106,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex /** * Hook for subclasses to implement that will be called whenever - * {@link OldDoFn.Context#output} + * {@code DoFn.Context#output} * is called. */ @Override @@ -114,7 +114,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex /** * Hook for subclasses to implement that will be called whenever - * {@link OldDoFn.Context#sideOutput} + * {@code DoFn.Context#sideOutput} * is called. 
*/ @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java index 2808ca9..8f3f540 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java @@ -21,8 +21,8 @@ import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.values.KV; /** - * OldDoFn that makes timestamps and window assignments explicit in the value part of each key/value - * pair. + * {@link OldDoFn} that makes timestamps and window assignments explicit in the value part of each + * key/value pair. * * @param <K> the type of the keys of the input and output {@code PCollection}s * @param <V> the type of the values of the input {@code PCollection} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 354aa5d..6b3218e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -102,7 +102,7 @@ public class SerializableUtils { */ public static CloudObject ensureSerializable(Coder<?> coder) { // Make sure that Coders are java serializable as well since - // they are regularly captured within OldDoFn's. + // they are regularly captured within DoFn's. 
Coder<?> copy = (Coder<?>) ensureSerializable((Serializable) coder); CloudObject cloudObject = copy.asCloudObject(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java index e9904b2..004496b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java @@ -22,15 +22,14 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import org.apache.beam.sdk.transforms.OldDoFn; /** - * Annotation to mark {@link OldDoFn DoFns} as an internal component of the Dataflow SDK. + * Annotation to mark {@code DoFns} as an internal component of the Beam SDK. * * <p>Currently, the only effect of this is to mark any aggregators reported by an annotated - * {@code OldDoFn} as a system counter (as opposed to a user counter). + * {@code DoFn} as a system counter (as opposed to a user counter). * - * <p>This is internal to the Dataflow SDK. + * <p>This is internal to the Beam SDK. 
*/ @Documented @Retention(RetentionPolicy.RUNTIME) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java index 54158d2..016276c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.util; import java.io.IOException; import java.util.Collection; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.state.StateInternals; @@ -28,7 +29,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; /** - * Interface that may be required by some (internal) {@code OldDoFn}s to implement windowing. It + * Interface that may be required by some (internal) {@link DoFn}s to implement windowing. It * should not be necessary for general user code to interact with this at all. * * <p>This interface should be provided by runner implementors to support windowing on their runner. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java index 25b909a..c072fd7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java @@ -35,7 +35,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; /** - * Tests for OldDoFn.DelegatingAggregator. + * Tests for {@link OldDoFn.DelegatingAggregator}. */ @RunWith(JUnit4.class) public class DoFnDelegatingAggregatorTest { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 2f1519c..2649be5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -276,7 +276,8 @@ public class DoFnTesterTest { } /** - * A OldDoFn that adds values to an aggregator and converts input to String in processElement. + * An {@link OldDoFn} that adds values to an aggregator and converts input to String in + * {@link OldDoFn#processElement}. 
*/ private static class CounterDoFn extends OldDoFn<Long, String> { Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e6230cc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java index c732510..302b66a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java @@ -28,7 +28,7 @@ import org.joda.time.Instant; /** * A {@link OldDoFn} that does nothing with provided elements. Used for testing - * methods provided by the OldDoFn abstract class. + * methods provided by the {@link OldDoFn} abstract class. * * @param <InputT> unused. * @param <OutputT> unused.
