This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.39.0 by this push:
     new 276ac1d4569 Merge pull request #17504: [BEAM-14196] add test verifying output watermark propagation in bundle (#17561)
276ac1d4569 is described below

commit 276ac1d45690c7d3f6bb5175b5b483ef6f05ce1b
Author: Yichi Zhang <[email protected]>
AuthorDate: Thu May 5 11:19:33 2022 -0700

    Merge pull request #17504: [BEAM-14196] add test verifying output watermark propagation in bundle (#17561)
    
    Co-authored-by: Jan Lukavský <[email protected]>
---
 .../wrappers/streaming/DoFnOperator.java           | 46 ++++++++++++++---
 .../streaming/ExecutableStageDoFnOperator.java     | 12 ++---
 .../beam/runners/flink/FlinkSavepointTest.java     |  2 +-
 .../wrappers/streaming/DoFnOperatorTest.java       | 35 ++++++-------
 .../streaming/ExecutableStageDoFnOperatorTest.java |  3 +-
 .../wrappers/streaming/WindowDoFnOperatorTest.java |  9 ++++
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 58 +++++++++++++++++++++-
 7 files changed, 129 insertions(+), 36 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index bc82b7be42b..b1016f073d6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -118,6 +118,8 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.OutputTag;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -678,7 +680,7 @@ public class DoFnOperator<InputT, OutputT>
   public final void processElement2(StreamRecord<RawUnionValue> streamRecord) 
throws Exception {
     // we finish the bundle because the newly arrived side-input might
     // make a view available that was previously not ready.
-    // The PushbackSideInputRunner will only reset it's cache of non-ready 
windows when
+    // The PushbackSideInputRunner will only reset its cache of non-ready 
windows when
     // finishing a bundle.
     invokeFinishBundle();
     checkInvokeStartBundle();
@@ -791,6 +793,12 @@ public class DoFnOperator<InputT, OutputT>
         invokeFinishBundle();
       }
 
+      if (bundleStarted) {
+        // do not update watermark in the middle of bundle, because it might 
cause
+        // user-buffered data to be emitted past watermark
+        return;
+      }
+
       LOG.debug("Emitting watermark {}", watermark);
       currentOutputWatermark = watermark;
       output.emitWatermark(new Watermark(watermark));
@@ -867,13 +875,14 @@ public class DoFnOperator<InputT, OutputT>
   @SuppressWarnings("NonAtomicVolatileUpdate")
   @SuppressFBWarnings("VO_VOLATILE_INCREMENT")
   private void checkInvokeFinishBundleByCount() {
-    // We do not access this statement concurrently but we want to make sure 
that each thread
+    // We do not access this statement concurrently, but we want to make sure 
that each thread
     // sees the latest value, which is why we use volatile. See the class 
field section above
     // for more information.
     //noinspection NonAtomicOperationOnVolatileField
     elementCount++;
     if (elementCount >= maxBundleSize) {
       invokeFinishBundle();
+      updateOutputWatermark();
     }
   }
 
@@ -882,6 +891,24 @@ public class DoFnOperator<InputT, OutputT>
     long now = getProcessingTimeService().getCurrentProcessingTime();
     if (now - lastFinishBundleTime >= maxBundleTimeMills) {
       invokeFinishBundle();
+      scheduleForCurrentProcessingTime(ts -> updateOutputWatermark());
+    }
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  protected void scheduleForCurrentProcessingTime(ProcessingTimeCallback 
callback) {
+    // We are scheduling a timer for advancing the watermark, to not delay 
finishing the bundle
+    // and temporarily release the checkpoint lock. Otherwise, we could 
potentially loop when a
+    // timer keeps scheduling a timer for the same timestamp.
+    ProcessingTimeService timeService = getProcessingTimeService();
+    timeService.registerTimer(timeService.getCurrentProcessingTime(), 
callback);
+  }
+
+  private void updateOutputWatermark() {
+    try {
+      processInputWatermark(false);
+    } catch (Exception ex) {
+      failBundleFinalization(ex);
     }
   }
 
@@ -910,6 +937,7 @@ public class DoFnOperator<InputT, OutputT>
       while (bundleStarted) {
         invokeFinishBundle();
       }
+      updateOutputWatermark();
     }
   }
 
@@ -942,16 +970,20 @@ public class DoFnOperator<InputT, OutputT>
       }
       outputManager.closeBuffer();
     } catch (Exception e) {
-      // https://jira.apache.org/jira/browse/FLINK-14653
-      // Any regular exception during checkpointing will be tolerated by Flink 
because those
-      // typically do not affect the execution flow. We need to fail hard here 
because errors
-      // in bundle execution are application errors which are not related to 
checkpointing.
-      throw new Error("Checkpointing failed because bundle failed to 
finalize.", e);
+      failBundleFinalization(e);
     }
 
     super.snapshotState(context);
   }
 
