This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bea8849  Merge pull request #15994: [BEAM-13263] Support OnWindowExpiration in (non-portable) Flink runner
bea8849 is described below

commit bea88490629e458175fed5ebfe5da6d74794375b
Author: reuvenlax <[email protected]>
AuthorDate: Tue Dec 14 08:14:21 2021 -0800

    Merge pull request #15994: [BEAM-13263] Support OnWindowExpiration in (non-portable) Flink runner
---
 .../org/apache/beam/runners/core/DoFnRunners.java  |  2 -
 .../apache/beam/runners/core/ProcessFnRunner.java  |  5 ++
 .../runners/core/PushbackSideInputDoFnRunner.java  |  3 +
 .../core/SimplePushbackSideInputDoFnRunner.java    |  5 ++
 .../beam/runners/core/StatefulDoFnRunner.java      |  3 +-
 runners/flink/flink_runner.gradle                  |  1 -
 .../functions/FlinkStatefulDoFnFunction.java       | 21 +++++
 .../wrappers/streaming/DoFnOperator.java           | 10 ++-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 95 +++++++++++++++-------
 9 files changed, 111 insertions(+), 34 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 8a16d48..c2ae8cf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -112,7 +112,6 @@ public class DoFnRunners {
           WindowingStrategy<?, ?> windowingStrategy,
           CleanupTimer<InputT> cleanupTimer,
           StateCleaner<W> stateCleaner) {
-
     return defaultStatefulDoFnRunner(
         fn,
         inputCoder,
@@ -144,7 +143,6 @@ public class DoFnRunners {
           CleanupTimer<InputT> cleanupTimer,
           StateCleaner<W> stateCleaner,
           boolean requiresTimeSortedInputSupported) {
-
     boolean doFnRequiresTimeSortedInput =
         DoFnSignatures.signatureForDoFn(doFnRunner.getFn())
             .processElement()
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index 4b6ed65..0aa61eb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -94,6 +94,11 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT>
     throw new UnsupportedOperationException("User timers unsupported in 
ProcessFn");
   }
 
+  @Override
+  public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
outputTimestamp, KeyT key) {
+    throw new UnsupportedOperationException("OnWindowExpiration unsupported in 
ProcessFn");
+  }
+
   private static <T> void checkTrivialOuterWindows(
       WindowedValue<KeyedWorkItem<byte[], T>> windowedKWI) {
     // In practice it will be in 0 or 1 windows (ValueInEmptyWindows or 
ValueInGlobalWindow)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 1fd9eb2..1b2d90e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -52,6 +52,9 @@ public interface PushbackSideInputDoFnRunner<InputT, OutputT> 
{
       Instant outputTimestamp,
       TimeDomain timeDomain);
 
+  /** Calls the underlying {@link DoFn.OnWindowExpiration} method. */
+  <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
outputTimestamp, KeyT key);
+
   /** Calls the underlying {@link DoFn.FinishBundle} method. */
   void finishBundle();
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
index 5004337..31cfa6b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -120,6 +120,11 @@ public class SimplePushbackSideInputDoFnRunner<InputT, 
OutputT>
   }
 
   @Override
+  public <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
outputTimestamp, KeyT key) {
+    underlying.onWindowExpiration(window, outputTimestamp, key);
+  }
+
+  @Override
   public void finishBundle() {
     notReadyWindows = null;
     underlying.finishBundle();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 6e4f1c3..39eac1d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -141,7 +141,6 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
 
   @Override
   public void processElement(WindowedValue<InputT> input) {
-
     // StatefulDoFnRunner always observes windows, so we need to explode
     for (WindowedValue<InputT> value : input.explodeWindows()) {
       BoundedWindow window = value.getWindows().iterator().next();
@@ -220,7 +219,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
       if (requiresTimeSortedInput) {
         onSortFlushTimer(window, BoundedWindow.TIMESTAMP_MAX_VALUE);
       }
-      doFnRunner.onWindowExpiration(window, outputTimestamp, key);
+      onWindowExpiration(window, outputTimestamp, key);
       stateCleaner.clearForWindow(window);
     } else {
       // An event-time timer can never be late because we don't allow setting 
timers after GC time.
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index f400974..97d59a4 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -224,7 +224,6 @@ def createValidatesRunnerTask(Map m) {
       excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
       excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
       excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
-      excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
       excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
       excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
       excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index d007ce1..c03b27a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -19,10 +19,12 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.InMemoryStateInternals;
@@ -45,6 +47,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -56,6 +59,7 @@ import 
org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /** A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Flink 
Batch Runner. */
@@ -67,6 +71,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, 
WindowedValue<RawUnionValue>> {
 
   private final DoFn<KV<K, V>, OutputT> dofn;
+  private final boolean usesOnWindowExpiration;
   private String stepName;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
@@ -95,6 +100,8 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
       Map<String, PCollectionView<?>> sideInputMapping) {
 
     this.dofn = dofn;
+    this.usesOnWindowExpiration =
+        DoFnSignatures.signatureForDoFn(dofn).onWindowExpiration() != null;
     this.stepName = stepName;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
@@ -137,6 +144,8 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     timerInternals.advanceProcessingTime(Instant.now());
     timerInternals.advanceSynchronizedProcessingTime(Instant.now());
 
+    final Set<BoundedWindow> windowsSeen = new HashSet<>();
+
     List<TupleTag<?>> additionalOutputTags = 
Lists.newArrayList(outputMap.keySet());
 
     DoFnRunner<KV<K, V>, OutputT> doFnRunner =
@@ -172,8 +181,14 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     doFnRunner.startBundle();
 
     doFnRunner.processElement(currentValue);
+    if (usesOnWindowExpiration) {
+      windowsSeen.addAll(currentValue.getWindows());
+    }
     while (iterator.hasNext()) {
       currentValue = iterator.next();
+      if (usesOnWindowExpiration) {
+        windowsSeen.addAll(currentValue.getWindows());
+      }
       doFnRunner.processElement(currentValue);
     }
 
@@ -186,6 +201,12 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
 
     fireEligibleTimers(key, timerInternals, doFnRunner);
 
+    if (usesOnWindowExpiration) {
+      for (BoundedWindow window : windowsSeen) {
+        doFnRunner.onWindowExpiration(window, 
window.maxTimestamp().minus(Duration.millis(1)), key);
+      }
+    }
+
     doFnRunner.finishBundle();
   }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 4f4f6c1..f78f621 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -199,6 +199,8 @@ public class DoFnOperator<InputT, OutputT>
   /** If true, we must process elements only after a checkpoint is finished. */
   private final boolean requiresStableInput;
 
+  private final boolean usesOnWindowExpiration;
+
   private final boolean finishBundleBeforeCheckpointing;
 
   /** Stores new finalizations being gathered. */
@@ -303,6 +305,8 @@ public class DoFnOperator<InputT, OutputT>
         // WindowDoFnOperator does not use a DoFn
         doFn != null
             && 
DoFnSignatures.getSignature(doFn.getClass()).processElement().requiresStableInput();
+    this.usesOnWindowExpiration =
+        doFn != null && 
DoFnSignatures.getSignature(doFn.getClass()).onWindowExpiration() != null;
 
     if (requiresStableInput) {
       Preconditions.checkState(
@@ -340,11 +344,12 @@ public class DoFnOperator<InputT, OutputT>
               timerInternals, windowingStrategy) {
             @Override
             public void setForWindow(InputT input, BoundedWindow window) {
-              if (!window.equals(GlobalWindow.INSTANCE)) {
+              if (!window.equals(GlobalWindow.INSTANCE) || 
usesOnWindowExpiration) {
                 // Skip setting a cleanup timer for the global window as these 
timers
                 // lead to potentially unbounded state growth in the runner, 
depending on key
                 // cardinality. Cleanup for global window will be performed 
upon arrival of the
                 // final watermark.
+                // In the case of OnWindowExpiration, we still set the timer.
                 super.setForWindow(input, window);
               }
             }
@@ -796,11 +801,14 @@ public class DoFnOperator<InputT, OutputT>
       if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
         invokeFinishBundle();
       }
+
       LOG.debug("Emitting watermark {}", watermark);
       currentOutputWatermark = watermark;
       output.emitWatermark(new Watermark(watermark));
 
       // Check if the final watermark was triggered to perform state cleanup 
for global window
+      // TODO: Do we need to do this when OnWindowExpiration is set, since in 
that case we have a
+      // cleanup timer?
       if (keyedStateInternals != null
           && currentOutputWatermark
               > 
adjustTimestampForFlink(GlobalWindow.INSTANCE.maxTimestamp().getMillis())) {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 9473318..9018987 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -76,6 +76,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.state.BagState;
@@ -1769,14 +1770,12 @@ public class ParDoTest implements Serializable {
                             @Timestamp Instant timestamp,
                             OutputReceiver<String> r) {
                           r.output(element);
-                          System.out.println("Process: " + element + ":" + 
timestamp.getMillis());
                         }
 
                         @FinishBundle
                         public void finishBundle(FinishBundleContext c) {
                           Instant ts = new Instant(3);
                           c.output("finish", ts, windowFn.assignWindow(ts));
-                          System.out.println("Finish: 3");
                         }
                       }))
               .apply(ParDo.of(new PrintingDoFn()));
@@ -2546,13 +2545,23 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class, 
UsesOrderedListState.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesOrderedListState.class,
+      UsesOnWindowExpiration.class
+    })
     public void testOrderedListStateBounded() {
       testOrderedListStateImpl(false);
     }
 
     @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class, 
UsesOrderedListState.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesOrderedListState.class,
+      UsesOnWindowExpiration.class
+    })
     public void testOrderedListStateUnbounded() {
       testOrderedListStateImpl(true);
     }
