Repository: incubator-beam
Updated Branches:
  refs/heads/master ac252a7e1 -> 9de9ce69f


Allow BoundedWindow subclasses in DoFn parameter list


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c3e59fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c3e59fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c3e59fa

Branch: refs/heads/master
Commit: 9c3e59fab86e93477f14e0709ae8ecc37b84f3ef
Parents: 85b908b
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Nov 3 21:30:25 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Nov 7 15:25:03 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/ParDo.java   | 43 +++++++++++++-
 .../sdk/transforms/reflect/DoFnInvokers.java    |  8 ++-
 .../sdk/transforms/reflect/DoFnSignature.java   | 40 +++++++++----
 .../sdk/transforms/reflect/DoFnSignatures.java  | 41 +++++++++++--
 .../beam/sdk/transforms/windowing/WindowFn.java | 12 ++++
 .../apache/beam/sdk/transforms/ParDoTest.java   | 61 ++++++++++++++++++++
 .../transforms/reflect/DoFnInvokersTest.java    |  6 +-
 7 files changed, 190 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 0684a5c..26799c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -32,7 +32,10 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.MethodWithExtraParameters;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.StringUtils;
@@ -41,6 +44,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypedPValue;
 
 /**
@@ -548,6 +552,42 @@ public class ParDo {
   }
 
   /**
+   * Perform common validations of the {@link DoFn} against the input {@link 
PCollection}, for
+   * example ensuring that the window type expected by the {@link DoFn} 
is a supertype of the window type of
+   * the {@link PCollection}.
+   */
+  private static <InputT, OutputT> void validateWindowType(
+      PCollection<? extends InputT> input, Serializable fn) {
+    // No validation for OldDoFn
+    if (!(fn instanceof DoFn)) {
+      return;
+    }
+
+    DoFnSignature signature = DoFnSignatures.INSTANCE.getSignature((Class) 
fn.getClass());
+
+    TypeDescriptor<? extends BoundedWindow> actualWindowT =
+        input.getWindowingStrategy().getWindowFn().getWindowTypeDescriptor();
+
+    validateWindowTypeForMethod(actualWindowT, signature.processElement());
+    for (OnTimerMethod method : signature.onTimerMethods().values()) {
+      validateWindowTypeForMethod(actualWindowT, method);
+    }
+  }
+
+  private static void validateWindowTypeForMethod(
+      TypeDescriptor<? extends BoundedWindow> actualWindowT,
+      MethodWithExtraParameters methodSignature) {
+    if (methodSignature.windowT() != null) {
+      checkArgument(
+          methodSignature.windowT().isSupertypeOf(actualWindowT),
+          "%s expects window type %s, which is not a supertype of actual 
window type %s",
+          methodSignature.targetMethod(),
+          methodSignature.windowT(),
+          actualWindowT);
+    }
+  }
+
+  /**
    * Perform common validations of the {@link DoFn}, for example ensuring that 
state is used
    * correctly and that its features can be supported.
    */
