[
https://issues.apache.org/jira/browse/BEAM-1589?focusedWorklogId=137447&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137447
]
ASF GitHub Bot logged work on BEAM-1589:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Aug/18 16:29
Start Date: 23/Aug/18 16:29
Worklog Time Spent: 10m
Work Description: reuvenlax closed pull request #4482: [BEAM-1589] Added
@OnWindowExpiration annotation.
URL: https://github.com/apache/beam/pull/4482
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 1ddb1d5b88c..67c658e5ac9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -462,6 +462,32 @@ public Duration getAllowedTimestampSkew() {
String value();
}
+ /**
+ * Annotation for the method to use for performing actions on window
expiration. For example,
+ * users can use this annotation to write a method that extracts a value
saved in a state before
+ * it gets garbage collected on window expiration.
+ *
+ * <p>The method annotated with {@code @OnWindowExpiration} may have
parameters according to the
+ * same logic as {@link OnTimer}. See the following code for an example:
+ *
+ * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
+ *
+ * {@literal @ProcessElement}
+ * public void processElement(ProcessContext c) {
+ * }
+ *
+ * {@literal @OnWindowExpiration}
+ * public void onWindowExpiration() {
+ * ...
+ * }
+ * }</code></pre>
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.STATE)
+ public @interface OnWindowExpiration {}
+
/**
* Annotation for the method to use to prepare an instance for processing
bundles of elements.
*
@@ -591,9 +617,9 @@ public Duration getAllowedTimestampSkew() {
* <b><i>Experimental - no backwards compatibility guarantees. The exact
name or usage of this
* feature may change.</i></b>
*
- * <p>Annotation that may be added to a {@link ProcessElement} or {@link
OnTimer} method to
- * indicate that the runner must ensure that the observable contents of the
input {@link
- * PCollection} or mutable state must be stable upon retries.
+ * <p>Annotation that may be added to a {@link ProcessElement}, {@link
OnTimer}, or {@link
+ * OnWindowExpiration} method to indicate that the runner must ensure that
the observable contents
+ * of the input {@link PCollection} or mutable state must be stable upon
retries.
*
* <p>This is important for sinks, which must ensure exactly-once semantics
when writing to a
* storage medium outside of your pipeline. A general pattern for a basic
sink is to write a
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index cea0a4f58ad..11fee7874d8 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -325,6 +325,10 @@ public static RestrictionTracker invokeNewTracker(Object
restriction) {
.intercept(delegateOrNoop(clazzDescription, signature.setup()))
.method(ElementMatchers.named("invokeTeardown"))
.intercept(delegateOrNoop(clazzDescription, signature.teardown()))
+ .method(ElementMatchers.named("invokeOnWindowExpiration"))
+ .intercept(
+ delegateMethodWithExtraParametersOrNoop(
+ clazzDescription, signature.onWindowExpiration()))
.method(ElementMatchers.named("invokeGetInitialRestriction"))
.intercept(
delegateWithDowncastOrThrow(clazzDescription,
signature.getInitialRestriction()))
@@ -392,6 +396,14 @@ private static Implementation delegateOrNoop(
: new DoFnMethodDelegation(doFnType, method.targetMethod());
}
+ /** Delegates method with extra parameters to the given method if available,
or does nothing. */
+ private static Implementation delegateMethodWithExtraParametersOrNoop(
+ TypeDescription doFnType, DoFnSignature.MethodWithExtraParameters
method) {
+ return (method == null)
+ ? FixedValue.originType()
+ : new DoFnMethodWithExtraParametersDelegation(doFnType, method);
+ }
+
/** Delegates to the given method if available, or throws
UnsupportedOperationException. */
private static Implementation delegateWithDowncastOrThrow(
TypeDescription doFnType, DoFnSignature.DoFnMethod method) {
@@ -515,6 +527,47 @@ protected StackManipulation
afterDelegation(MethodDescription instrumentedMethod
}
}
+ /**
+ * Implements {@link DoFnInvoker}'s method with extra parameters by
delegating to a "target
+ * method" of the wrapped {@link DoFn}.
+ */
+ static class DoFnMethodWithExtraParametersDelegation extends
DoFnMethodDelegation {
+ private final DoFnSignature.MethodWithExtraParameters signature;
+
+ public DoFnMethodWithExtraParametersDelegation(
+ TypeDescription clazzDescription,
DoFnSignature.MethodWithExtraParameters signature) {
+ super(clazzDescription, signature.targetMethod());
+ this.signature = signature;
+ }
+
+ @Override
+ protected StackManipulation beforeDelegation(MethodDescription
instrumentedMethod) {
+ // Parameters of the wrapper invoker method:
+ // DoFn.ArgumentProvider
+ // Parameters of the wrapped DoFn method:
+ // a dynamic set of allowed "extra" parameters in any order subject to
+ // validation prior to getting the DoFnSignature
+ ArrayList<StackManipulation> parameters = new ArrayList<>();
+
+ // To load the delegate, push `this` and then access the field
+ StackManipulation pushDelegate =
+ new StackManipulation.Compound(
+ MethodVariableAccess.REFERENCE.loadFrom(0),
+ FieldAccess.forField(delegateField).read());
+
+ StackManipulation pushExtraContextFactory =
MethodVariableAccess.REFERENCE.loadFrom(1);
+
+ // Push the extra arguments in their actual order.
+ for (DoFnSignature.Parameter param : signature.extraParameters()) {
+ parameters.add(
+ new StackManipulation.Compound(
+ pushExtraContextFactory,
+ ByteBuddyDoFnInvokerFactory.getExtraContextParameter(param,
pushDelegate)));
+ }
+ return new StackManipulation.Compound(parameters);
+ }
+ }
+
/**
* Passes parameters to the delegated method by downcasting each parameter
of non-primitive type
* to its expected type.
@@ -722,7 +775,8 @@ public StackManipulation
dispatch(DoFnSignature.Parameter.PipelineOptionsParamet
* Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method
by delegating to the
* {@link ProcessElement} method.
*/
- private static final class ProcessElementDelegation extends
DoFnMethodDelegation {
+ private static final class ProcessElementDelegation
+ extends DoFnMethodWithExtraParametersDelegation {
private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
static {
@@ -739,35 +793,10 @@ public StackManipulation
dispatch(DoFnSignature.Parameter.PipelineOptionsParamet
/** Implementation of {@link MethodDelegation} for the {@link
ProcessElement} method. */
private ProcessElementDelegation(
TypeDescription doFnType, DoFnSignature.ProcessElementMethod
signature) {
- super(doFnType, signature.targetMethod());
+ super(doFnType, signature);
this.signature = signature;
}
- @Override
- protected StackManipulation beforeDelegation(MethodDescription
instrumentedMethod) {
- // Parameters of the wrapper invoker method:
- // DoFnInvoker.ArgumentProvider
- // Parameters of the wrapped DoFn method:
- // [DoFn.ProcessContext, BoundedWindow, InputProvider, OutputReceiver]
in any order
- ArrayList<StackManipulation> pushParameters = new ArrayList<>();
-
- // To load the delegate, push `this` and then access the field
- StackManipulation pushDelegate =
- new StackManipulation.Compound(
- MethodVariableAccess.REFERENCE.loadFrom(0),
- FieldAccess.forField(delegateField).read());
-
- StackManipulation pushExtraContextFactory =
MethodVariableAccess.REFERENCE.loadFrom(1);
-
- // Push the arguments in their actual order.
- for (DoFnSignature.Parameter param : signature.extraParameters()) {
- pushParameters.add(
- new StackManipulation.Compound(
- pushExtraContextFactory, getExtraContextParameter(param,
pushDelegate)));
- }
- return new StackManipulation.Compound(pushParameters);
- }
-
@Override
protected StackManipulation afterDelegation(MethodDescription
instrumentedMethod) {
if
(TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index b2729dfcc6b..ed1b8a44c31 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -27,10 +27,8 @@
import com.google.common.io.BaseEncoding;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import net.bytebuddy.ByteBuddy;
-import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.modifier.FieldManifestation;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.description.type.TypeDescription;
@@ -49,7 +47,7 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.OnTimer;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
-import
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory.DoFnMethodDelegation;
+import
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory.DoFnMethodWithExtraParametersDelegation;
/**
* Dynamically generates {@link OnTimerInvoker} instances for invoking a
particular {@link TimerId}
@@ -184,13 +182,13 @@ private ByteBuddyOnTimerInvokerFactory() {}
* An "invokeOnTimer" method implementation akin to @ProcessElement, but
simpler because no
* splitting-related parameters need to be handled.
*/
- private static class InvokeOnTimerDelegation extends DoFnMethodDelegation {
+ private static class InvokeOnTimerDelegation extends
DoFnMethodWithExtraParametersDelegation {
private final DoFnSignature.OnTimerMethod signature;
public InvokeOnTimerDelegation(
TypeDescription clazzDescription, DoFnSignature.OnTimerMethod
signature) {
- super(clazzDescription, signature.targetMethod());
+ super(clazzDescription, signature);
this.signature = signature;
}
@@ -208,33 +206,6 @@ public InstrumentedType prepare(InstrumentedType
instrumentedType) {
// Delegating the method call doesn't require any changes to the
instrumented type.
return instrumentedType;
}
-
- @Override
- protected StackManipulation beforeDelegation(MethodDescription
instrumentedMethod) {
- // Parameters of the wrapper invoker method:
- // DoFn.ArgumentProvider
- // Parameters of the wrapped DoFn method:
- // a dynamic set of allowed "extra" parameters in any order subject to
- // validation prior to getting the DoFnSignature
- ArrayList<StackManipulation> parameters = new ArrayList<>();
-
- // To load the delegate, push `this` and then access the field
- StackManipulation pushDelegate =
- new StackManipulation.Compound(
- MethodVariableAccess.REFERENCE.loadFrom(0),
- FieldAccess.forField(delegateField).read());
-
- StackManipulation pushExtraContextFactory =
MethodVariableAccess.REFERENCE.loadFrom(1);
-
- // Push the extra arguments in their actual order.
- for (DoFnSignature.Parameter param : signature.extraParameters()) {
- parameters.add(
- new StackManipulation.Compound(
- pushExtraContextFactory,
- ByteBuddyDoFnInvokerFactory.getExtraContextParameter(param,
pushDelegate)));
- }
- return new StackManipulation.Compound(parameters);
- }
}
/**
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 429d06cd6fc..f96e28eb05f 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -57,6 +57,9 @@
/** Invoke the {@link DoFn.Teardown} method on the bound {@link DoFn}. */
void invokeTeardown();
+ /** Invoke the {@link DoFn.OnWindowExpiration} method on the bound {@link
DoFn}. */
+ void invokeOnWindowExpiration(ArgumentProvider<InputT, OutputT> arguments);
+
/**
* Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
*
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index f8908c8804d..4085ae6d859 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -84,6 +84,10 @@
@Nullable
public abstract LifecycleMethod teardown();
+ /** Details about this {@link DoFn}'s {@link DoFn.OnWindowExpiration}
method. */
+ @Nullable
+ public abstract OnWindowExpirationMethod onWindowExpiration();
+
/** Timer declarations present on the {@link DoFn} class. Immutable. */
public abstract Map<String, TimerDeclaration> timerDeclarations();
@@ -147,6 +151,8 @@ static Builder builder() {
abstract Builder setTeardown(LifecycleMethod teardown);
+ abstract Builder setOnWindowExpiration(OnWindowExpirationMethod
onWindowExpiration);
+
abstract Builder setGetInitialRestriction(GetInitialRestrictionMethod
getInitialRestriction);
abstract Builder setSplitRestriction(SplitRestrictionMethod
splitRestriction);
@@ -760,6 +766,44 @@ static OnTimerMethod create(
}
}
+ /** Describes a {@link DoFn.OnWindowExpiration} method. */
+ @AutoValue
+ public abstract static class OnWindowExpirationMethod implements
MethodWithExtraParameters {
+
+ /** The annotated method itself. */
+ @Override
+ public abstract Method targetMethod();
+
+ /**
+ * Whether this method requires stable input, expressed via {@link
+ * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}. For {@link
+ * org.apache.beam.sdk.transforms.DoFn.OnWindowExpiration}, this means
that any state must be
+ * stably persisted prior to calling it.
+ */
+ public abstract boolean requiresStableInput();
+
+ /** The window type used by this method, if any. */
+ @Nullable
+ @Override
+ public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+
+ /** Types of optional parameters of the annotated method, in the order
they appear. */
+ @Override
+ public abstract List<Parameter> extraParameters();
+
+ static OnWindowExpirationMethod create(
+ Method targetMethod,
+ boolean requiresStableInput,
+ TypeDescriptor<? extends BoundedWindow> windowT,
+ List<Parameter> extraParameters) {
+ return new AutoValue_DoFnSignature_OnWindowExpirationMethod(
+ targetMethod,
+ requiresStableInput,
+ windowT,
+ Collections.unmodifiableList(extraParameters));
+ }
+ }
+
/**
* Describes a timer declaration; a field of type {@link TimerSpec}
annotated with {@link
* DoFn.TimerId}.
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index e2279eb9c37..98b9016a607 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -121,6 +121,15 @@ private DoFnSignatures() {}
Parameter.TimerParameter.class,
Parameter.StateParameter.class);
+ private static final Collection<Class<? extends Parameter>>
+ ALLOWED_ON_WINDOW_EXPIRATION_PARAMETERS =
+ ImmutableList.of(
+ Parameter.WindowParameter.class,
+ Parameter.PipelineOptionsParameter.class,
+ Parameter.OutputReceiverParameter.class,
+ Parameter.TaggedOutputReceiverParameter.class,
+ Parameter.StateParameter.class);
+
/** @return the {@link DoFnSignature} for the given {@link DoFn} instance. */
public static <FnT extends DoFn<?, ?>> DoFnSignature signatureForDoFn(FnT
fn) {
return getSignature(fn.getClass());
@@ -336,7 +345,8 @@ private static DoFnSignature parseSignature(Class<? extends
DoFn<?, ?>> fnClass)
findAnnotatedMethod(errors, DoFn.FinishBundle.class, fnClass, false);
Method setupMethod = findAnnotatedMethod(errors, DoFn.Setup.class,
fnClass, false);
Method teardownMethod = findAnnotatedMethod(errors, DoFn.Teardown.class,
fnClass, false);
-
+ Method onWindowExpirationMethod =
+ findAnnotatedMethod(errors, DoFn.OnWindowExpiration.class, fnClass,
false);
Method getInitialRestrictionMethod =
findAnnotatedMethod(errors, DoFn.GetInitialRestriction.class, fnClass,
false);
Method splitRestrictionMethod =
@@ -412,6 +422,12 @@ private static DoFnSignature parseSignature(Class<?
extends DoFn<?, ?>> fnClass)
errors.forMethod(DoFn.Teardown.class, teardownMethod),
teardownMethod));
}
+ if (onWindowExpirationMethod != null) {
+ signatureBuilder.setOnWindowExpiration(
+ analyzeOnWindowExpirationMethod(
+ errors, fnT, onWindowExpirationMethod, inputT, outputT,
fnContext));
+ }
+
ErrorReporter getInitialRestrictionErrors;
if (getInitialRestrictionMethod != null) {
getInitialRestrictionErrors =
@@ -739,6 +755,50 @@ private static void
verifyUnsplittableMethods(ErrorReporter errors, DoFnSignatur
m, timerId, requiresStableInput, windowT, extraParameters);
}
+ @VisibleForTesting
+ static DoFnSignature.OnWindowExpirationMethod
analyzeOnWindowExpirationMethod(
+ ErrorReporter errors,
+ TypeDescriptor<? extends DoFn<?, ?>> fnClass,
+ Method m,
+ TypeDescriptor<?> inputT,
+ TypeDescriptor<?> outputT,
+ FnAnalysisContext fnContext) {
+ errors.checkArgument(void.class.equals(m.getReturnType()), "Must return
void");
+
+ Type[] params = m.getGenericParameterTypes();
+
+ MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+
+ boolean requiresStableInput =
m.isAnnotationPresent(DoFn.RequiresStableInput.class);
+
+ @Nullable TypeDescriptor<? extends BoundedWindow> windowT =
getWindowType(fnClass, m);
+
+ List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
+ ErrorReporter onWindowExpirationErrors =
errors.forMethod(DoFn.OnWindowExpiration.class, m);
+ for (int i = 0; i < params.length; ++i) {
+ Parameter parameter =
+ analyzeExtraParameter(
+ onWindowExpirationErrors,
+ fnContext,
+ methodContext,
+ fnClass,
+ ParameterDescription.of(
+ m,
+ i,
+ fnClass.resolveType(params[i]),
+ Arrays.asList(m.getParameterAnnotations()[i])),
+ inputT,
+ outputT);
+
+ checkParameterOneOf(errors, parameter,
ALLOWED_ON_WINDOW_EXPIRATION_PARAMETERS);
+
+ extraParameters.add(parameter);
+ }
+
+ return DoFnSignature.OnWindowExpirationMethod.create(
+ m, requiresStableInput, windowT, extraParameters);
+ }
+
@VisibleForTesting
static DoFnSignature.ProcessElementMethod analyzeProcessElementMethod(
ErrorReporter errors,
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 924d0981a7e..54cae0dfae0 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -277,6 +277,42 @@ public void onTimer() {}
verify(fn).processElement(mockProcessContext, mockTimer);
}
+ @Test
+ public void testOnWindowExpirationWithNoParam() throws Exception {
+ class MockFn extends DoFn<String, String> {
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @OnWindowExpiration
+ public void onWindowExpiration() {}
+ }
+
+ MockFn fn = mock(MockFn.class);
+ DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
+
+ invoker.invokeOnWindowExpiration(mockArgumentProvider);
+ verify(fn).onWindowExpiration();
+ }
+
+ @Test
+ public void testOnWindowExpirationWithParam() {
+ class MockFn extends DoFn<String, String> {
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @OnWindowExpiration
+ public void onWindowExpiration(BoundedWindow window) {}
+ }
+
+ MockFn fn = mock(MockFn.class);
+ DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
+
+ invoker.invokeOnWindowExpiration(mockArgumentProvider);
+ verify(fn).onWindowExpiration(mockWindow);
+ }
+
@Test
public void testDoFnWithReturn() throws Exception {
class MockFn extends DoFn<String, String> {
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 865c91e7f99..5c5e62e5eff 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -29,6 +29,7 @@
import static org.junit.Assert.fail;
import java.lang.reflect.Field;
+import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -1054,6 +1055,112 @@ public void foo(ProcessContext context) {}
Matchers.<TypeDescriptor<?>>equalTo(new
TypeDescriptor<ValueState<Integer>>() {}));
}
+ @Test
+ public void testOnWindowExpirationMultipleAnnotation() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Found multiple methods annotated with
@OnWindowExpiration");
+ thrown.expectMessage("bar()");
+ thrown.expectMessage("baz()");
+ thrown.expectMessage(getClass().getName() + "$");
+ DoFnSignatures.getSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void foo() {}
+
+ @OnWindowExpiration
+ public void bar() {}
+
+ @OnWindowExpiration
+ public void baz() {}
+ }.getClass());
+ }
+
+ @Test
+ public void testOnWindowExpirationMustBePublic() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("OnWindowExpiration");
+ thrown.expectMessage("Must be public");
+ thrown.expectMessage("bar()");
+
+ DoFnSignatures.getSignature(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void foo() {}
+
+ @OnWindowExpiration
+ void bar() {}
+ }.getClass());
+ }
+
+ @Test
+ public void testOnWindowExpirationDisallowedParameter() throws Exception {
+ // Timers are not allowed in OnWindowExpiration
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Illegal parameter type");
+ thrown.expectMessage("TimerParameter");
+ thrown.expectMessage("myTimer");
+ DoFnSignatures.getSignature(
+ new DoFn<String, String>() {
+ @TimerId("foo")
+ private final TimerSpec myTimer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void foo() {}
+
+ @OnTimer("foo")
+ public void onFoo() {}
+
+ @OnWindowExpiration
+ public void bar(@TimerId("foo") Timer t) {}
+ }.getClass());
+ }
+
+ @Test
+ public void testOnWindowExpirationNoParam() {
+ DoFnSignature sig =
+ DoFnSignatures.getSignature(
+ new DoFn<String, String>() {
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @OnWindowExpiration
+ public void bar() {}
+ }.getClass());
+
+ assertThat(sig.onWindowExpiration().extraParameters().size(), equalTo(0));
+ }
+
+ @Test
+ public void testOnWindowExpirationWithAllowedParams() {
+ DoFnSignature sig =
+ DoFnSignatures.getSignature(
+ new DoFn<String, String>() {
+ @StateId("foo")
+ private final StateSpec<ValueState<Integer>> bizzle =
+ StateSpecs.value(VarIntCoder.of());
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+
+ @OnWindowExpiration
+ public void bar(
+ BoundedWindow b,
+ @StateId("foo") ValueState<Integer> s,
+ PipelineOptions p,
+ OutputReceiver<String> o,
+ MultiOutputReceiver m) {}
+ }.getClass());
+
+ List<Parameter> params = sig.onWindowExpiration().extraParameters();
+ assertThat(params.size(), equalTo(5));
+ assertThat(params.get(0), instanceOf(WindowParameter.class));
+ assertThat(params.get(1), instanceOf(StateParameter.class));
+ assertThat(params.get(2), instanceOf(PipelineOptionsParameter.class));
+ assertThat(params.get(3), instanceOf(OutputReceiverParameter.class));
+ assertThat(params.get(4), instanceOf(TaggedOutputReceiverParameter.class));
+ }
+
private Matcher<String> mentionsTimers() {
return anyOf(containsString("timer"), containsString("Timer"));
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 137447)
Time Spent: 1h 20m (was: 1h 10m)
> Add OnWindowExpiration method to Stateful DoFn
> ----------------------------------------------
>
> Key: BEAM-1589
> URL: https://issues.apache.org/jira/browse/BEAM-1589
> Project: Beam
> Issue Type: New Feature
> Components: runner-core, sdk-java-core
> Reporter: Jingsong Lee
> Assignee: Batkhuyag Batsaikhan
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> See BEAM-1517
> This allows the user to do some work before the state is garbage collected.
> This may seem somewhat annoying, but on the other hand, forgetting to set a
> final timer to flush state will probably result in data loss most of the time.
> FlinkRunner can do this work very simply, but other runners, such as
> DirectRunner, need to traverse all the states to do this, which may be
> somewhat harder to implement.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)