@@ -5730,8 +5739,6 @@ public class ParDoTest implements Serializable {
                 @Timestamp Instant ts,
                 @TimerFamily(timerFamilyId) TimerMap timerMap,
                 OutputReceiver<String> r) {
-              System.out.println("timer Id : " + timerId);
-              System.out.println("timerMap : " + timerMap.toString());
               r.output(timerId);
             }
           };
@@ -6059,7 +6066,18 @@ public class ParDoTest implements Serializable {
       UsesOnWindowExpiration.class
     })
     public void testOnWindowExpirationSimpleBounded() {
-      runOnWindowExpirationSimple(false);
+      runOnWindowExpirationSimple(false, false);
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesTimersInParDo.class,
+      UsesOnWindowExpiration.class
+    })
+    public void testOnWindowExpirationSimpleBoundedGlobal() {
+      runOnWindowExpirationSimple(false, true);
     }
 
     @Test
@@ -6071,10 +6089,22 @@ public class ParDoTest implements Serializable {
       UsesUnboundedPCollections.class
     })
     public void testOnWindowExpirationSimpleUnbounded() {
-      runOnWindowExpirationSimple(true);
+      runOnWindowExpirationSimple(true, false);
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesTimersInParDo.class,
+      UsesOnWindowExpiration.class,
+      UsesUnboundedPCollections.class
+    })
+    public void testOnWindowExpirationSimpleUnboundedGlobal() {
+      runOnWindowExpirationSimple(true, true);
     }
 
