Repository: incubator-beam Updated Branches: refs/heads/master bb9c38664 -> 2a7169b6f
Move DoFn.ArgumentProvider to DoFnInvoker.ArgumentProvider The arguments provided as a single object are an aspect of the DoFnInvoker, not the DoFn. The DoFn itself is a specification that may have other ways of being invoked, depending on the circumstance. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33fb8c2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33fb8c2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33fb8c2d Branch: refs/heads/master Commit: 33fb8c2db8c64275f1b9b8ac6dfd12e92d7fb777 Parents: bb9c386 Author: Kenneth Knowles <k...@google.com> Authored: Thu Nov 17 23:04:55 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Nov 18 14:20:20 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 4 +- .../beam/runners/core/SplittableParDo.java | 7 +- .../org/apache/beam/sdk/transforms/DoFn.java | 122 ------------------- .../beam/sdk/transforms/DoFnAdapters.java | 10 +- .../reflect/ByteBuddyDoFnInvokerFactory.java | 41 +++---- .../sdk/transforms/reflect/DoFnInvoker.java | 121 +++++++++++++++++- .../sdk/transforms/reflect/DoFnInvokers.java | 4 +- .../sdk/transforms/reflect/OnTimerInvoker.java | 8 +- .../transforms/reflect/DoFnInvokersTest.java | 5 +- .../transforms/reflect/OnTimerInvokersTest.java | 2 +- .../transforms/DoFnInvokersBenchmark.java | 5 +- 11 files changed, 161 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 76aae8f..841e412 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -183,7 +183,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out * @param <OutputT> the type of the {@link DoFn} (main) output elements */ private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, OutputT>.Context - implements DoFn.ArgumentProvider<InputT, OutputT> { + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { private static final int MAX_SIDE_OUTPUTS = 1000; final PipelineOptions options; @@ -424,7 +424,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out * @param <OutputT> the type of the {@link DoFn} (main) output elements */ private class DoFnProcessContext<InputT, OutputT> extends DoFn<InputT, OutputT>.ProcessContext - implements DoFn.ArgumentProvider<InputT, OutputT> { + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { final DoFn<InputT, OutputT> fn; final DoFnContext<InputT, OutputT> context; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 3003984..c38ab2f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -392,10 +392,11 @@ public class SplittableParDo< } /** - * Creates an {@link DoFn.ArgumentProvider} that provides the given tracker as well as the given + * Creates an {@link DoFnInvoker.ArgumentProvider} that provides the given tracker as well as + * the given * {@link ProcessContext} (which is also provided when a {@link Context} is requested. */ - private DoFn.ArgumentProvider<InputT, OutputT> wrapTracker( + private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapTracker( TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext processContext) { return new ArgumentProviderForTracker<>(tracker, processContext); @@ -403,7 +404,7 @@ public class SplittableParDo< private static class ArgumentProviderForTracker< InputT, OutputT, TrackerT extends RestrictionTracker<?>> - implements DoFn.ArgumentProvider<InputT, OutputT> { + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { private final TrackerT tracker; private final DoFn<InputT, OutputT>.ProcessContext processContext; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index bf0631b..9978ef4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -38,13 +38,11 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollection; @@ -331,78 +329,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD return new TypeDescriptor<OutputT>(getClass()) {}; } - /** - * Interface for runner implementors to provide implementations of extra context information. - * - * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an - * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that - * has indicated it needs the given extra context. - * - * <p>In the case of {@link ProcessElement} it is called once per invocation of - * {@link ProcessElement}. - */ - public interface ArgumentProvider<InputT, OutputT> { - /** - * Construct the {@link BoundedWindow} to use within a {@link DoFn} that - * needs it. This is called if the {@link ProcessElement} method has a parameter of type - * {@link BoundedWindow}. - * - * @return {@link BoundedWindow} of the element currently being processed. - */ - BoundedWindow window(); - - /** - * Provide a {@link DoFn.Context} to use with the given {@link DoFn}. - */ - DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn); - - /** - * Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}. - */ - DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn); - - /** - * A placeholder for testing purposes. - */ - InputProvider<InputT> inputProvider(); - - /** - * A placeholder for testing purposes. - */ - OutputReceiver<OutputT> outputReceiver(); - - /** - * For migration from {@link OldDoFn} to {@link DoFn}, provide - * a {@link WindowingInternals} so an {@link OldDoFn} can be run - * via {@link DoFnInvoker}. - * - * <p>This is <i>not</i> exposed via the reflective capabilities - * of {@link DoFn}. - * - * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require - * state and timers, they will need to wait for the arrival of those features. Do not introduce - * new uses of this method. - */ - @Deprecated - WindowingInternals<InputT, OutputT> windowingInternals(); - - /** - * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with - * the current {@link ProcessElement} call. - */ - <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker(); - - /** - * Returns the state cell for the given {@link StateId}. - */ - State state(String stateId); - - /** - * Returns the timer for the given {@link TimerId}. - */ - Timer timer(String timerId); - } - /** Receives values of the given type. */ public interface OutputReceiver<T> { void output(T output); @@ -413,54 +339,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD T get(); } - /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */ - public static class FakeArgumentProvider<InputT, OutputT> - implements ArgumentProvider<InputT, OutputT> { - @Override - public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - return null; - } - - @Override - public BoundedWindow window() { - return null; - } - - @Override - public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) { - return null; - } - - @Override - public InputProvider<InputT> inputProvider() { - return null; - } - - @Override - public OutputReceiver<OutputT> outputReceiver() { - return null; - } - - @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - return null; - } - - @Override - public State state(String stateId) { - return null; - } - - @Override - public Timer timer(String timerId) { - return null; - } - - public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { - return null; - } - } - ///////////////////////////////////////////////////////////////////////////// /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index 71a6d1d..a3466bb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -77,7 +77,7 @@ public class DoFnAdapters { public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext( OldDoFn<InputT, OutputT> fn, final DoFn<InputT, OutputT>.ProcessContext c, - final DoFn.ArgumentProvider<InputT, OutputT> extra) { + final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) { return fn.new ProcessContext() { @Override public InputT element() { @@ -270,12 +270,12 @@ public class DoFnAdapters { } /** - * Wraps an {@link OldDoFn.Context} as a {@link DoFn.ArgumentProvider} inside a {@link + * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is * unavailable. */ private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context - implements DoFn.ArgumentProvider<InputT, OutputT> { + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { private OldDoFn<InputT, OutputT>.Context context; @@ -371,11 +371,11 @@ public class DoFnAdapters { } /** - * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFn.ArgumentProvider} method. + * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method. */ private static class ProcessContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.ProcessContext - implements DoFn.ArgumentProvider<InputT, OutputT> { + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { private OldDoFn<InputT, OutputT>.ProcessContext context; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index bc6d8c9..9998c9d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -101,7 +101,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { /** * Creates a {@link DoFnInvoker} for the given {@link DoFn} by generating bytecode that directly - * invokes its methods with arguments extracted from the {@link DoFn.ArgumentProvider}. + * invokes its methods with arguments extracted from the {@link DoFnInvoker.ArgumentProvider}. */ @Override public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, OutputT> fn) { @@ -149,19 +149,19 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { /** * Associates the given timer ID with the given {@link OnTimerInvoker}. * - * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup - * of the timer ID rather than a generated conditional branch to choose which - * OnTimerInvoker to invoke. + * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup of the + * timer ID rather than a generated conditional branch to choose which OnTimerInvoker to invoke. * - * <p>This method has package level access as it is intended only for assembly of the - * {@link DoFnInvokerBase} not by any subclass. + * <p>This method has package level access as it is intended only for assembly of the {@link + * DoFnInvokerBase} not by any subclass. */ void addOnTimerInvoker(String timerId, OnTimerInvoker onTimerInvoker) { this.onTimerInvokers.put(timerId, onTimerInvoker); } @Override - public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) { + public void invokeOnTimer( + String timerId, DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) { @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId); if (onTimerInvoker != null) { @@ -193,8 +193,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { getByteBuddyInvokerConstructor(signature).newInstance(fn); for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) { - invoker.addOnTimerInvoker(onTimerMethod.id(), - OnTimerInvokers.forTimer(fn, onTimerMethod.id())); + invoker.addOnTimerInvoker( + onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id())); } return invoker; @@ -326,8 +326,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT())); } else { return new DowncastingParametersMethodDelegation( - doFnType, - signature.getRestrictionCoder().targetMethod()); + doFnType, signature.getRestrictionCoder().targetMethod()); } } else { return ExceptionMethod.throwing(UnsupportedOperationException.class); @@ -345,8 +344,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { } /** Delegates to the given method if available, or does nothing. */ - private static Implementation delegateOrNoop(TypeDescription doFnType, DoFnSignature.DoFnMethod - method) { + private static Implementation delegateOrNoop( + TypeDescription doFnType, DoFnSignature.DoFnMethod method) { return (method == null) ? FixedValue.originType() : new DoFnMethodDelegation(doFnType, method.targetMethod()); @@ -504,19 +503,19 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { String methodName, Class<?>... parameterTypes) { try { return new MethodDescription.ForLoadedMethod( - DoFn.ArgumentProvider.class.getMethod(methodName, parameterTypes)); + DoFnInvoker.ArgumentProvider.class.getMethod(methodName, parameterTypes)); } catch (Exception e) { throw new IllegalStateException( String.format( "Failed to locate required method %s.%s", - DoFn.ArgumentProvider.class.getSimpleName(), methodName), + DoFnInvoker.ArgumentProvider.class.getSimpleName(), methodName), e); } } /** - * Calls a zero-parameter getter on the {@link DoFn.ArgumentProvider}, which must be on top of the - * stack. + * Calls a zero-parameter getter on the {@link DoFnInvoker.ArgumentProvider}, which must be on top + * of the stack. */ private static StackManipulation simpleExtraContextParameter(String methodName) { return new StackManipulation.Compound( @@ -565,7 +564,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { @Override public StackManipulation dispatch(RestrictionTrackerParameter p) { - // DoFn.ArgumentProvider.restrictionTracker() returns a RestrictionTracker, + // DoFnInvoker.ArgumentProvider.restrictionTracker() returns a RestrictionTracker, // but the @ProcessElement method expects a concrete subtype of it. // Insert a downcast. return new StackManipulation.Compound( @@ -613,8 +612,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { private final DoFnSignature.ProcessElementMethod signature; /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */ - private ProcessElementDelegation(TypeDescription doFnType, DoFnSignature.ProcessElementMethod - signature) { + private ProcessElementDelegation( + TypeDescription doFnType, DoFnSignature.ProcessElementMethod signature) { super(doFnType, signature.targetMethod()); this.signature = signature; } @@ -622,7 +621,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { @Override protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) { // Parameters of the wrapper invoker method: - // DoFn.ArgumentProvider + // DoFnInvoker.ArgumentProvider // Parameters of the wrapped DoFn method: // [DoFn.ProcessContext, BoundedWindow, InputProvider, OutputReceiver] in any order ArrayList<StackManipulation> pushParameters = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 2ae7920..d899207 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -20,7 +20,19 @@ package org.apache.beam.sdk.transforms.reflect; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.FinishBundle; +import org.apache.beam.sdk.transforms.DoFn.InputProvider; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFn.StartBundle; +import org.apache.beam.sdk.transforms.DoFn.StateId; +import org.apache.beam.sdk.transforms.DoFn.TimerId; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.state.State; /** * Interface for invoking the {@code DoFn} processing methods. @@ -48,11 +60,10 @@ public interface DoFnInvoker<InputT, OutputT> { * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link * DoFn.ProcessContinuation#stop()} if it returns {@code void}. */ - DoFn.ProcessContinuation invokeProcessElement(DoFn.ArgumentProvider<InputT, OutputT> extra); + DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra); /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */ - void invokeOnTimer( - String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments); + void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments); /** Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}. */ <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element); @@ -72,4 +83,108 @@ public interface DoFnInvoker<InputT, OutputT> { /** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */ <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> TrackerT invokeNewTracker( RestrictionT restriction); + + /** + * Interface for runner implementors to provide implementations of extra context information. + * + * <p>The methods on this interface are called by {@link DoFnInvoker} before invoking an annotated + * {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that has indicated + * it needs the given extra context. + * + * <p>In the case of {@link ProcessElement} it is called once per invocation of {@link + * ProcessElement}. + */ + interface ArgumentProvider<InputT, OutputT> { + /** + * Construct the {@link BoundedWindow} to use within a {@link DoFn} that needs it. This is + * called if the {@link ProcessElement} method has a parameter of type {@link BoundedWindow}. + * + * @return {@link BoundedWindow} of the element currently being processed. + */ + BoundedWindow window(); + + /** Provide a {@link DoFn.Context} to use with the given {@link DoFn}. */ + DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn); + + /** Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}. */ + DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn); + + /** A placeholder for testing purposes. */ + InputProvider<InputT> inputProvider(); + + /** A placeholder for testing purposes. */ + OutputReceiver<OutputT> outputReceiver(); + + /** + * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so + * an {@link OldDoFn} can be run via {@link DoFnInvoker}. + * + * <p>This is <i>not</i> exposed via the reflective capabilities of {@link DoFn}. + * + * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state + * and timers, they will need to wait for the arrival of those features. Do not introduce + * new uses of this method. + */ + @Deprecated + WindowingInternals<InputT, OutputT> windowingInternals(); + + /** + * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with + * the current {@link ProcessElement} call. + */ + <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker(); + + /** Returns the state cell for the given {@link StateId}. */ + State state(String stateId); + + /** Returns the timer for the given {@link TimerId}. */ + Timer timer(String timerId); + } + + /** For testing only, this {@link ArgumentProvider} returns {@code null} for all parameters. */ + class FakeArgumentProvider<InputT, OutputT> implements ArgumentProvider<InputT, OutputT> { + @Override + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + return null; + } + + @Override + public BoundedWindow window() { + return null; + } + + @Override + public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) { + return null; + } + + @Override + public InputProvider<InputT> inputProvider() { + return null; + } + + @Override + public OutputReceiver<OutputT> outputReceiver() { + return null; + } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + return null; + } + + @Override + public State state(String stateId) { + return null; + } + + @Override + public Timer timer(String timerId) { + return null; + } + + public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 7eccaab..15ba198 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -100,7 +100,7 @@ public class DoFnInvokers { @Override public DoFn.ProcessContinuation invokeProcessElement( - DoFn.ArgumentProvider<InputT, OutputT> extra) { + ArgumentProvider<InputT, OutputT> extra) { // The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly DoFn<InputT, OutputT>.ProcessContext newCtx = extra.processContext(new DoFn<InputT, OutputT>() {}); @@ -115,7 +115,7 @@ public class DoFnInvokers { } @Override - public void invokeOnTimer(String timerId, DoFn.ArgumentProvider<InputT, OutputT> arguments) { + public void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments) { throw new UnsupportedOperationException( String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java index bfcafd0..3fbad0f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java @@ -17,11 +17,11 @@ */ package org.apache.beam.sdk.transforms.reflect; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.OnTimer; -/** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */ +/** Interface for invoking the {@link OnTimer} method for a particular timer. */ public interface OnTimerInvoker<InputT, OutputT> { - /** Invoke the {@link DoFn.OnTimer} method in the provided context. */ - void invokeOnTimer(DoFn.ArgumentProvider<InputT, OutputT> extra); + /** Invoke the {@link OnTimer} method in the provided context. */ + void invokeOnTimer(DoFnInvoker.ArgumentProvider<InputT, OutputT> extra); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 3d9e3ec..456a6eb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -40,10 +40,9 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.ArgumentProvider; -import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -80,7 +79,7 @@ public class DoFnInvokersTest { @Mock private DoFn.InputProvider<String> mockInputProvider; @Mock private DoFn.OutputReceiver<String> mockOutputReceiver; @Mock private WindowingInternals<String, String> mockWindowingInternals; - @Mock private ArgumentProvider<String, String> mockArgumentProvider; + @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider; @Mock private OldDoFn<String, String> mockOldDoFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java index d51e9cc..177f15f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java @@ -43,7 +43,7 @@ public class OnTimerInvokersTest { @Mock private BoundedWindow mockWindow; - @Mock private DoFn.ArgumentProvider<String, String> mockArgumentProvider; + @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider; @Before public void setUp() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33fb8c2d/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java index e0fdac6..442bdec 100644 --- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java @@ -21,10 +21,11 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.FakeArgumentProvider; import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.ArgumentProvider; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -56,7 +57,7 @@ public class DoFnInvokersBenchmark { private StubOldDoFnProcessContext stubOldDoFnContext = new StubOldDoFnProcessContext(oldDoFn, ELEMENT); private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT); - private DoFn.ArgumentProvider<String, String> argumentProvider = + private ArgumentProvider<String, String> argumentProvider = new FakeArgumentProvider<>(); private OldDoFn<String, String> adaptedDoFnWithContext;