@@ -768,6 +808,7 @@ public class ParDo {
     public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
       checkArgument(
           !isSplittable(getOldFn()), "Splittable DoFn not supported by the 
current runner");
+      validateWindowType(input, fn);
       return PCollection.<OutputT>createPrimitiveOutputInternal(
               input.getPipeline(),
               input.getWindowingStrategy(),
@@ -1024,7 +1065,7 @@ public class ParDo {
     public PCollectionTuple apply(PCollection<? extends InputT> input) {
       checkArgument(
           !isSplittable(getOldFn()), "Splittable DoFn not supported by the 
current runner");
-
+      validateWindowType(input, fn);
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),
           TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index ba95f98..b975711 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -510,8 +510,8 @@ public class DoFnInvokers {
   private static MethodDescription getExtraContextFactoryMethodDescription(
       String methodName, Class<?>... parameterTypes) {
     try {
-    return new MethodDescription.ForLoadedMethod(
-                DoFn.ExtraContextFactory.class.getMethod(methodName, 
parameterTypes));
+      return new MethodDescription.ForLoadedMethod(
+          DoFn.ExtraContextFactory.class.getMethod(methodName, 
parameterTypes));
     } catch (Exception e) {
       throw new IllegalStateException(
           String.format(
@@ -538,7 +538,9 @@ public class DoFnInvokers {
 
           @Override
           public StackManipulation dispatch(WindowParameter p) {
-            return simpleExtraContextParameter("window", 
pushExtraContextFactory);
+            return new StackManipulation.Compound(
+                simpleExtraContextParameter("window", pushExtraContextFactory),
+                TypeCasting.to(new 
TypeDescription.ForLoadedType(p.windowT().getRawType())));
           }
 
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index befc10b..4cbe219 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -144,6 +145,10 @@ public abstract class DoFnSignature {
      * <p>Validation that these are allowed is external to this class.
      */
     List<Parameter> extraParameters();
+
+    /** The type of window expected by this method, if any. */
+    @Nullable
+    TypeDescriptor<? extends BoundedWindow> windowT();
   }
 
   /** A descriptor for an optional parameter of the {@link 
DoFn.ProcessElement} method. */
@@ -229,18 +234,14 @@ public abstract class DoFnSignature {
     }
 
     // These parameter descriptors are constant
-    private static final WindowParameter BOUNDED_WINDOW_PARAMETER =
-        new AutoValue_DoFnSignature_Parameter_WindowParameter();
     private static final InputProviderParameter INPUT_PROVIDER_PARAMETER =
         new AutoValue_DoFnSignature_Parameter_InputProviderParameter();
     private static final OutputReceiverParameter OUTPUT_RECEIVER_PARAMETER =
         new AutoValue_DoFnSignature_Parameter_OutputReceiverParameter();
 
-    /**
-     * Returns a {@link WindowParameter}.
-     */
-    public static WindowParameter boundedWindow() {
-      return BOUNDED_WINDOW_PARAMETER;
+    /** Returns a {@link WindowParameter}. */
+    public static WindowParameter boundedWindow(TypeDescriptor<? extends 
BoundedWindow> windowT) {
+      return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT);
     }
 
     /**
@@ -283,6 +284,7 @@ public abstract class DoFnSignature {
     @AutoValue
     public abstract static class WindowParameter extends Parameter {
       WindowParameter() {}
+      public abstract TypeDescriptor<? extends BoundedWindow> windowT();
     }
 
     /**
@@ -357,6 +359,10 @@ public abstract class DoFnSignature {
     @Nullable
     public abstract TypeDescriptor<?> trackerT();
 
+    /** The window type used by this method, if any. */
+    @Nullable
+    public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+
     /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or 
void. */
     public abstract boolean hasReturnValue();
 
@@ -364,9 +370,14 @@ public abstract class DoFnSignature {
         Method targetMethod,
         List<Parameter> extraParameters,
         TypeDescriptor<?> trackerT,
+        @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
         boolean hasReturnValue) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
-          targetMethod, Collections.unmodifiableList(extraParameters), 
trackerT, hasReturnValue);
+          targetMethod,
+          Collections.unmodifiableList(extraParameters),
+          trackerT,
+          windowT,
+          hasReturnValue);
     }
 
     /**
@@ -381,6 +392,7 @@ public abstract class DoFnSignature {
           extraParameters(),
           Predicates.or(
               Predicates.instanceOf(WindowParameter.class),
+              Predicates.instanceOf(TimerParameter.class),
               Predicates.instanceOf(StateParameter.class)));
     }
 
@@ -404,13 +416,21 @@ public abstract class DoFnSignature {
     @Override
     public abstract Method targetMethod();
 
+    /** The window type used by this method, if any. */
+    @Nullable
+    public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+
     /** Types of optional parameters of the annotated method, in the order 
they appear. */
     @Override
     public abstract List<Parameter> extraParameters();
 
-    static OnTimerMethod create(Method targetMethod, String id, 
List<Parameter> extraParameters) {
+    static OnTimerMethod create(
+        Method targetMethod,
+        String id,
+        TypeDescriptor<? extends BoundedWindow> windowT,
+        List<Parameter> extraParameters) {
       return new AutoValue_DoFnSignature_OnTimerMethod(
-          id, targetMethod, Collections.unmodifiableList(extraParameters));
+          id, targetMethod, windowT, 
Collections.unmodifiableList(extraParameters));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 09c5f3d..e918182 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -50,6 +50,7 @@ import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -149,6 +150,9 @@ public class DoFnSignatures {
     private final Map<String, TimerParameter> timerParameters = new 
HashMap<>();
     private final List<Parameter> extraParameters = new ArrayList<>();
 
+    @Nullable
+    private TypeDescriptor<? extends BoundedWindow> windowT;
+
     private MethodAnalysisContext() {}
 
     /** Indicates whether a {@link RestrictionTrackerParameter} is known in 
this context. */
@@ -157,6 +161,18 @@ public class DoFnSignatures {
           extraParameters, 
Predicates.instanceOf(RestrictionTrackerParameter.class));
     }
 
+    /** Indicates whether a {@link WindowParameter} is known in this context. 
*/
+    public boolean hasWindowParameter() {
+      return Iterables.any(
+          extraParameters, Predicates.instanceOf(WindowParameter.class));
+    }
+
+    /** The window type, if any, used by this method. */
+    @Nullable
+    public TypeDescriptor<? extends BoundedWindow> getWindowType() {
+      return windowT;
+    }
+
     /** State parameters declared in this context, keyed by {@link StateId}. */
     public Map<String, StateParameter> getStateParameters() {
       return Collections.unmodifiableMap(stateParameters);
@@ -599,6 +615,8 @@ public class DoFnSignatures {
 
     MethodAnalysisContext methodContext = MethodAnalysisContext.create();
 
+    @Nullable TypeDescriptor<? extends BoundedWindow> windowT = 
getWindowType(fnClass, m);
+
     List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
     TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
     ErrorReporter onTimerErrors = errors.forMethod(DoFn.OnTimer.class, m);
@@ -618,7 +636,7 @@ public class DoFnSignatures {
               expectedOutputReceiverT));
     }
 
-    return DoFnSignature.OnTimerMethod.create(m, timerId, extraParameters);
+    return DoFnSignature.OnTimerMethod.create(m, timerId, windowT, 
extraParameters);
   }
 
   @VisibleForTesting
@@ -650,6 +668,7 @@ public class DoFnSignatures {
         formatType(processContextT));
 
     TypeDescriptor<?> trackerT = getTrackerType(fnClass, m);
+    TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnClass, 
m);
     TypeDescriptor<?> expectedInputProviderT = inputProviderTypeOf(inputT);
     TypeDescriptor<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
     for (int i = 1; i < params.length; ++i) {
@@ -684,6 +703,7 @@ public class DoFnSignatures {
         m,
         methodContext.getExtraParameters(),
         trackerT,
+        windowT,
         DoFn.ProcessContinuation.class.equals(m.getReturnType()));
   }
 
@@ -700,12 +720,12 @@ public class DoFnSignatures {
 
     ErrorReporter paramErrors = methodErrors.forParameter(param);
 
-    if (rawType.equals(BoundedWindow.class)) {
+    if (BoundedWindow.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
-          
!methodContext.getExtraParameters().contains(Parameter.boundedWindow()),
+          !methodContext.hasWindowParameter(),
           "Multiple %s parameters",
           BoundedWindow.class.getSimpleName());
-      return Parameter.boundedWindow();
+      return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) 
paramT);
     } else if (rawType.equals(DoFn.InputProvider.class)) {
       methodErrors.checkArgument(
           
!methodContext.getExtraParameters().contains(Parameter.inputProvider()),
@@ -856,6 +876,19 @@ public class DoFnSignatures {
     return null;
   }
 
+  @Nullable
+  private static TypeDescriptor<? extends BoundedWindow> getWindowType(
+      TypeDescriptor<?> fnClass, Method method) {
+    Type[] params = method.getGenericParameterTypes();
+    for (int i = 0; i < params.length; i++) {
+      TypeDescriptor<?> paramT = fnClass.resolveType(params[i]);
+      if (BoundedWindow.class.isAssignableFrom(paramT.getRawType())) {
+        return (TypeDescriptor<? extends BoundedWindow>) paramT;
+      }
+    }
+    return null;
+  }
+
   @VisibleForTesting
   static DoFnSignature.BundleMethod analyzeBundleMethod(
       ErrorReporter errors,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index 127fb4f..ea0bb79 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Instant;
 
 /**
@@ -159,6 +160,17 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   }
 
   /**
+   * Returns a {@link TypeDescriptor} capturing what is known statically about 
the window type of
+   * this {@link WindowFn} instance's most-derived class.
+   *
+   * <p>In the normal case of a concrete {@link WindowFn} subclass with no 
generic type parameters
+   * of its own (including anonymous inner classes), this will be a complete 
non-generic type.
+   */
+  public TypeDescriptor<W> getWindowTypeDescriptor() {
+    return new TypeDescriptor<W>(this) {};
+  }
+
+  /**
    * {@inheritDoc}
    *
    * <p>By default, does not register any display data. Implementors may 
override this method

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d3ea9fb..26f5570 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -51,6 +51,8 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn.OnTimer;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -58,7 +60,12 @@ import 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
@@ -72,6 +79,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -1513,6 +1521,59 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  public void testRejectsWrongWindowType() {
+    Pipeline p = TestPipeline.create();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(GlobalWindow.class.getSimpleName());
+    thrown.expectMessage(IntervalWindow.class.getSimpleName());
+    thrown.expectMessage("window type");
+    thrown.expectMessage("not a supertype");
+
+    p.apply(Create.of(1, 2, 3))
+        .apply(
+            ParDo.of(
+                new DoFn<Integer, Integer>() {
+                  @ProcessElement
+                  public void process(ProcessContext c, IntervalWindow w) {}
+                }));
+  }
+
+  /**
+   * Tests that it is OK to use different window types in the parameter lists 
to different
+   * {@link DoFn} functions, as long as they are all supertypes of the actual 
window type
+   * of the input.
+   *
+   * <p>Today, the only method other than {@link ProcessElement 
@ProcessElement} that can accept
+   * extended parameters is {@link OnTimer @OnTimer}, which is rejected before 
it reaches window
+   * type validation. Rather than delay validation, this test is temporarily 
disabled.
+   */
+  @Ignore("ParDo rejects this on account of it using timers")
+  @Test
+  public void testMultipleWindowSubtypesOK() {
+    final String timerId = "gobbledegook";
+
+    Pipeline p = TestPipeline.create();
+
+    p.apply(Create.of(1, 2, 3))
+        
.apply(Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(10))))
+        .apply(
+            ParDo.of(
+                new DoFn<Integer, Integer>() {
+                  @TimerId(timerId)
+                  private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+                  @ProcessElement
+                  public void process(ProcessContext c, IntervalWindow w) {}
+
+                  @OnTimer(timerId)
+                  public void onTimer(BoundedWindow w) {}
+                }));
+
+    // If it doesn't crash, we made it!
+  }
+
+  @Test
   public void testRejectsSplittableDoFnByDefault() {
     // ParDo with a splittable DoFn must be overridden by the runner.
     // Without an override, applying it directly must fail.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 60f82a8..7bdc007 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -44,7 +44,7 @@ import 
org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import 
org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
@@ -71,7 +71,7 @@ public class DoFnInvokersTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Mock private DoFn<String, String>.ProcessContext mockContext;
-  @Mock private BoundedWindow mockWindow;
+  @Mock private IntervalWindow mockWindow;
   @Mock private DoFn.InputProvider<String> mockInputProvider;
   @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
   @Mock private WindowingInternals<String, String> mockWindowingInternals;
@@ -173,7 +173,7 @@ public class DoFnInvokersTest {
   public void testDoFnWithWindow() throws Exception {
     class MockFn extends DoFn<String, String> {
       @DoFn.ProcessElement
-      public void processElement(ProcessContext c, BoundedWindow w) throws 
Exception {}
+      public void processElement(ProcessContext c, IntervalWindow w) throws 
Exception {}
     }
     MockFn fn = mock(MockFn.class);
     assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));

Reply via email to