-    public void runOnWindowExpirationSimple(boolean useStreaming) {
+    public void runOnWindowExpirationSimple(boolean useStreaming, boolean 
globalWindow) {
       final String stateId = "foo";
       final String timerId = "bar";
       IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(10));
@@ -6112,31 +6142,40 @@ public class ParDoTest implements Serializable {
               Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
               // verify state
               assertEquals(1, (int) currentValue);
+              System.err.println("KEY " + key + " VALUE " + currentValue);
               // To check output is received from OnWindowExpiration
               r.output(currentValue);
             }
           };
 
-      PCollection<Integer> output =
-          pipeline
-              .apply(
-                  Create.timestamped(
-                      // first window
-                      TimestampedValue.of(KV.of("hello", 7), new Instant(3)),
-
-                      // second window
-                      TimestampedValue.of(KV.of("hi", 35), new Instant(13))))
-              .apply(Window.into(FixedWindows.of(Duration.millis(10))))
-              .setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : 
IsBounded.BOUNDED)
-              .apply(ParDo.of(fn));
+      if (useStreaming) {
+        pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+      }
 
-      PAssert.that(output)
-          .inWindow(firstWindow)
-          // verify output
-          .containsInAnyOrder(1)
-          .inWindow(secondWindow)
-          // verify output
-          .containsInAnyOrder(1);
+      PCollection<KV<String, Integer>> intermediate =
+          pipeline.apply(
+              Create.timestamped(
+                  // first window
+                  TimestampedValue.of(KV.of("hello", 7), new Instant(3)),
+
+                  // second window
+                  TimestampedValue.of(KV.of("hi", 35), new Instant(13))));
+      if (!globalWindow) {
+        intermediate = 
intermediate.apply(Window.into(FixedWindows.of(Duration.millis(10))));
+      }
+      PCollection<Integer> output = intermediate.apply(ParDo.of(fn));
+
+      if (!globalWindow) {
+        PAssert.that(output)
+            .inWindow(firstWindow)
+            // verify output
+            .containsInAnyOrder(1)
+            .inWindow(secondWindow)
+            // verify output
+            .containsInAnyOrder(1);
+      } else {
+        
PAssert.that(output).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(1, 1);
+      }
       pipeline.run();
     }
   }

Reply via email to