+  private void failBundleFinalization(Exception e) {
+    // https://jira.apache.org/jira/browse/FLINK-14653
+    // Any regular exception during checkpointing will be tolerated by Flink 
because those
+    // typically do not affect the execution flow. We need to fail hard here 
because errors
+    // in bundle execution are application errors which are not related to 
checkpointing.
+    throw new Error("Checkpointing failed because bundle failed to finalize.", 
e);
+  }
+
   public BundleFinalizer getBundleFinalizer() {
     return bundleFinalizer;
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 6b6ebbfc729..6de413aa363 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -115,7 +115,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -700,7 +699,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
   @Override
   public void flushData() throws Exception {
     closed = true;
-    // We might still holding back the watermark and Flink does not trigger 
the timer
+    // We might still hold back the watermark and Flink does not trigger the 
timer
     // callback for watermark advancement anymore.
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < 
Watermark.MAX_WATERMARK.getTimestamp()) {
@@ -835,7 +834,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
     inputWatermarkBeforeBundleStart = getEffectiveInputWatermark();
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
   private void finishBundleCallback() {
     minEventTimeTimerTimestampInLastBundle = 
minEventTimeTimerTimestampInCurrentBundle;
     minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
@@ -843,12 +841,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
       if (!closed
           && minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE
           && minEventTimeTimerTimestampInLastBundle <= 
getEffectiveInputWatermark()) {
-        ProcessingTimeService processingTimeService = 
getProcessingTimeService();
-        // We are scheduling a timer for advancing the watermark, to not delay 
finishing the bundle
-        // and temporarily release the checkpoint lock. Otherwise, we could 
potentially loop when a
-        // timer keeps scheduling a timer for the same timestamp.
-        processingTimeService.registerTimer(
-            processingTimeService.getCurrentProcessingTime(),
+
+        scheduleForCurrentProcessingTime(
             ts -> processWatermark1(new 
Watermark(getEffectiveInputWatermark())));
       } else {
         processWatermark1(new Watermark(getEffectiveInputWatermark()));
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index d957b4e5c13..a5949a6965f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -316,7 +316,7 @@ public class FlinkSavepointTest implements Serializable {
                   MapElements.via(
                       new InferableFunction<byte[], KV<String, Void>>() {
                         @Override
-                        public KV<String, Void> apply(byte[] input) throws 
Exception {
+                        public KV<String, Void> apply(byte[] input) {
                           // This only writes data to one of the two initial 
partitions.
                           // We want to test this due to
                           // https://jira.apache.org/jira/browse/BEAM-7144
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index acfede826e7..3ce0862b38c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -27,7 +27,9 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.fasterxml.jackson.databind.util.LRUMap;
@@ -93,6 +95,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIt
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -326,12 +329,10 @@ public class DoFnOperatorTest {
 
           @OnTimer(processingTimerId)
           public void onProcessingTime(OnTimerContext context) {
-            assertEquals(
-                // Timestamps in processing timer context are defined to be 
the input watermark
-                // See SimpleDoFnRunner#onTimer
-                "Timer timestamp must match current input watermark",
-                timerTimestamp.plus(Duration.millis(1)),
-                context.timestamp());
+            assertTrue(
+                // Timestamps in processing timer context are defined to be 
the output watermark
+                "Timer timestamp must be at most current input watermark",
+                
!timerTimestamp.plus(Duration.millis(1)).isBefore(context.timestamp()));
             context.outputWithTimestamp(processingTimeMessage, 
context.timestamp());
           }
         };
@@ -422,17 +423,12 @@ public class DoFnOperatorTest {
     // this must fire the processing timer
     testHarness.setProcessingTime(timerTimestamp.getMillis() + 1);
 
-    assertThat(
-        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
-        contains(
-            WindowedValue.of(
-                // Timestamps in processing timer context are defined to be 
the input watermark
-                // See SimpleDoFnRunner#onTimer
-                processingTimeMessage,
-                timerTimestamp.plus(Duration.millis(1)),
-                window1,
-                PaneInfo.NO_FIRING)));
-
+    ArrayList<WindowedValue<?>> outputs =
+        
Lists.newArrayList(stripStreamRecordFromWindowedValue(testHarness.getOutput()));
+    assertEquals(1, outputs.size());
+    assertEquals(processingTimeMessage, outputs.get(0).getValue());
+    
assertFalse(timerTimestamp.plus(Duration.millis(1)).isBefore(outputs.get(0).getTimestamp()));
+    assertEquals(Collections.singletonList(window1), 
outputs.get(0).getWindows());
     testHarness.close();
   }
 
@@ -794,6 +790,11 @@ public class DoFnOperatorTest {
     assertThat(testHarness.numEventTimeTimers(), is(0));
     assertThat(testHarness.numKeyedStateEntries(), is(2));
 
+    // enforce closing of bundle
+    testHarness.setProcessingTime(
+        testHarness.getProcessingTime()
+            + 2 * FlinkPipelineOptions.defaults().getMaxBundleTimeMills());
+
     // Should not trigger garbage collection yet
     testHarness.processWatermark(
         
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)).getMillis());
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 78e240931b0..a82b269a525 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -27,6 +27,7 @@ import static 
org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -537,7 +538,7 @@ public class ExecutableStageDoFnOperatorTest {
     assertThat(testHarness.numEventTimeTimers(), is(3));
     testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
     assertThat(testHarness.numEventTimeTimers(), is(0));
-    assertThat(operator.getCurrentOutputWatermark(), is(5L));
+    assertTrue(operator.getCurrentOutputWatermark() <= 5L);
 
     // Output watermark is advanced synchronously when the bundle finishes,
     // no more timers are scheduled
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index ab45072ab58..3e0f028e1f4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -155,6 +155,10 @@ public class WindowDoFnOperatorTest {
     assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(1));
 
     assertThat(testHarness.numKeyedStateEntries(), is(6));
+    // close bundle
+    testHarness.setProcessingTime(
+        testHarness.getProcessingTime()
+            + 2 * FlinkPipelineOptions.defaults().getMaxBundleTimeMills());
     assertThat(windowDoFnOperator.getCurrentOutputWatermark(), is(1L));
 
     // close window
@@ -164,6 +168,11 @@ public class WindowDoFnOperatorTest {
     assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0));
 
     assertThat(testHarness.numKeyedStateEntries(), is(3));
+
+    // close bundle
+    testHarness.setProcessingTime(
+        testHarness.getProcessingTime()
+            + 2 * FlinkPipelineOptions.defaults().getMaxBundleTimeMills());
     assertThat(windowDoFnOperator.getCurrentOutputWatermark(), is(100L));
 
     testHarness.processWatermark(200L);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 9846f51343b..3207295a755 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -126,6 +126,7 @@ import org.apache.beam.sdk.transforms.Mean.CountSum;
 import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -6515,7 +6516,6 @@ public class ParDoTest implements Serializable {
               Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
               // verify state
               assertEquals(1, (int) currentValue);
-              System.err.println("KEY " + key + " VALUE " + currentValue);
               // To check output is received from OnWindowExpiration
               r.output(currentValue);
             }
@@ -6674,4 +6674,60 @@ public class ParDoTest implements Serializable {
           fieldAccessDescriptor.getNestedFieldsAccessed().isEmpty());
     }
   }
+
+  @RunWith(JUnit4.class)
+  public static class BundleInvariantsTests extends SharedTestBase implements 
Serializable {
+
+    @Test
+    @Category({ValidatesRunner.class, UsesUnboundedPCollections.class, 
UsesTestStream.class})
+    public void testWatermarkUpdateMidBundle() {
+      DoFn<String, String> bufferDoFn =
+          new DoFn<String, String>() {
+            private final Set<KV<String, Instant>> buffer = new HashSet<>();
+
+            @ProcessElement
+            public void process(@Element String in, @Timestamp Instant ts) {
+              buffer.add(KV.of(in, ts));
+            }
+
+            @FinishBundle
+            public void finish(FinishBundleContext context) {
+              buffer.forEach(k -> context.output(k.getKey(), k.getValue(), 
GlobalWindow.INSTANCE));
+              buffer.clear();
+            }
+          };
+      int numBundles = 200;
+      TestStream.Builder<String> builder =
+          TestStream.create(StringUtf8Coder.of()).advanceWatermarkTo(new 
Instant(0));
+      List<List<TimestampedValue<String>>> bundles =
+          IntStream.range(0, numBundles)
+              .mapToObj(
+                  r ->
+                      IntStream.range(0, r + 1)
+                          .mapToObj(v -> 
TimestampedValue.of(String.valueOf(v), new Instant(r)))
+                          .collect(Collectors.toList()))
+              .collect(Collectors.toList());
+      for (List<TimestampedValue<String>> b : bundles) {
+        builder =
+            builder
+                .addElements(b.get(0), b.subList(1, b.size()).toArray(new 
TimestampedValue[] {}))
+                .advanceWatermarkTo(new Instant(b.size()));
+      }
+      PCollection<Long> result =
+          pipeline
+              .apply(builder.advanceWatermarkToInfinity())
+              .apply(ParDo.of(bufferDoFn))
+              .apply("milliWindow", 
Window.into(FixedWindows.of(Duration.millis(1))))
+              .apply("count", 
Combine.globally(Count.<String>combineFn()).withoutDefaults())
+              .apply(
+                  "globalWindow",
+                  Window.<Long>into(new GlobalWindows())
+                      .triggering(AfterWatermark.pastEndOfWindow())
+                      .withAllowedLateness(Duration.ZERO)
+                      .discardingFiredPanes())
+              .apply("sum", Sum.longsGlobally());
+      PAssert.that(result).containsInAnyOrder((numBundles * numBundles + 
numBundles) / 2L);
+      pipeline.run();
+    }
+  }
 }

Reply via email to