Repository: beam Updated Branches: refs/heads/master f75dfe711 -> 0e429b33f
Add support for PipelineOptions parameters This is a step towards eliminating catch-all context parameters and making DoFns express their fine-grained data needs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56cb6c51 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56cb6c51 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56cb6c51 Branch: refs/heads/master Commit: 56cb6c51748fde6ad56522733ab10edca062e802 Parents: f75dfe7 Author: Kenneth Knowles <[email protected]> Authored: Tue Jun 13 10:29:50 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri Jun 30 15:38:47 2017 -0700 ---------------------------------------------------------------------- ...eBoundedSplittableProcessElementInvoker.java | 5 ++ .../beam/runners/core/SimpleDoFnRunner.java | 20 +++++++ .../apache/beam/sdk/transforms/DoFnTester.java | 5 ++ .../reflect/ByteBuddyDoFnInvokerFactory.java | 6 ++ .../sdk/transforms/reflect/DoFnInvoker.java | 13 +++- .../sdk/transforms/reflect/DoFnSignature.java | 23 +++++++ .../sdk/transforms/reflect/DoFnSignatures.java | 22 ++++++- .../apache/beam/sdk/transforms/ParDoTest.java | 63 ++++++++++++++++++++ .../transforms/reflect/DoFnSignaturesTest.java | 14 +++++ 9 files changed, 169 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 2db6531..475abf2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -118,6 +118,11 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< } @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new IllegalStateException( "Should not access startBundleContext() from @" http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 7d7babd..c3bfef6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -233,6 +233,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override + public PipelineOptions pipelineOptions() { + return getPipelineOptions(); + } + + @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { return this; } @@ -298,6 +303,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override + public PipelineOptions pipelineOptions() { + return getPipelineOptions(); + } + + @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( "Cannot access StartBundleContext outside of @StartBundle method."); @@ -467,6 +477,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override + public PipelineOptions pipelineOptions() { + return getPipelineOptions(); + } + + @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException("StartBundleContext parameters are not supported."); } @@ -568,6 +583,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override + public PipelineOptions pipelineOptions() { + return getPipelineOptions(); + } + + @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException("StartBundleContext parameters are not supported."); } http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 4da9a80..b2377dd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -290,6 +290,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override + public PipelineOptions pipelineOptions() { + return getPipelineOptions(); + } + + @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext( DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 4f67db4..8378204 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -90,6 +90,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { public static final String PROCESS_CONTEXT_PARAMETER_METHOD = "processContext"; public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext"; public static final String WINDOW_PARAMETER_METHOD = "window"; + public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = "pipelineOptions"; public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = "restrictionTracker"; public static final String STATE_PARAMETER_METHOD = "state"; public static final String TIMER_PARAMETER_METHOD = "timer"; @@ -627,6 +628,11 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { getExtraContextFactoryMethodDescription(TIMER_PARAMETER_METHOD, String.class)), TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class))); } + + @Override + public StackManipulation dispatch(DoFnSignature.Parameter.PipelineOptionsParameter p) { + return simpleExtraContextParameter(PIPELINE_OPTIONS_PARAMETER_METHOD); + } }); } http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index ed81f42..3b22fda 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.reflect; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.transforms.DoFn; @@ -102,7 +103,12 @@ public interface DoFnInvoker<InputT, OutputT> { */ BoundedWindow window(); - /** Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}. */ + /** Provide {@link PipelineOptions}. */ + PipelineOptions pipelineOptions(); + + /** + * Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}. + */ DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn); /** Provide a {@link DoFn.FinishBundleContext} to use with the given {@link DoFn}. */ @@ -140,6 +146,11 @@ public interface DoFnInvoker<InputT, OutputT> { } @Override + public PipelineOptions pipelineOptions() { + return null; + } + + @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 0b4bf90..6eeed8e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.Timer; @@ -193,6 +194,8 @@ public abstract class DoFnSignature { return cases.dispatch((StateParameter) this); } else if (this instanceof TimerParameter) { return cases.dispatch((TimerParameter) this); + } else if (this instanceof PipelineOptionsParameter) { + return cases.dispatch((PipelineOptionsParameter) this); } else { throw new IllegalStateException( String.format("Attempt to case match on unknown %s subclass %s", @@ -212,6 +215,7 @@ public abstract class DoFnSignature { ResultT dispatch(RestrictionTrackerParameter p); ResultT dispatch(StateParameter p); ResultT dispatch(TimerParameter p); + ResultT dispatch(PipelineOptionsParameter p); /** * A base class for a visitor with a default method for cases it is not interested in. @@ -259,6 +263,11 @@ public abstract class DoFnSignature { public ResultT dispatch(TimerParameter p) { return dispatchDefault(p); } + + @Override + public ResultT dispatch(PipelineOptionsParameter p) { + return dispatchDefault(p); + } } } @@ -287,6 +296,11 @@ public abstract class DoFnSignature { return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT); } + /** Returns a {@link PipelineOptionsParameter}. */ + public static PipelineOptionsParameter pipelineOptions() { + return new AutoValue_DoFnSignature_Parameter_PipelineOptionsParameter(); + } + /** * Returns a {@link RestrictionTrackerParameter}. */ @@ -306,6 +320,14 @@ public abstract class DoFnSignature { } /** + * Descriptor for a {@link Parameter} of a subtype of {@link PipelineOptions}. + */ + @AutoValue + public abstract static class PipelineOptionsParameter extends Parameter { + PipelineOptionsParameter() {} + } + + /** * Descriptor for a {@link Parameter} of type {@link DoFn.StartBundleContext}. * * <p>All such descriptors are equal. @@ -314,6 +336,7 @@ public abstract class DoFnSignature { public abstract static class StartBundleContextParameter extends Parameter { StartBundleContextParameter() {} } + /** * Descriptor for a {@link Parameter} of type {@link DoFn.FinishBundleContext}. * http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index bb191b1..1b27e66 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.Timer; @@ -78,19 +79,23 @@ public class DoFnSignatures { ImmutableList.of( Parameter.ProcessContextParameter.class, Parameter.WindowParameter.class, + Parameter.PipelineOptionsParameter.class, Parameter.TimerParameter.class, Parameter.StateParameter.class); private static final Collection<Class<? extends Parameter>> ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS = ImmutableList.of( - Parameter.ProcessContextParameter.class, Parameter.RestrictionTrackerParameter.class); + Parameter.PipelineOptionsParameter.class, + Parameter.ProcessContextParameter.class, + Parameter.RestrictionTrackerParameter.class); private static final Collection<Class<? extends Parameter>> ALLOWED_ON_TIMER_PARAMETERS = ImmutableList.of( Parameter.OnTimerContextParameter.class, Parameter.WindowParameter.class, + Parameter.PipelineOptionsParameter.class, Parameter.TimerParameter.class, Parameter.StateParameter.class); @@ -187,6 +192,15 @@ public class DoFnSignatures { extraParameters, Predicates.instanceOf(WindowParameter.class)); } + /** + * Indicates whether a {@link Parameter.PipelineOptionsParameter} is + * known in this context. + */ + public boolean hasPipelineOptionsParamter() { + return Iterables.any( + extraParameters, Predicates.instanceOf(Parameter.PipelineOptionsParameter.class)); + } + /** The window type, if any, used by this method. */ @Nullable public TypeDescriptor<? extends BoundedWindow> getWindowType() { @@ -789,6 +803,12 @@ public class DoFnSignatures { "Multiple %s parameters", BoundedWindow.class.getSimpleName()); return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT); + } else if (PipelineOptions.class.equals(rawType)) { + methodErrors.checkArgument( + !methodContext.hasPipelineOptionsParamter(), + "Multiple %s parameters", + PipelineOptions.class.getSimpleName()); + return Parameter.pipelineOptions(); } else if (RestrictionTracker.class.isAssignableFrom(rawType)) { methodErrors.checkArgument( !methodContext.hasRestrictionTrackerParameter(), http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index c67cf2a..5b60ef3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -62,6 +62,8 @@ import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.MapState; @@ -2942,4 +2944,65 @@ public class ParDoTest implements Serializable { // If it doesn't crash, we made it! } + + /** A {@link PipelineOptions} subclass for testing passing to a {@link DoFn}. */ + public interface MyOptions extends PipelineOptions { + @Default.String("fake option") + String getFakeOption(); + void setFakeOption(String value); + } + + @Test + @Category(ValidatesRunner.class) + public void testPipelineOptionsParameter() { + PCollection<String> results = pipeline + .apply(Create.of(1)) + .apply( + ParDo.of( + new DoFn<Integer, String>() { + @ProcessElement + public void process(ProcessContext c, PipelineOptions options) { + c.output(options.as(MyOptions.class).getFakeOption()); + } + })); + + String testOptionValue = "not fake anymore"; + pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue); + PAssert.that(results).containsInAnyOrder("not fake anymore"); + + pipeline.run(); + } + + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class}) + public void testPipelineOptionsParameterOnTimer() { + final String timerId = "thisTimer"; + + PCollection<String> results = + pipeline + .apply(Create.of(KV.of(0, 0))) + .apply( + ParDo.of( + new DoFn<KV<Integer, Integer>, String>() { + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process( + ProcessContext c, BoundedWindow w, @TimerId(timerId) Timer timer) { + timer.set(w.maxTimestamp()); + } + + @OnTimer(timerId) + public void onTimer(OnTimerContext c, PipelineOptions options) { + c.output(options.as(MyOptions.class).getFakeOption()); + } + })); + + String testOptionValue = "not fake anymore"; + pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue); + PAssert.that(results).containsInAnyOrder("not fake anymore"); + + pipeline.run(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/56cb6c51/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index cffb0ad..70c8dfd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -29,6 +29,7 @@ import static org.junit.Assert.fail; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.TimeDomain; @@ -329,6 +330,19 @@ public class DoFnSignaturesTest { } @Test + public void testPipelineOptionsParameter() throws Exception { + DoFnSignature sig = + DoFnSignatures.getSignature(new DoFn<String, String>() { + @ProcessElement + public void process(ProcessContext c, PipelineOptions options) {} + }.getClass()); + + assertThat( + sig.processElement().extraParameters(), + Matchers.<Parameter>hasItem(instanceOf(Parameter.PipelineOptionsParameter.class))); + } + + @Test public void testDeclAndUsageOfTimerInSuperclass() throws Exception { DoFnSignature sig = DoFnSignatures.getSignature(new DoFnOverridingAbstractTimerUse().getClass());
