Rename DoFnWithContext to DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3bcb6f46 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3bcb6f46 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3bcb6f46 Branch: refs/heads/master Commit: 3bcb6f46ad0ae483d1d8785edc2d9d5846c71a73 Parents: e160966 Author: Kenneth Knowles <k...@google.com> Authored: Fri Jul 22 14:10:01 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Aug 3 18:25:52 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/DoFn.java | 429 +++++++++++++++++++ .../beam/sdk/transforms/DoFnReflector.java | 84 ++-- .../apache/beam/sdk/transforms/DoFnTester.java | 2 +- .../beam/sdk/transforms/DoFnWithContext.java | 429 ------------------- .../org/apache/beam/sdk/transforms/OldDoFn.java | 2 +- .../org/apache/beam/sdk/transforms/ParDo.java | 16 +- .../beam/sdk/transforms/DoFnReflectorTest.java | 86 ++-- .../apache/beam/sdk/transforms/DoFnTest.java | 237 ++++++++++ .../sdk/transforms/DoFnWithContextTest.java | 237 ---------- .../apache/beam/sdk/transforms/ParDoTest.java | 12 +- .../dofnreflector/DoFnReflectorTestHelper.java | 26 +- .../transforms/DoFnReflectorBenchmark.java | 30 +- 12 files changed, 795 insertions(+), 795 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java new file mode 100644 index 0000000..eb6753c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptor; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import 
java.lang.annotation.Target; +import java.util.HashMap; +import java.util.Map; + +/** + * The argument to {@link ParDo} providing the code to use to process + * elements of the input + * {@link org.apache.beam.sdk.values.PCollection}. + * + * <p>See {@link ParDo} for more explanation, examples of use, and + * discussion of constraints on {@code DoFn}s, including their + * serializability, lack of access to global shared mutable state, + * requirements for failure tolerance, and benefits of optimization. + * + * <p>{@code DoFn}s can be tested in a particular + * {@code Pipeline} by running that {@code Pipeline} on sample input + * and then checking its output. Unit testing of a {@code DoFn}, + * separately from any {@code ParDo} transform or {@code Pipeline}, + * can be done via the {@link DoFnTester} harness. + * + * <p>Implementations must define a method annotated with {@link ProcessElement} + * that satisfies the requirements described there. See the {@link ProcessElement} + * for details. + * + * <p>This functionality is experimental and likely to change. + * + * <p>Example usage: + * + * <pre> {@code + * PCollection<String> lines = ... ; + * PCollection<String> words = + * lines.apply(ParDo.of(new DoFn<String, String>() { + * @ProcessElement + * public void processElement(ProcessContext c, BoundedWindow window) { + * + * }})); + * } </pre> + * + * @param <InputT> the type of the (main) input elements + * @param <OutputT> the type of the (main) output elements + */ +@Experimental +public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData { + + /** Information accessible to all methods in this {@code DoFn}. */ + public abstract class Context { + + /** + * Returns the {@code PipelineOptions} specified with the + * {@link org.apache.beam.sdk.runners.PipelineRunner} + * invoking this {@code DoFn}. The {@code PipelineOptions} will + * be the default running via {@link DoFnTester}. 
+ */ + public abstract PipelineOptions getPipelineOptions(); + + /** + * Adds the given element to the main output {@code PCollection}. + * + * <p>Once passed to {@code output} the element should not be modified in + * any way. + * + * <p>If invoked from {@link ProcessElement}, the output + * element will have the same timestamp and be in the same windows + * as the input element passed to the method annotated with + * {@code @ProcessElement}. + * + * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element. The output element + * will have a timestamp of negative infinity. + */ + public abstract void output(OutputT output); + + /** + * Adds the given element to the main output {@code PCollection}, + * with the given timestamp. + * + * <p>Once passed to {@code outputWithTimestamp} the element should not be + * modified in any way. + * + * <p>If invoked from {@link ProcessElement}), the timestamp + * must not be older than the input element's timestamp minus + * {@link OldDoFn#getAllowedTimestampSkew}. The output element will + * be in the same windows as the input element. + * + * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element except for the + * timestamp. + */ + public abstract void outputWithTimestamp(OutputT output, Instant timestamp); + + /** + * Adds the given element to the side output {@code PCollection} with the + * given tag. 
+ * + * <p>Once passed to {@code sideOutput} the element should not be modified + * in any way. + * + * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to + * specify the tags of side outputs that it consumes. Non-consumed side + * outputs, e.g., outputs for monitoring purposes only, don't necessarily + * need to be specified. + * + * <p>The output element will have the same timestamp and be in the same + * windows as the input element passed to {@link ProcessElement}). + * + * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element. The output element + * will have a timestamp of negative infinity. + * + * @see ParDo#withOutputTags + */ + public abstract <T> void sideOutput(TupleTag<T> tag, T output); + + /** + * Adds the given element to the specified side output {@code PCollection}, + * with the given timestamp. + * + * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be + * modified in any way. + * + * <p>If invoked from {@link ProcessElement}), the timestamp + * must not be older than the input element's timestamp minus + * {@link OldDoFn#getAllowedTimestampSkew}. The output element will + * be in the same windows as the input element. + * + * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element except for the + * timestamp. 
+ * + * @see ParDo#withOutputTags + */ + public abstract <T> void sideOutputWithTimestamp( + TupleTag<T> tag, T output, Instant timestamp); + } + + /** + * Information accessible when running {@link OldDoFn#processElement}. + */ + public abstract class ProcessContext extends Context { + + /** + * Returns the input element to be processed. + * + * <p>The element will not be changed -- it is safe to cache, etc. + * without copying. + */ + public abstract InputT element(); + + + /** + * Returns the value of the side input. + * + * @throws IllegalArgumentException if this is not a side input + * @see ParDo#withSideInputs + */ + public abstract <T> T sideInput(PCollectionView<T> view); + + /** + * Returns the timestamp of the input element. + * + * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} + * for more information. + */ + public abstract Instant timestamp(); + + /** + * Returns information about the pane within this window into which the + * input element has been assigned. + * + * <p>Generally all data is in a single, uninteresting pane unless custom + * triggering and/or late data has been explicitly requested. + * See {@link org.apache.beam.sdk.transforms.windowing.Window} + * for more information. + */ + public abstract PaneInfo pane(); + } + + /** + * Returns the allowed timestamp skew duration, which is the maximum + * duration that timestamps can be shifted backward in + * {@link DoFn.Context#outputWithTimestamp}. + * + * <p>The default value is {@code Duration.ZERO}, in which case + * timestamps can only be shifted forward to future. For infinite + * skew, return {@code Duration.millis(Long.MAX_VALUE)}. + */ + public Duration getAllowedTimestampSkew() { + return Duration.ZERO; + } + + ///////////////////////////////////////////////////////////////////////////// + + Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>(); + + /** + * Protects aggregators from being created after initialization. 
+ */ + private boolean aggregatorsAreFinal; + + /** + * Returns a {@link TypeDescriptor} capturing what is known statically + * about the input type of this {@code DoFn} instance's most-derived + * class. + * + * <p>See {@link #getOutputTypeDescriptor} for more discussion. + */ + protected TypeDescriptor<InputT> getInputTypeDescriptor() { + return new TypeDescriptor<InputT>(getClass()) {}; + } + + /** + * Returns a {@link TypeDescriptor} capturing what is known statically + * about the output type of this {@code DoFn} instance's + * most-derived class. + * + * <p>In the normal case of a concrete {@code DoFn} subclass with + * no generic type parameters of its own (including anonymous inner + * classes), this will be a complete non-generic type, which is good + * for choosing a default output {@code Coder<O>} for the output + * {@code PCollection<O>}. + */ + protected TypeDescriptor<OutputT> getOutputTypeDescriptor() { + return new TypeDescriptor<OutputT>(getClass()) {}; + } + + /** + * Interface for runner implementors to provide implementations of extra context information. + * + * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an + * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that + * has indicated it needs the given extra context. + * + * <p>In the case of {@link ProcessElement} it is called once per invocation of + * {@link ProcessElement}. + */ + public interface ExtraContextFactory<InputT, OutputT> { + /** + * Construct the {@link BoundedWindow} to use within a {@link DoFn} that + * needs it. This is called if the {@link ProcessElement} method has a parameter of type + * {@link BoundedWindow}. + * + * @return {@link BoundedWindow} of the element currently being processed. + */ + BoundedWindow window(); + + /** + * Construct the {@link WindowingInternals} to use within a {@link DoFn} that + * needs it. 
This is called if the {@link ProcessElement} method has a parameter of type + * {@link WindowingInternals}. + */ + WindowingInternals<InputT, OutputT> windowingInternals(); + } + + ///////////////////////////////////////////////////////////////////////////// + + /** + * Annotation for the method to use to prepare an instance for processing a batch of elements. + * The method annotated with this must satisfy the following constraints: + * <ul> + * <li>It must have exactly one argument. + * <li>Its first (and only) argument must be a {@link DoFn.Context}. + * </ul> + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface StartBundle {} + + /** + * Annotation for the method to use for processing elements. A subclass of + * {@link DoFn} must have a method with this annotation satisfying + * the following constraints in order for it to be executable: + * <ul> + * <li>It must have at least one argument. + * <li>Its first argument must be a {@link DoFn.ProcessContext}. + * <li>Its remaining arguments must be {@link BoundedWindow}, or + * {@link WindowingInternals WindowingInternals<InputT, OutputT>}. + * </ul> + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface ProcessElement {} + + /** + * Annotation for the method to use to finish processing a batch of elements. + * The method annotated with this must satisfy the following constraints: + * <ul> + * <li>It must have exactly one argument. + * <li>Its first (and only) argument must be a {@link DoFn.Context}. + * </ul> + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface FinishBundle {} + + /** + * Returns an {@link Aggregator} with aggregation logic specified by the + * {@link CombineFn} argument. The name provided must be unique across + * {@link Aggregator}s created within the OldDoFn.
Aggregators can only be created + * during pipeline construction. + * + * @param name the name of the aggregator + * @param combiner the {@link CombineFn} to use in the aggregator + * @return an aggregator for the provided name and combiner in the scope of + * this OldDoFn + * @throws NullPointerException if the name or combiner is null + * @throws IllegalArgumentException if the given name collides with another + * aggregator in this scope + * @throws IllegalStateException if called during pipeline execution. + */ + public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) { + checkNotNull(name, "name cannot be null"); + checkNotNull(combiner, "combiner cannot be null"); + checkArgument(!aggregators.containsKey(name), + "Cannot create aggregator with name %s." + + " An Aggregator with that name already exists within this scope.", + name); + checkState(!aggregatorsAreFinal, + "Cannot create an aggregator during pipeline execution." + + " Aggregators should be registered during pipeline construction."); + + DelegatingAggregator<AggInputT, AggOutputT> aggregator = + new DelegatingAggregator<>(name, combiner); + aggregators.put(name, aggregator); + return aggregator; + } + + /** + * Returns an {@link Aggregator} with the aggregation logic specified by the + * {@link SerializableFunction} argument. The name provided must be unique + * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be + * created during pipeline construction. 
+ * + * @param name the name of the aggregator + * @param combiner the {@link SerializableFunction} to use in the aggregator + * @return an aggregator for the provided name and combiner in the scope of + * this OldDoFn + * @throws NullPointerException if the name or combiner is null + * @throws IllegalArgumentException if the given name collides with another + * aggregator in this scope + * @throws IllegalStateException if called during pipeline execution. + */ + public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator( + String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) { + checkNotNull(combiner, "combiner cannot be null."); + return createAggregator(name, Combine.IterableCombineFn.of(combiner)); + } + + /** + * Finalize the {@link DoFn} construction to prepare for processing. + * This method should be called by runners before any processing methods. + */ + public void prepareForProcessing() { + aggregatorsAreFinal = true; + } + + /** + * {@inheritDoc} + * + * <p>By default, does not register any display data. Implementors may override this method + * to provide their own display data. 
+ */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java index d8d4181..b504cb4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java @@ -18,10 +18,10 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory; -import org.apache.beam.sdk.transforms.DoFnWithContext.FinishBundle; -import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement; -import org.apache.beam.sdk.transforms.DoFnWithContext.StartBundle; +import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; +import org.apache.beam.sdk.transforms.DoFn.FinishBundle; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFn.StartBundle; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -94,7 +94,7 @@ import javax.annotation.Nullable; /** - * Utility implementing the necessary reflection for working with {@link DoFnWithContext}s. + * Utility implementing the necessary reflection for working with {@link DoFn}s. */ public abstract class DoFnReflector { @@ -109,7 +109,7 @@ public abstract class DoFnReflector { /** * Enumeration of the parameters available from the {@link ExtraContextFactory} to use as - * additional parameters for {@link DoFnWithContext} methods. 
+ * additional parameters for {@link DoFn} methods. * <p> * We don't rely on looking for properly annotated methods within {@link ExtraContextFactory} * because erasure would make it impossible to completely fill in the type token for context @@ -139,7 +139,7 @@ public abstract class DoFnReflector { /** * Create a type token representing the given parameter. May use the type token associated - * with the input and output types of the {@link DoFnWithContext}, depending on the extra + * with the input and output types of the {@link DoFn}, depending on the extra * context. */ abstract <InputT, OutputT> TypeToken<?> tokenFor( @@ -190,22 +190,22 @@ public abstract class DoFnReflector { } /** - * @return true if the reflected {@link DoFnWithContext} uses a Single Window. + * @return true if the reflected {@link DoFn} uses a Single Window. */ public abstract boolean usesSingleWindow(); /** Create an {@link DoFnInvoker} bound to the given {@link OldDoFn}. */ public abstract <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker( - DoFnWithContext<InputT, OutputT> fn); + DoFn<InputT, OutputT> fn); private static final Map<Class<?>, DoFnReflector> REFLECTOR_CACHE = new LinkedHashMap<Class<?>, DoFnReflector>(); /** - * @return the {@link DoFnReflector} for the given {@link DoFnWithContext}. + * @return the {@link DoFnReflector} for the given {@link DoFn}. */ public static DoFnReflector of( - @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> fn) { + @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) { DoFnReflector reflector = REFLECTOR_CACHE.get(fn); if (reflector != null) { return reflector; @@ -217,9 +217,9 @@ public abstract class DoFnReflector { } /** - * Create a {@link OldDoFn} that the {@link DoFnWithContext}. + * Create a {@link OldDoFn} that wraps the given {@link DoFn}.
*/ - public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) { + public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFn<InputT, OutputT> fn) { if (usesSingleWindow()) { return new WindowDoFnAdapter<InputT, OutputT>(this, fn); } else { @@ -259,7 +259,7 @@ public abstract class DoFnReflector { static <InputT, OutputT> List<AdditionalParameter> verifyProcessMethodArguments(Method m) { return verifyMethodArguments(m, EXTRA_PROCESS_CONTEXTS, - new TypeToken<DoFnWithContext<InputT, OutputT>.ProcessContext>() {}, + new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {}, new TypeParameter<InputT>() {}, new TypeParameter<OutputT>() {}); } @@ -271,13 +271,13 @@ public abstract class DoFnReflector { } return verifyMethodArguments(m, EXTRA_CONTEXTS, - new TypeToken<DoFnWithContext<InputT, OutputT>.Context>() {}, + new TypeToken<DoFn<InputT, OutputT>.Context>() {}, new TypeParameter<InputT>() {}, new TypeParameter<OutputT>() {}); } /** - * Verify the method arguments for a given {@link DoFnWithContext} method. + * Verify the method arguments for a given {@link DoFn} method. * * <p>The requirements for a method to be valid, are: * <ol> @@ -330,7 +330,7 @@ public abstract class DoFnReflector { // Fill in the generics in the allExtraContextArgs interface from the types in the // Context or ProcessContext OldDoFn. ParameterizedType pt = (ParameterizedType) contextToken.getType(); - // We actually want the owner, since ProcessContext and Context are owned by DoFnWithContext. + // We actually want the owner, since ProcessContext and Context are owned by DoFn. pt = (ParameterizedType) pt.getOwnerType(); @SuppressWarnings("unchecked") TypeToken<InputT> iActual = (TypeToken<InputT>) TypeToken.of(pt.getActualTypeArguments()[0]); @@ -368,21 +368,21 @@ public abstract class DoFnReflector { public interface DoFnInvoker<InputT, OutputT> { /** Invoke {@link OldDoFn#startBundle} on the bound {@code OldDoFn}. 
*/ void invokeStartBundle( - DoFnWithContext<InputT, OutputT>.Context c, + DoFn<InputT, OutputT>.Context c, ExtraContextFactory<InputT, OutputT> extra); /** Invoke {@link OldDoFn#finishBundle} on the bound {@code OldDoFn}. */ void invokeFinishBundle( - DoFnWithContext<InputT, OutputT>.Context c, + DoFn<InputT, OutputT>.Context c, ExtraContextFactory<InputT, OutputT> extra); /** Invoke {@link OldDoFn#processElement} on the bound {@code OldDoFn}. */ public void invokeProcessElement( - DoFnWithContext<InputT, OutputT>.ProcessContext c, + DoFn<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra); } /** - * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFnWithContext}. + * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFn}. */ private static class GenericDoFnReflector extends DoFnReflector { @@ -395,7 +395,7 @@ public abstract class DoFnReflector { private final Constructor<?> constructor; private GenericDoFnReflector( - @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> fn) { + @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) { // Locate the annotated methods this.processElement = findAnnotatedMethod(ProcessElement.class, fn, true); this.startBundle = findAnnotatedMethod(StartBundle.class, fn, false); @@ -442,7 +442,7 @@ public abstract class DoFnReflector { private static Method findAnnotatedMethod( Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) { Collection<Method> matches = declaredMethodsWithAnnotation( - anno, fnClazz, DoFnWithContext.class); + anno, fnClazz, DoFn.class); if (matches.size() == 0) { if (required == true) { @@ -493,12 +493,12 @@ public abstract class DoFnReflector { /** * Use ByteBuddy to generate the code for a {@link DoFnInvoker} that invokes the given - * {@link DoFnWithContext}. + * {@link DoFn}. * @param clazz * @return */ private Constructor<? extends DoFnInvoker<?, ?>> createInvokerConstructor( - @SuppressWarnings("rawtypes") Class<? 
extends DoFnWithContext> clazz) { + @SuppressWarnings("rawtypes") Class<? extends DoFn> clazz) { final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(clazz); @@ -545,7 +545,7 @@ public abstract class DoFnReflector { @Override public <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker( - DoFnWithContext<InputT, OutputT> fn) { + DoFn<InputT, OutputT> fn) { try { @SuppressWarnings("unchecked") DoFnInvoker<InputT, OutputT> invoker = @@ -562,13 +562,13 @@ public abstract class DoFnReflector { } private static class ContextAdapter<InputT, OutputT> - extends DoFnWithContext<InputT, OutputT>.Context - implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> { + extends DoFn<InputT, OutputT>.Context + implements DoFn.ExtraContextFactory<InputT, OutputT> { private OldDoFn<InputT, OutputT>.Context context; private ContextAdapter( - DoFnWithContext<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) { + DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) { fn.super(); this.context = context; } @@ -600,14 +600,14 @@ public abstract class DoFnReflector { @Override public BoundedWindow window() { - // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this + // The DoFn doesn't allow us to ask for these outside ProcessElements, so this // should be unreachable. throw new UnsupportedOperationException("Can only get the window in ProcessElements"); } @Override public WindowingInternals<InputT, OutputT> windowingInternals() { - // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this + // The DoFn doesn't allow us to ask for these outside ProcessElements, so this // should be unreachable. 
throw new UnsupportedOperationException( "Can only get the windowingInternals in ProcessElements"); @@ -615,13 +615,13 @@ public abstract class DoFnReflector { } private static class ProcessContextAdapter<InputT, OutputT> - extends DoFnWithContext<InputT, OutputT>.ProcessContext - implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> { + extends DoFn<InputT, OutputT>.ProcessContext + implements DoFn.ExtraContextFactory<InputT, OutputT> { private OldDoFn<InputT, OutputT>.ProcessContext context; private ProcessContextAdapter( - DoFnWithContext<InputT, OutputT> fn, + DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) { fn.super(); this.context = context; @@ -693,10 +693,10 @@ public abstract class DoFnReflector { private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> { - private final DoFnWithContext<InputT, OutputT> fn; + private final DoFn<InputT, OutputT> fn; private transient DoFnInvoker<InputT, OutputT> invoker; - private SimpleDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) { + private SimpleDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) { super(fn.aggregators); this.fn = fn; this.invoker = reflector.bindInvoker(fn); @@ -745,7 +745,7 @@ public abstract class DoFnReflector { private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT> implements OldDoFn.RequiresWindowAccess { - private WindowDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) { + private WindowDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) { super(reflector, fn); } } @@ -770,7 +770,7 @@ public abstract class DoFnReflector { try { prepareMethod = new MethodLocator.ForExplicitMethod( new MethodDescription.ForLoadedMethod( - DoFnWithContext.class.getDeclaredMethod("prepareForProcessing"))) + DoFn.class.getDeclaredMethod("prepareForProcessing"))) .resolve(instrumentedMethod); } catch (NoSuchMethodException | 
SecurityException e) { throw new RuntimeException("Unable to locate prepareForProcessing method", e); @@ -817,7 +817,7 @@ public abstract class DoFnReflector { /** * A byte-buddy {@link Implementation} that delegates a call that receives - * {@link AdditionalParameter} to the given {@link DoFnWithContext} method. + * {@link AdditionalParameter} to the given {@link DoFn} method. */ private static final class InvokerDelegation implements Implementation { @Nullable @@ -845,7 +845,7 @@ public abstract class DoFnReflector { /** * Generate the {@link Implementation} of one of the life-cycle methods of a - * {@link DoFnWithContext}. + * {@link DoFn}. */ private static Implementation create( @Nullable final Method target, BeforeDelegation before, List<AdditionalParameter> args) { @@ -869,7 +869,7 @@ public abstract class DoFnReflector { } /** - * Stack manipulation to push the {@link DoFnWithContext} reference stored in the + * Stack manipulation to push the {@link DoFn} reference stored in the * delegate field of the invoker on to the top of the stack. * * <p>This implementation is derived from the code for @@ -1018,7 +1018,7 @@ public abstract class DoFnReflector { /** * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code * for a constructor that takes a single argument and assigns it to the delegate field. - * {@link AdditionalParameter} to the given {@link DoFnWithContext} method. + * {@link AdditionalParameter} to the given {@link DoFn} method. 
*/ private static final class InvokerConstructor implements Implementation { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 9336e4c..f44a9ae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -90,7 +90,7 @@ public class DoFnTester<InputT, OutputT> { */ @SuppressWarnings("unchecked") public static <InputT, OutputT> DoFnTester<InputT, OutputT> - of(DoFnWithContext<InputT, OutputT> fn) { + of(DoFn<InputT, OutputT> fn) { return new DoFnTester<InputT, OutputT>(DoFnReflector.of(fn.getClass()).toDoFn(fn)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java deleted file mode 100644 index b27163a..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowingInternals; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; - -import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.io.Serializable; -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import java.util.HashMap; -import java.util.Map; - -/** - * The argument to {@link ParDo} providing the code to use to process - * elements of the input - * {@link org.apache.beam.sdk.values.PCollection}. 
- * - * <p>See {@link ParDo} for more explanation, examples of use, and - * discussion of constraints on {@code DoFnWithContext}s, including their - * serializability, lack of access to global shared mutable state, - * requirements for failure tolerance, and benefits of optimization. - * - * <p>{@code DoFnWithContext}s can be tested in a particular - * {@code Pipeline} by running that {@code Pipeline} on sample input - * and then checking its output. Unit testing of a {@code DoFnWithContext}, - * separately from any {@code ParDo} transform or {@code Pipeline}, - * can be done via the {@link DoFnTester} harness. - * - * <p>Implementations must define a method annotated with {@link ProcessElement} - * that satisfies the requirements described there. See the {@link ProcessElement} - * for details. - * - * <p>This functionality is experimental and likely to change. - * - * <p>Example usage: - * - * <pre> {@code - * PCollection<String> lines = ... ; - * PCollection<String> words = - * lines.apply(ParDo.of(new DoFnWithContext<String, String>() { - * @ProcessElement - * public void processElement(ProcessContext c, BoundedWindow window) { - * - * }})); - * } </pre> - * - * @param <InputT> the type of the (main) input elements - * @param <OutputT> the type of the (main) output elements - */ -@Experimental -public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, HasDisplayData { - - /** Information accessible to all methods in this {@code DoFnWithContext}. */ - public abstract class Context { - - /** - * Returns the {@code PipelineOptions} specified with the - * {@link org.apache.beam.sdk.runners.PipelineRunner} - * invoking this {@code DoFnWithContext}. The {@code PipelineOptions} will - * be the default running via {@link DoFnTester}. - */ - public abstract PipelineOptions getPipelineOptions(); - - /** - * Adds the given element to the main output {@code PCollection}. 
- * - * <p>Once passed to {@code output} the element should not be modified in - * any way. - * - * <p>If invoked from {@link ProcessElement}, the output - * element will have the same timestamp and be in the same windows - * as the input element passed to the method annotated with - * {@code @ProcessElement}. - * - * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element. The output element - * will have a timestamp of negative infinity. - */ - public abstract void output(OutputT output); - - /** - * Adds the given element to the main output {@code PCollection}, - * with the given timestamp. - * - * <p>Once passed to {@code outputWithTimestamp} the element should not be - * modified in any way. - * - * <p>If invoked from {@link ProcessElement}), the timestamp - * must not be older than the input element's timestamp minus - * {@link OldDoFn#getAllowedTimestampSkew}. The output element will - * be in the same windows as the input element. - * - * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element except for the - * timestamp. - */ - public abstract void outputWithTimestamp(OutputT output, Instant timestamp); - - /** - * Adds the given element to the side output {@code PCollection} with the - * given tag. - * - * <p>Once passed to {@code sideOutput} the element should not be modified - * in any way. 
- * - * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to - * specify the tags of side outputs that it consumes. Non-consumed side - * outputs, e.g., outputs for monitoring purposes only, don't necessarily - * need to be specified. - * - * <p>The output element will have the same timestamp and be in the same - * windows as the input element passed to {@link ProcessElement}). - * - * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element. The output element - * will have a timestamp of negative infinity. - * - * @see ParDo#withOutputTags - */ - public abstract <T> void sideOutput(TupleTag<T> tag, T output); - - /** - * Adds the given element to the specified side output {@code PCollection}, - * with the given timestamp. - * - * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be - * modified in any way. - * - * <p>If invoked from {@link ProcessElement}), the timestamp - * must not be older than the input element's timestamp minus - * {@link OldDoFn#getAllowedTimestampSkew}. The output element will - * be in the same windows as the input element. - * - * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element except for the - * timestamp. 
- * - * @see ParDo#withOutputTags - */ - public abstract <T> void sideOutputWithTimestamp( - TupleTag<T> tag, T output, Instant timestamp); - } - - /** - * Information accessible when running {@link OldDoFn#processElement}. - */ - public abstract class ProcessContext extends Context { - - /** - * Returns the input element to be processed. - * - * <p>The element will not be changed -- it is safe to cache, etc. - * without copying. - */ - public abstract InputT element(); - - - /** - * Returns the value of the side input. - * - * @throws IllegalArgumentException if this is not a side input - * @see ParDo#withSideInputs - */ - public abstract <T> T sideInput(PCollectionView<T> view); - - /** - * Returns the timestamp of the input element. - * - * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. - */ - public abstract Instant timestamp(); - - /** - * Returns information about the pane within this window into which the - * input element has been assigned. - * - * <p>Generally all data is in a single, uninteresting pane unless custom - * triggering and/or late data has been explicitly requested. - * See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. - */ - public abstract PaneInfo pane(); - } - - /** - * Returns the allowed timestamp skew duration, which is the maximum - * duration that timestamps can be shifted backward in - * {@link DoFnWithContext.Context#outputWithTimestamp}. - * - * <p>The default value is {@code Duration.ZERO}, in which case - * timestamps can only be shifted forward to future. For infinite - * skew, return {@code Duration.millis(Long.MAX_VALUE)}. - */ - public Duration getAllowedTimestampSkew() { - return Duration.ZERO; - } - - ///////////////////////////////////////////////////////////////////////////// - - Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>(); - - /** - * Protects aggregators from being created after initialization. 
- */ - private boolean aggregatorsAreFinal; - - /** - * Returns a {@link TypeDescriptor} capturing what is known statically - * about the input type of this {@code DoFnWithContext} instance's most-derived - * class. - * - * <p>See {@link #getOutputTypeDescriptor} for more discussion. - */ - protected TypeDescriptor<InputT> getInputTypeDescriptor() { - return new TypeDescriptor<InputT>(getClass()) {}; - } - - /** - * Returns a {@link TypeDescriptor} capturing what is known statically - * about the output type of this {@code DoFnWithContext} instance's - * most-derived class. - * - * <p>In the normal case of a concrete {@code DoFnWithContext} subclass with - * no generic type parameters of its own (including anonymous inner - * classes), this will be a complete non-generic type, which is good - * for choosing a default output {@code Coder<O>} for the output - * {@code PCollection<O>}. - */ - protected TypeDescriptor<OutputT> getOutputTypeDescriptor() { - return new TypeDescriptor<OutputT>(getClass()) {}; - } - - /** - * Interface for runner implementors to provide implementations of extra context information. - * - * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an - * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that - * has indicated it needs the given extra context. - * - * <p>In the case of {@link ProcessElement} it is called once per invocation of - * {@link ProcessElement}. - */ - public interface ExtraContextFactory<InputT, OutputT> { - /** - * Construct the {@link BoundedWindow} to use within a {@link DoFnWithContext} that - * needs it. This is called if the {@link ProcessElement} method has a parameter of type - * {@link BoundedWindow}. - * - * @return {@link BoundedWindow} of the element currently being processed. - */ - BoundedWindow window(); - - /** - * Construct the {@link WindowingInternals} to use within a {@link DoFnWithContext} that - * needs it. 
This is called if the {@link ProcessElement} method has a parameter of type - * {@link WindowingInternals}. - */ - WindowingInternals<InputT, OutputT> windowingInternals(); - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Annotation for the method to use to prepare an instance for processing a batch of elements. - * The method annotated with this must satisfy the following constraints: - * <ul> - * <li>It must have at least one argument. - * <li>Its first (and only) argument must be a {@link DoFnWithContext.Context}. - * </ul> - */ - @Documented - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.METHOD) - public @interface StartBundle {} - - /** - * Annotation for the method to use for processing elements. A subclass of - * {@link DoFnWithContext} must have a method with this annotation satisfying - * the following constraints in order for it to be executable: - * <ul> - * <li>It must have at least one argument. - * <li>Its first argument must be a {@link DoFnWithContext.ProcessContext}. - * <li>Its remaining arguments must be {@link BoundedWindow}, or - * {@link WindowingInternals WindowingInternals<InputT, OutputT>}. - * </ul> - */ - @Documented - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.METHOD) - public @interface ProcessElement {} - - /** - * Annotation for the method to use to prepare an instance for processing a batch of elements. - * The method annotated with this must satisfy the following constraints: - * <ul> - * <li>It must have at least one argument. - * <li>Its first (and only) argument must be a {@link DoFnWithContext.Context}. - * </ul> - */ - @Documented - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.METHOD) - public @interface FinishBundle {} - - /** - * Returns an {@link Aggregator} with aggregation logic specified by the - * {@link CombineFn} argument. The name provided must be unique across - * {@link Aggregator}s created within the OldDoFn. 
Aggregators can only be created - * during pipeline construction. - * - * @param name the name of the aggregator - * @param combiner the {@link CombineFn} to use in the aggregator - * @return an aggregator for the provided name and combiner in the scope of - * this OldDoFn - * @throws NullPointerException if the name or combiner is null - * @throws IllegalArgumentException if the given name collides with another - * aggregator in this scope - * @throws IllegalStateException if called during pipeline execution. - */ - public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> - createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) { - checkNotNull(name, "name cannot be null"); - checkNotNull(combiner, "combiner cannot be null"); - checkArgument(!aggregators.containsKey(name), - "Cannot create aggregator with name %s." - + " An Aggregator with that name already exists within this scope.", - name); - checkState(!aggregatorsAreFinal, - "Cannot create an aggregator during pipeline execution." - + " Aggregators should be registered during pipeline construction."); - - DelegatingAggregator<AggInputT, AggOutputT> aggregator = - new DelegatingAggregator<>(name, combiner); - aggregators.put(name, aggregator); - return aggregator; - } - - /** - * Returns an {@link Aggregator} with the aggregation logic specified by the - * {@link SerializableFunction} argument. The name provided must be unique - * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be - * created during pipeline construction. 
- * - * @param name the name of the aggregator - * @param combiner the {@link SerializableFunction} to use in the aggregator - * @return an aggregator for the provided name and combiner in the scope of - * this OldDoFn - * @throws NullPointerException if the name or combiner is null - * @throws IllegalArgumentException if the given name collides with another - * aggregator in this scope - * @throws IllegalStateException if called during pipeline execution. - */ - public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator( - String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) { - checkNotNull(combiner, "combiner cannot be null."); - return createAggregator(name, Combine.IterableCombineFn.of(combiner)); - } - - /** - * Finalize the {@link DoFnWithContext} construction to prepare for processing. - * This method should be called by runners before any processing methods. - */ - void prepareForProcessing() { - aggregatorsAreFinal = true; - } - - /** - * {@inheritDoc} - * - * <p>By default, does not register any display data. Implementors may override this method - * to provide their own display data. - */ - @Override - public void populateDisplayData(DisplayData.Builder builder) { - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index 48c6033..f640442 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -63,7 +63,7 @@ import java.util.UUID; * separately from any {@code ParDo} transform or {@code Pipeline}, * can be done via the {@link DoFnTester} harness. 
* - * <p>{@link DoFnWithContext} (currently experimental) offers an alternative + * <p>{@link DoFn} (currently experimental) offers an alternative * mechanism for accessing {@link ProcessContext#window()} without the need * to implement {@link RequiresWindowAccess}. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 36d8101..bb1af9c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -527,13 +527,13 @@ public class ParDo { } private static <InputT, OutputT> OldDoFn<InputT, OutputT> - adapt(DoFnWithContext<InputT, OutputT> fn) { + adapt(DoFn<InputT, OutputT> fn) { return DoFnReflector.of(fn.getClass()).toDoFn(fn); } /** * Creates a {@link ParDo} {@link PTransform} that will invoke the - * given {@link DoFnWithContext} function. + * given {@link DoFn} function. * * <p>The resulting {@link PTransform PTransform's} types have been bound, with the * input being a {@code PCollection<InputT>} and the output a @@ -541,11 +541,11 @@ public class ParDo { * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further * properties can be set on it first. * - * <p>{@link DoFnWithContext} is an experimental alternative to + * <p>{@link DoFn} is an experimental alternative to * {@link OldDoFn} which simplifies accessing the window of the element. 
*/ @Experimental - public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) { + public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { return of(adapt(fn), fn.getClass()); } @@ -633,13 +633,13 @@ public class ParDo { /** * Returns a new {@link ParDo} {@link PTransform} that's like this - * transform but which will invoke the given {@link DoFnWithContext} + * transform but which will invoke the given {@link DoFn} * function, and which has its input and output types bound. Does * not modify this transform. The resulting {@link PTransform} is * sufficiently specified to be applied, but more properties can * still be specified. */ - public <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) { + public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { return of(adapt(fn), fn.getClass()); } } @@ -845,12 +845,12 @@ public class ParDo { /** * Returns a new multi-output {@link ParDo} {@link PTransform} * that's like this transform but which will invoke the given - * {@link DoFnWithContext} function, and which has its input type bound. + * {@link DoFn} function, and which has its input type bound. * Does not modify this transform. The resulting * {@link PTransform} is sufficiently specified to be applied, but * more properties can still be specified. 
*/ - public <InputT> BoundMulti<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) { + public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { return of(adapt(fn), fn.getClass()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java index 0cb3d7b..df9e441 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java @@ -21,10 +21,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.transforms.DoFnWithContext.Context; -import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory; -import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessContext; -import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement; +import org.apache.beam.sdk.transforms.DoFn.Context; +import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.UserCodeException; @@ -61,13 +61,13 @@ public class DoFnReflectorTest { } } - private DoFnWithContext<String, String> fn; + private DoFn<String, String> fn; @Rule public ExpectedException thrown = ExpectedException.none(); @Mock - private DoFnWithContext<String, String>.ProcessContext mockContext; + private DoFn<String, 
String>.ProcessContext mockContext; @Mock private BoundedWindow mockWindow; @Mock @@ -91,7 +91,7 @@ public class DoFnReflectorTest { }; } - private DoFnReflector underTest(DoFnWithContext<String, String> fn) { + private DoFnReflector underTest(DoFn<String, String> fn) { this.fn = fn; return DoFnReflector.of(fn.getClass()); } @@ -141,7 +141,7 @@ public class DoFnReflectorTest { @Test public void testDoFnWithNoExtraContext() throws Exception { final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() { + DoFnReflector reflector = underTest(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) @@ -172,19 +172,19 @@ public class DoFnReflectorTest { interface InterfaceWithProcessElement { @ProcessElement - void processElement(DoFnWithContext<String, String>.ProcessContext c); + void processElement(DoFn<String, String>.ProcessContext c); } interface LayersOfInterfaces extends InterfaceWithProcessElement {} private class IdentityUsingInterfaceWithProcessElement - extends DoFnWithContext<String, String> + extends DoFn<String, String> implements LayersOfInterfaces { private Invocations invocations = new Invocations("Named Class"); @Override - public void processElement(DoFnWithContext<String, String>.ProcessContext c) { + public void processElement(DoFn<String, String>.ProcessContext c) { invocations.wasProcessElementInvoked = true; assertSame(c, mockContext); } @@ -198,7 +198,7 @@ public class DoFnReflectorTest { checkInvokeProcessElementWorks(reflector, fn.invocations); } - private class IdentityParent extends DoFnWithContext<String, String> { + private class IdentityParent extends DoFn<String, String> { protected Invocations parentInvocations = new Invocations("IdentityParent"); @ProcessElement @@ -215,7 +215,7 @@ public class DoFnReflectorTest { protected Invocations childInvocations = new Invocations("IdentityChildWithOverride"); @Override - public 
void process(DoFnWithContext<String, String>.ProcessContext c) { + public void process(DoFn<String, String>.ProcessContext c) { super.process(c); childInvocations.wasProcessElementInvoked = true; } @@ -240,7 +240,7 @@ public class DoFnReflectorTest { @Test public void testDoFnWithWindow() throws Exception { final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() { + DoFnReflector reflector = underTest(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c, BoundedWindow w) @@ -259,7 +259,7 @@ public class DoFnReflectorTest { @Test public void testDoFnWithWindowingInternals() throws Exception { final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() { + DoFnReflector reflector = underTest(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c, WindowingInternals<String, String> w) @@ -278,7 +278,7 @@ public class DoFnReflectorTest { @Test public void testDoFnWithStartBundle() throws Exception { final Invocations invocations = new Invocations("AnonymousClass"); - DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() { + DoFnReflector reflector = underTest(new DoFn<String, String>() { @ProcessElement public void processElement(@SuppressWarnings("unused") ProcessContext c) {} @@ -304,7 +304,7 @@ public class DoFnReflectorTest { thrown.expect(IllegalStateException.class); thrown.expectMessage("No method annotated with @ProcessElement found"); thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFnWithContext<String, String>() {}); + underTest(new DoFn<String, String>() {}); } @Test @@ -314,7 +314,7 @@ public class DoFnReflectorTest { thrown.expectMessage("foo()"); thrown.expectMessage("bar()"); thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFnWithContext<String, String>() { + 
underTest(new DoFn<String, String>() { @ProcessElement public void foo() {} @@ -330,7 +330,7 @@ public class DoFnReflectorTest { thrown.expectMessage("bar()"); thrown.expectMessage("baz()"); thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFnWithContext<String, String>() { + underTest(new DoFn<String, String>() { @ProcessElement public void foo() {} @@ -349,7 +349,7 @@ public class DoFnReflectorTest { thrown.expectMessage("bar()"); thrown.expectMessage("baz()"); thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFnWithContext<String, String>() { + underTest(new DoFn<String, String>() { @ProcessElement public void foo() {} @@ -361,7 +361,7 @@ public class DoFnReflectorTest { }); } - private static class PrivateDoFnClass extends DoFnWithContext<String, String> { + private static class PrivateDoFnClass extends DoFn<String, String> { final Invocations invocations = new Invocations(getClass().getName()); @ProcessElement @@ -429,7 +429,7 @@ public class DoFnReflectorTest { thrown.expect(IllegalStateException.class); thrown.expectMessage("process() must be public"); thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFnWithContext<String, String>() { + underTest(new DoFn<String, String>() { @ProcessElement private void process() {} }); @@ -440,7 +440,7 @@ public class DoFnReflectorTest { thrown.expect(IllegalStateException.class); thrown.expectMessage("startBundle() must be public"); thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFnWithContext<String, String>() { + underTest(new DoFn<String, String>() { @ProcessElement public void processElement() {} @@ -454,7 +454,7 @@ public class DoFnReflectorTest { thrown.expect(IllegalStateException.class); thrown.expectMessage("finishBundle() must be public"); thrown.expectMessage(getClass().getName() + "$"); - underTest(new DoFnWithContext<String, String>() { + underTest(new DoFn<String, String>() { @ProcessElement public void processElement() {} @@ -490,7 
+490,7 @@ public class DoFnReflectorTest { } @SuppressWarnings({"unused"}) - private void badExtraContext(DoFnWithContext<Integer, String>.Context c, int n) {} + private void badExtraContext(DoFn<Integer, String>.Context c, int n) {} @Test public void testBadExtraContext() throws Exception { @@ -505,7 +505,7 @@ public class DoFnReflectorTest { @SuppressWarnings({"unused"}) private void badExtraProcessContext( - DoFnWithContext<Integer, String>.ProcessContext c, Integer n) {} + DoFn<Integer, String>.ProcessContext c, Integer n) {} @Test public void testBadExtraProcessContextType() throws Exception { @@ -534,58 +534,58 @@ public class DoFnReflectorTest { } @SuppressWarnings("unused") - private void goodGenerics(DoFnWithContext<Integer, String>.ProcessContext c, + private void goodGenerics(DoFn<Integer, String>.ProcessContext c, WindowingInternals<Integer, String> i1) {} @Test public void testValidGenerics() throws Exception { Method method = getClass().getDeclaredMethod("goodGenerics", - DoFnWithContext.ProcessContext.class, WindowingInternals.class); + DoFn.ProcessContext.class, WindowingInternals.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void goodWildcards(DoFnWithContext<Integer, String>.ProcessContext c, + private void goodWildcards(DoFn<Integer, String>.ProcessContext c, WindowingInternals<?, ?> i1) {} @Test public void testGoodWildcards() throws Exception { Method method = getClass().getDeclaredMethod("goodWildcards", - DoFnWithContext.ProcessContext.class, WindowingInternals.class); + DoFn.ProcessContext.class, WindowingInternals.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void goodBoundedWildcards(DoFnWithContext<Integer, String>.ProcessContext c, + private void goodBoundedWildcards(DoFn<Integer, String>.ProcessContext c, WindowingInternals<? super Integer, ? 
super String> i1) {} @Test public void testGoodBoundedWildcards() throws Exception { Method method = getClass().getDeclaredMethod("goodBoundedWildcards", - DoFnWithContext.ProcessContext.class, WindowingInternals.class); + DoFn.ProcessContext.class, WindowingInternals.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") private <InputT, OutputT> void goodTypeVariables( - DoFnWithContext<InputT, OutputT>.ProcessContext c, + DoFn<InputT, OutputT>.ProcessContext c, WindowingInternals<InputT, OutputT> i1) {} @Test public void testGoodTypeVariables() throws Exception { Method method = getClass().getDeclaredMethod("goodTypeVariables", - DoFnWithContext.ProcessContext.class, WindowingInternals.class); + DoFn.ProcessContext.class, WindowingInternals.class); DoFnReflector.verifyProcessMethodArguments(method); } @SuppressWarnings("unused") - private void badGenericTwoArgs(DoFnWithContext<Integer, String>.ProcessContext c, + private void badGenericTwoArgs(DoFn<Integer, String>.ProcessContext c, WindowingInternals<Integer, Integer> i1) {} @Test public void testBadGenericsTwoArgs() throws Exception { Method method = getClass().getDeclaredMethod("badGenericTwoArgs", - DoFnWithContext.ProcessContext.class, WindowingInternals.class); + DoFn.ProcessContext.class, WindowingInternals.class); thrown.expect(IllegalStateException.class); thrown.expectMessage("Incompatible generics in context parameter " @@ -598,13 +598,13 @@ public class DoFnReflectorTest { } @SuppressWarnings("unused") - private void badGenericWildCards(DoFnWithContext<Integer, String>.ProcessContext c, + private void badGenericWildCards(DoFn<Integer, String>.ProcessContext c, WindowingInternals<Integer, ? 
super Integer> i1) {} @Test public void testBadGenericWildCards() throws Exception { Method method = getClass().getDeclaredMethod("badGenericWildCards", - DoFnWithContext.ProcessContext.class, WindowingInternals.class); + DoFn.ProcessContext.class, WindowingInternals.class); thrown.expect(IllegalStateException.class); thrown.expectMessage("Incompatible generics in context parameter " @@ -617,13 +617,13 @@ public class DoFnReflectorTest { } @SuppressWarnings("unused") - private <InputT, OutputT> void badTypeVariables(DoFnWithContext<InputT, OutputT>.ProcessContext c, + private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c, WindowingInternals<InputT, InputT> i1) {} @Test public void testBadTypeVariables() throws Exception { Method method = getClass().getDeclaredMethod("badTypeVariables", - DoFnWithContext.ProcessContext.class, WindowingInternals.class); + DoFn.ProcessContext.class, WindowingInternals.class); thrown.expect(IllegalStateException.class); thrown.expectMessage("Incompatible generics in context parameter " @@ -636,7 +636,7 @@ public class DoFnReflectorTest { @Test public void testProcessElementException() throws Exception { - DoFnWithContext<Integer, Integer> fn = new DoFnWithContext<Integer, Integer>() { + DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() { @ProcessElement public void processElement(@SuppressWarnings("unused") ProcessContext c) { throw new IllegalArgumentException("bogus"); @@ -650,7 +650,7 @@ public class DoFnReflectorTest { @Test public void testStartBundleException() throws Exception { - DoFnWithContext<Integer, Integer> fn = new DoFnWithContext<Integer, Integer>() { + DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() { @StartBundle public void startBundle(@SuppressWarnings("unused") Context c) { throw new IllegalArgumentException("bogus"); @@ -668,7 +668,7 @@ public class DoFnReflectorTest { @Test public void testFinishBundleException() throws Exception { - DoFnWithContext<Integer, 
Integer> fn = new DoFnWithContext<Integer, Integer>() { + DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() { @FinishBundle public void finishBundle(@SuppressWarnings("unused") Context c) { throw new IllegalArgumentException("bogus"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java new file mode 100644 index 0000000..c7e8972 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Max.MaxIntegerFn; +import org.apache.beam.sdk.transforms.display.DisplayData; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** Tests for {@link DoFn}. 
*/ +@RunWith(JUnit4.class) +public class DoFnTest implements Serializable { + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + private class NoOpDoFn extends DoFn<Void, Void> { + + /** + * @param c context + */ + @ProcessElement + public void processElement(ProcessContext c) { + } + } + + @Test + public void testCreateAggregatorWithCombinerSucceeds() { + String name = "testAggregator"; + Sum.SumLongFn combiner = new Sum.SumLongFn(); + + DoFn<Void, Void> doFn = new NoOpDoFn(); + + Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner); + + assertEquals(name, aggregator.getName()); + assertEquals(combiner, aggregator.getCombineFn()); + } + + @Test + public void testCreateAggregatorWithNullNameThrowsException() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("name cannot be null"); + + DoFn<Void, Void> doFn = new NoOpDoFn(); + + doFn.createAggregator(null, new Sum.SumLongFn()); + } + + @Test + public void testCreateAggregatorWithNullCombineFnThrowsException() { + CombineFn<Object, Object, Object> combiner = null; + + thrown.expect(NullPointerException.class); + thrown.expectMessage("combiner cannot be null"); + + DoFn<Void, Void> doFn = new NoOpDoFn(); + + doFn.createAggregator("testAggregator", combiner); + } + + @Test + public void testCreateAggregatorWithNullSerializableFnThrowsException() { + SerializableFunction<Iterable<Object>, Object> combiner = null; + + thrown.expect(NullPointerException.class); + thrown.expectMessage("combiner cannot be null"); + + DoFn<Void, Void> doFn = new NoOpDoFn(); + + doFn.createAggregator("testAggregator", combiner); + } + + @Test + public void testCreateAggregatorWithSameNameThrowsException() { + String name = "testAggregator"; + CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn(); + + DoFn<Void, Void> doFn = new NoOpDoFn(); + + doFn.createAggregator(name, combiner); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Cannot 
create"); + thrown.expectMessage(name); + thrown.expectMessage("already exists"); + + doFn.createAggregator(name, combiner); + } + + @Test + public void testCreateAggregatorsWithDifferentNamesSucceeds() { + String nameOne = "testAggregator"; + String nameTwo = "aggregatorPrime"; + CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn(); + + DoFn<Void, Void> doFn = new NoOpDoFn(); + + Aggregator<Double, Double> aggregatorOne = + doFn.createAggregator(nameOne, combiner); + Aggregator<Double, Double> aggregatorTwo = + doFn.createAggregator(nameTwo, combiner); + + assertNotEquals(aggregatorOne, aggregatorTwo); + } + + @Test + public void testDoFnWithContextUsingAggregators() { + NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>(); + OldDoFn<Object, Object>.Context context = noOpFn.context(); + + OldDoFn<Object, Object> fn = spy(noOpFn); + context = spy(context); + + @SuppressWarnings("unchecked") + Aggregator<Long, Long> agg = mock(Aggregator.class); + + Sum.SumLongFn combiner = new Sum.SumLongFn(); + Aggregator<Long, Long> delegateAggregator = + fn.createAggregator("test", combiner); + + when(context.createAggregatorInternal("test", combiner)).thenReturn(agg); + + context.setupDelegateAggregators(); + delegateAggregator.addValue(1L); + + verify(agg).addValue(1L); + } + + @Test + public void testDefaultPopulateDisplayDataImplementation() { + DoFn<String, String> fn = new DoFn<String, String>() { + }; + DisplayData displayData = DisplayData.from(fn); + assertThat(displayData.items(), empty()); + } + + @Test + @Category(NeedsRunner.class) + public void testCreateAggregatorInStartBundleThrows() { + TestPipeline p = createTestPipeline(new DoFn<String, String>() { + @StartBundle + public void startBundle(Context c) { + createAggregator("anyAggregate", new MaxIntegerFn()); + } + + @ProcessElement + public void processElement(ProcessContext c) {} + }); + + thrown.expect(PipelineExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + + 
p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testCreateAggregatorInProcessElementThrows() { + TestPipeline p = createTestPipeline(new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c) { + createAggregator("anyAggregate", new MaxIntegerFn()); + } + }); + + thrown.expect(PipelineExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testCreateAggregatorInFinishBundleThrows() { + TestPipeline p = createTestPipeline(new DoFn<String, String>() { + @FinishBundle + public void finishBundle(Context c) { + createAggregator("anyAggregate", new MaxIntegerFn()); + } + + @ProcessElement + public void processElement(ProcessContext c) {} + }); + + thrown.expect(PipelineExecutionException.class); + thrown.expectCause(isA(IllegalStateException.class)); + + p.run(); + } + + /** + * Initialize a test pipeline with the specified {@link DoFn}. + */ + private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) { + TestPipeline pipeline = TestPipeline.create(); + pipeline.apply(Create.of((InputT) null)) + .apply(ParDo.of(fn)); + + return pipeline; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java deleted file mode 100644 index 0a910b8..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.isA; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Max.MaxIntegerFn; -import org.apache.beam.sdk.transforms.display.DisplayData; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** Tests for {@link DoFnWithContext}. 
*/ -@RunWith(JUnit4.class) -public class DoFnWithContextTest implements Serializable { - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - private class NoOpDoFnWithContext extends DoFnWithContext<Void, Void> { - - /** - * @param c context - */ - @ProcessElement - public void processElement(ProcessContext c) { - } - } - - @Test - public void testCreateAggregatorWithCombinerSucceeds() { - String name = "testAggregator"; - Sum.SumLongFn combiner = new Sum.SumLongFn(); - - DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext(); - - Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner); - - assertEquals(name, aggregator.getName()); - assertEquals(combiner, aggregator.getCombineFn()); - } - - @Test - public void testCreateAggregatorWithNullNameThrowsException() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("name cannot be null"); - - DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext(); - - doFn.createAggregator(null, new Sum.SumLongFn()); - } - - @Test - public void testCreateAggregatorWithNullCombineFnThrowsException() { - CombineFn<Object, Object, Object> combiner = null; - - thrown.expect(NullPointerException.class); - thrown.expectMessage("combiner cannot be null"); - - DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext(); - - doFn.createAggregator("testAggregator", combiner); - } - - @Test - public void testCreateAggregatorWithNullSerializableFnThrowsException() { - SerializableFunction<Iterable<Object>, Object> combiner = null; - - thrown.expect(NullPointerException.class); - thrown.expectMessage("combiner cannot be null"); - - DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext(); - - doFn.createAggregator("testAggregator", combiner); - } - - @Test - public void testCreateAggregatorWithSameNameThrowsException() { - String name = "testAggregator"; - CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn(); - - DoFnWithContext<Void, Void> doFn = new 
NoOpDoFnWithContext(); - - doFn.createAggregator(name, combiner); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Cannot create"); - thrown.expectMessage(name); - thrown.expectMessage("already exists"); - - doFn.createAggregator(name, combiner); - } - - @Test - public void testCreateAggregatorsWithDifferentNamesSucceeds() { - String nameOne = "testAggregator"; - String nameTwo = "aggregatorPrime"; - CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn(); - - DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext(); - - Aggregator<Double, Double> aggregatorOne = - doFn.createAggregator(nameOne, combiner); - Aggregator<Double, Double> aggregatorTwo = - doFn.createAggregator(nameTwo, combiner); - - assertNotEquals(aggregatorOne, aggregatorTwo); - } - - @Test - public void testDoFnWithContextUsingAggregators() { - NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>(); - OldDoFn<Object, Object>.Context context = noOpFn.context(); - - OldDoFn<Object, Object> fn = spy(noOpFn); - context = spy(context); - - @SuppressWarnings("unchecked") - Aggregator<Long, Long> agg = mock(Aggregator.class); - - Sum.SumLongFn combiner = new Sum.SumLongFn(); - Aggregator<Long, Long> delegateAggregator = - fn.createAggregator("test", combiner); - - when(context.createAggregatorInternal("test", combiner)).thenReturn(agg); - - context.setupDelegateAggregators(); - delegateAggregator.addValue(1L); - - verify(agg).addValue(1L); - } - - @Test - public void testDefaultPopulateDisplayDataImplementation() { - DoFnWithContext<String, String> fn = new DoFnWithContext<String, String>() { - }; - DisplayData displayData = DisplayData.from(fn); - assertThat(displayData.items(), empty()); - } - - @Test - @Category(NeedsRunner.class) - public void testCreateAggregatorInStartBundleThrows() { - TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() { - @StartBundle - public void startBundle(Context c) { - createAggregator("anyAggregate", new 
MaxIntegerFn()); - } - - @ProcessElement - public void processElement(ProcessContext c) {} - }); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalStateException.class)); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testCreateAggregatorInProcessElementThrows() { - TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() { - @ProcessElement - public void processElement(ProcessContext c) { - createAggregator("anyAggregate", new MaxIntegerFn()); - } - }); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalStateException.class)); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testCreateAggregatorInFinishBundleThrows() { - TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() { - @FinishBundle - public void finishBundle(Context c) { - createAggregator("anyAggregate", new MaxIntegerFn()); - } - - @ProcessElement - public void processElement(ProcessContext c) {} - }); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalStateException.class)); - - p.run(); - } - - /** - * Initialize a test pipeline with the specified {@link OldDoFn}. - */ - private <InputT, OutputT> TestPipeline createTestPipeline(DoFnWithContext<InputT, OutputT> fn) { - TestPipeline pipeline = TestPipeline.create(); - pipeline.apply(Create.of((InputT) null)) - .apply(ParDo.of(fn)); - - return pipeline; - } -}