Add OnTimerContext parameter support to DoFnSignature
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42b506f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42b506f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42b506f0 Branch: refs/heads/master Commit: 42b506f06dbd73e03a2cfad4e7677e9698b3c020 Parents: 3f8c807 Author: Kenneth Knowles <[email protected]> Authored: Tue Dec 6 20:18:18 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Wed Dec 7 19:22:43 2016 -0800 ---------------------------------------------------------------------- .../reflect/ByteBuddyDoFnInvokerFactory.java | 6 ++ .../sdk/transforms/reflect/DoFnSignature.java | 26 +++++- .../sdk/transforms/reflect/DoFnSignatures.java | 90 ++++++++++++++++---- .../DoFnSignaturesSplittableDoFnTest.java | 3 +- .../transforms/reflect/DoFnSignaturesTest.java | 47 ++++++++++ 5 files changed, 154 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 8750d64..3480603 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ContextParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; @@ -554,6 +555,11 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { } @Override + public StackManipulation dispatch(OnTimerContextParameter p) { + throw new UnsupportedOperationException("OnTimerContext is not yet supported."); + } + + @Override public StackManipulation dispatch(WindowParameter p) { return new StackManipulation.Compound( simpleExtraContextParameter(WINDOW_PARAMETER_METHOD), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 0750949..ccc9ac3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -175,6 +175,8 @@ public abstract class DoFnSignature { return cases.dispatch((ContextParameter) this); } else if (this instanceof ProcessContextParameter) { return cases.dispatch((ProcessContextParameter) this); + } else if (this instanceof OnTimerContextParameter) { + return cases.dispatch((OnTimerContextParameter) this); } else if (this instanceof WindowParameter) { return 
cases.dispatch((WindowParameter) this); } else if (this instanceof RestrictionTrackerParameter) { @@ -200,6 +202,7 @@ public abstract class DoFnSignature { public interface Cases<ResultT> { ResultT dispatch(ContextParameter p); ResultT dispatch(ProcessContextParameter p); + ResultT dispatch(OnTimerContextParameter p); ResultT dispatch(WindowParameter p); ResultT dispatch(InputProviderParameter p); ResultT dispatch(OutputReceiverParameter p); @@ -225,6 +228,11 @@ public abstract class DoFnSignature { } @Override + public ResultT dispatch(OnTimerContextParameter p) { + return dispatchDefault(p); + } + + @Override public ResultT dispatch(WindowParameter p) { return dispatchDefault(p); } @@ -261,12 +269,14 @@ public abstract class DoFnSignature { new AutoValue_DoFnSignature_Parameter_ContextParameter(); private static final ProcessContextParameter PROCESS_CONTEXT_PARAMETER = new AutoValue_DoFnSignature_Parameter_ProcessContextParameter(); + private static final OnTimerContextParameter ON_TIMER_CONTEXT_PARAMETER = + new AutoValue_DoFnSignature_Parameter_OnTimerContextParameter(); private static final InputProviderParameter INPUT_PROVIDER_PARAMETER = new AutoValue_DoFnSignature_Parameter_InputProviderParameter(); private static final OutputReceiverParameter OUTPUT_RECEIVER_PARAMETER = new AutoValue_DoFnSignature_Parameter_OutputReceiverParameter(); - /** Returns a {@link ProcessContextParameter}. */ + /** Returns a {@link ContextParameter}. */ public static ContextParameter context() { return CONTEXT_PARAMETER; } @@ -276,6 +286,11 @@ public abstract class DoFnSignature { return PROCESS_CONTEXT_PARAMETER; } + /** Returns a {@link OnTimerContextParameter}. */ + public static OnTimerContextParameter onTimerContext() { + return ON_TIMER_CONTEXT_PARAMETER; + } + /** Returns a {@link WindowParameter}. */ public static WindowParameter boundedWindow(TypeDescriptor<? 
extends BoundedWindow> windowT) { return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT); @@ -334,6 +349,15 @@ public abstract class DoFnSignature { } /** + * Descriptor for a {@link Parameter} of type {@link DoFn.OnTimerContext}. + * + * <p>All such descriptors are equal. + */ + @AutoValue + public abstract static class OnTimerContextParameter extends Parameter { + OnTimerContextParameter() {} + } + /** * Descriptor for a {@link Parameter} of type {@link BoundedWindow}. * * <p>All such descriptors are equal. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 83d67b7..e3ba966 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; 
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; @@ -74,6 +74,29 @@ public class DoFnSignatures { private static final Map<Class<?>, DoFnSignature> signatureCache = new LinkedHashMap<>(); + private static final Collection<Class<? extends Parameter>> + ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS = + ImmutableList.of( + Parameter.ProcessContextParameter.class, + Parameter.WindowParameter.class, + Parameter.TimerParameter.class, + Parameter.StateParameter.class, + Parameter.InputProviderParameter.class, + Parameter.OutputReceiverParameter.class); + + private static final Collection<Class<? extends Parameter>> + ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS = + ImmutableList.of( + Parameter.ProcessContextParameter.class, Parameter.RestrictionTrackerParameter.class); + + private static final Collection<Class<? extends Parameter>> + ALLOWED_ON_TIMER_PARAMETERS = + ImmutableList.of( + Parameter.OnTimerContextParameter.class, + Parameter.WindowParameter.class, + Parameter.TimerParameter.class, + Parameter.StateParameter.class); + /** @return the {@link DoFnSignature} for the given {@link DoFn} instance. */ public static <FnT extends DoFn<?, ?>> DoFnSignature signatureForDoFn(FnT fn) { return getSignature(fn.getClass()); @@ -583,6 +606,18 @@ public class DoFnSignatures { } /** + * Generates a {@link TypeDescriptor} for {@code DoFn<InputT, OutputT>.OnTimerContext} given {@code + * InputT} and {@code OutputT}. 
+ */ + private static <InputT, OutputT> + TypeDescriptor<DoFn<InputT, OutputT>.OnTimerContext> doFnOnTimerContextTypeOf( + TypeDescriptor<InputT> inputT, TypeDescriptor<OutputT> outputT) { + return new TypeDescriptor<DoFn<InputT, OutputT>.OnTimerContext>() {}.where( + new TypeParameter<InputT>() {}, inputT) + .where(new TypeParameter<OutputT>() {}, outputT); + } + + /** * Generates a {@link TypeDescriptor} for {@code DoFn.InputProvider<InputT>} given {@code InputT}. */ private static <InputT> TypeDescriptor<DoFn.InputProvider<InputT>> inputProviderTypeOf( @@ -621,7 +656,7 @@ public class DoFnSignatures { List<DoFnSignature.Parameter> extraParameters = new ArrayList<>(); ErrorReporter onTimerErrors = errors.forMethod(DoFn.OnTimer.class, m); for (int i = 0; i < params.length; ++i) { - extraParameters.add( + Parameter parameter = analyzeExtraParameter( onTimerErrors, fnContext, @@ -633,7 +668,14 @@ public class DoFnSignatures { fnClass.resolveType(params[i]), Arrays.asList(m.getParameterAnnotations()[i])), inputT, - outputT)); + outputT); + + checkParameterOneOf( + errors, + parameter, + ALLOWED_ON_TIMER_PARAMETERS); + + extraParameters.add(parameter); } return DoFnSignature.OnTimerMethod.create(m, timerId, windowT, extraParameters); @@ -679,20 +721,15 @@ public class DoFnSignatures { methodContext.addParameter(extraParam); } - // A splittable DoFn can not have any other extra context parameters. 
+ // The allowed parameters depend on whether this DoFn is splittable if (methodContext.hasRestrictionTrackerParameter()) { - errors.checkArgument( - Iterables.all( - methodContext.getExtraParameters(), - Predicates.or( - Predicates.instanceOf(RestrictionTrackerParameter.class), - Predicates.instanceOf(ProcessContextParameter.class))), - "Splittable %s @%s must have only %s and %s parameters, but has: %s", - DoFn.class.getSimpleName(), - DoFn.ProcessElement.class.getSimpleName(), - DoFn.ProcessContext.class.getSimpleName(), - RestrictionTracker.class.getSimpleName(), - methodContext.getExtraParameters()); + for (Parameter parameter : methodContext.getExtraParameters()) { + checkParameterOneOf(errors, parameter, ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS); + } + } else { + for (Parameter parameter : methodContext.getExtraParameters()) { + checkParameterOneOf(errors, parameter, ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS); + } } return DoFnSignature.ProcessElementMethod.create( @@ -703,6 +740,21 @@ public class DoFnSignatures { DoFn.ProcessContinuation.class.equals(m.getReturnType())); } + private static void checkParameterOneOf( + ErrorReporter errors, + Parameter parameter, + Collection<Class<? extends Parameter>> allowedParameterClasses) { + + for (Class<? 
extends Parameter> paramClass : allowedParameterClasses) { + if (paramClass.isAssignableFrom(parameter.getClass())) { + return; + } + } + + // If we get here, none matched + errors.throwIllegalArgument("Illegal parameter type: %s", parameter); + } + private static Parameter analyzeExtraParameter( ErrorReporter methodErrors, FnAnalysisContext fnContext, @@ -714,6 +766,7 @@ public class DoFnSignatures { TypeDescriptor<?> expectedProcessContextT = doFnProcessContextTypeOf(inputT, outputT); TypeDescriptor<?> expectedContextT = doFnContextTypeOf(inputT, outputT); + TypeDescriptor<?> expectedOnTimerContextT = doFnOnTimerContextTypeOf(inputT, outputT); TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT); TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT); @@ -732,6 +785,11 @@ public class DoFnSignatures { "Must take %s as the Context argument", formatType(expectedContextT)); return Parameter.context(); + } else if (rawType.equals(DoFn.OnTimerContext.class)) { + methodErrors.checkArgument(paramT.equals(expectedOnTimerContextT), + "Must take %s as the OnTimerContext argument", + formatType(expectedOnTimerContextT)); + return Parameter.onTimerContext(); } else if (BoundedWindow.class.isAssignableFrom(rawType)) { methodErrors.checkArgument( !methodContext.hasWindowParameter(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index 91f2d1b..7b594c9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -90,7 +90,8 @@ public class DoFnSignaturesSplittableDoFnTest { @Test public void testSplittableProcessElementMustNotHaveOtherParams() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("must have only ProcessContext and RestrictionTracker parameters"); + thrown.expectMessage("Illegal parameter"); + thrown.expectMessage("BoundedWindow"); DoFnSignature.ProcessElementMethod signature = analyzeProcessElementMethod( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42b506f0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index 1381cd9..69d4058 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -442,6 +442,53 @@ public class DoFnSignaturesTest { } @Test + public void testSimpleTimerWithContext() throws Exception { + DoFnSignature sig = + DoFnSignatures.getSignature( + new DoFn<KV<String, Integer>, Long>() { + @TimerId("foo") + private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void foo(ProcessContext context) {} + + @OnTimer("foo") + public void onFoo(OnTimerContext c) {} + }.getClass()); + + assertThat(sig.timerDeclarations().size(), equalTo(1)); + DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo"); + + assertThat(decl.id(), equalTo("foo")); + assertThat(decl.field().getName(), equalTo("bizzle")); + + assertThat( + sig.onTimerMethods().get("foo").extraParameters().get(0), + 
equalTo((Parameter) Parameter.onTimerContext())); + } + + @Test + public void testProcessElementWithOnTimerContextRejected() throws Exception { + thrown.expect(IllegalArgumentException.class); + // The message should at least mention @ProcessElement and OnTimerContext + thrown.expectMessage("@" + DoFn.ProcessElement.class.getSimpleName()); + thrown.expectMessage(DoFn.OnTimerContext.class.getSimpleName()); + + DoFnSignature sig = + DoFnSignatures.getSignature( + new DoFn<KV<String, Integer>, Long>() { + @TimerId("foo") + private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void foo(ProcessContext context, OnTimerContext bogus) {} + + @OnTimer("foo") + public void onFoo() {} + }.getClass()); + } + + @Test public void testSimpleTimerIdNamedDoFn() throws Exception { class DoFnForTestSimpleTimerIdNamedDoFn extends DoFn<KV<String, Integer>, Long> { @TimerId("foo")
