This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch release-2.27.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.27.0 by this push:
     new bc90cc8  [BEAM-11481] emit output watermark on watermark hold change
     new 3b0efb5  Merge pull request #13582 from [BEAM-11481] emit output watermark on watermark hold change
bc90cc8 is described below

commit bc90cc881d06d57b48a145135c94f4d3f2c54560
Author: Jan Lukavsky <[email protected]>
AuthorDate: Thu Dec 17 11:43:10 2020 +0100

    [BEAM-11481] emit output watermark on watermark hold change
---
 .../wrappers/streaming/DoFnOperator.java           |  23 ++-
 .../streaming/KeyedPushedBackElementsHandler.java  |   5 +-
 .../streaming/state/FlinkStateInternals.java       |   4 +-
 .../wrappers/streaming/DoFnOperatorTest.java       | 169 ++++++++++++++++++++-
 4 files changed, 194 insertions(+), 7 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index fcea37b..9c1e437 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -641,8 +641,10 @@ public class DoFnOperator<InputT, OutputT>
   @Override
   public final void processElement(StreamRecord<WindowedValue<InputT>> 
streamRecord) {
     checkInvokeStartBundle();
+    long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() 
: -1L;
     doFnRunner.processElement(streamRecord.getValue());
     checkInvokeFinishBundleByCount();
+    emitWatermarkIfHoldChanged(oldHold);
   }
 
   @Override
@@ -739,9 +741,12 @@ public class DoFnOperator<InputT, OutputT>
     }
 
     currentInputWatermark = mark.getTimestamp();
+    processInputWatermark(true);
+  }
 
+  private void processInputWatermark(boolean advanceInputWatermark) throws 
Exception {
     long inputWatermarkHold = 
applyInputWatermarkHold(getEffectiveInputWatermark());
-    if (keyCoder != null) {
+    if (keyCoder != null && advanceInputWatermark) {
       timeServiceManagerCompat.advanceWatermark(new 
Watermark(inputWatermarkHold));
     }
 
@@ -992,7 +997,23 @@ public class DoFnOperator<InputT, OutputT>
 
   // allow overriding this in ExecutableStageDoFnOperator to set the key 
context
   protected void fireTimerInternal(ByteBuffer key, TimerData timerData) {
+    long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() 
: -1L;
     fireTimer(timerData);
+    emitWatermarkIfHoldChanged(oldHold);
+  }
+
+  void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
+    if (keyCoder != null) {
+      long newWatermarkHold = keyedStateInternals.minWatermarkHoldMs();
+      if (newWatermarkHold > currentWatermarkHold) {
+        try {
+          processInputWatermark(false);
+        } catch (Exception ex) {
+          // should not happen
+          throw new IllegalStateException(ex);
+        }
+      }
+    }
   }
 
   // allow overriding this in WindowDoFnOperator
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java
index 82229d9..e51f30b 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -53,8 +54,8 @@ class KeyedPushedBackElementsHandler<K, T> implements 
PushedBackElementsHandler<
       KeyedStateBackend<K> backend,
       ListStateDescriptor<T> stateDescriptor)
       throws Exception {
-    this.keySelector = keySelector;
-    this.backend = backend;
+    this.keySelector = Objects.requireNonNull(keySelector);
+    this.backend = Objects.requireNonNull(backend);
     this.stateName = stateDescriptor.getName();
     // Eagerly retrieve the state to work around 
https://jira.apache.org/jira/browse/FLINK-12653
     this.state =
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index f7b65a9..aa9c614 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -155,8 +155,8 @@ public class FlinkStateInternals<K> implements 
StateInternals {
       Coder<K> keyCoder,
       SerializablePipelineOptions pipelineOptions)
       throws Exception {
-    this.flinkStateBackend = flinkStateBackend;
-    this.keyCoder = keyCoder;
+    this.flinkStateBackend = Objects.requireNonNull(flinkStateBackend);
+    this.keyCoder = Objects.requireNonNull(keyCoder);
     watermarkHoldStateDescriptor =
         new MapStateDescriptor<>(
             "watermark-holds",
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 234202c..0045178 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.runners.flink.translation.wrappers.streaming.Strea
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -31,6 +32,8 @@ import static org.junit.Assert.assertThrows;
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.fasterxml.jackson.databind.util.LRUMap;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -38,7 +41,14 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
@@ -57,6 +67,7 @@ import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.testing.PCollectionViewTesting;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -68,6 +79,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -251,8 +263,8 @@ public class DoFnOperatorTest {
    * timestamp {@code <= T} in the future. We have to make sure to take this 
into account when
    * firing timers.
    *
-   * <p>This not test the timer API in general or processing-time timers 
because there are generic
-   * tests for this in {@code ParDoTest}.
+   * <p>This does not test the timer API in general or processing-time timers 
because there are
+   * generic tests for this in {@code ParDoTest}.
    */
   @Test
   public void testWatermarkContract() throws Exception {
@@ -419,6 +431,159 @@ public class DoFnOperatorTest {
   }
 
   @Test
+  public void testWatermarkUpdateAfterWatermarkHoldRelease() throws Exception {
+
+    Coder<WindowedValue<KV<String, String>>> coder =
+        WindowedValue.getValueOnlyCoder(KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()));
+
+    TupleTag<KV<String, String>> outputTag = new TupleTag<>("main-output");
+    List<Long> emittedWatermarkHolds = new ArrayList<>();
+    KeySelector<WindowedValue<KV<String, String>>, ByteBuffer> keySelector =
+        e -> FlinkKeyUtils.encodeKey(e.getValue().getKey(), 
StringUtf8Coder.of());
+
+    DoFnOperator<KV<String, String>, KV<String, String>> doFnOperator =
+        new DoFnOperator<KV<String, String>, KV<String, String>>(
+            new IdentityDoFn<>(),
+            "stepName",
+            coder,
+            Collections.emptyMap(),
+            outputTag,
+            Collections.emptyList(),
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                outputTag, coder, new 
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
+            WindowingStrategy.globalDefault(),
+            new HashMap<>(), /* side-input mapping */
+            Collections.emptyList(), /* side inputs */
+            FlinkPipelineOptions.defaults(),
+            StringUtf8Coder.of(),
+            keySelector,
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap()) {
+
+          @Override
+          protected DoFnRunner<KV<String, String>, KV<String, String>> 
createWrappingDoFnRunner(
+              DoFnRunner<KV<String, String>, KV<String, String>> wrappedRunner,
+              StepContext stepContext) {
+
+            StateNamespace namespace =
+                StateNamespaces.window(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.INSTANCE);
+            StateTag<WatermarkHoldState> holdTag =
+                StateTags.watermarkStateInternal("hold", 
TimestampCombiner.LATEST);
+            WatermarkHoldState holdState = 
stepContext.stateInternals().state(namespace, holdTag);
+            TimerInternals timerInternals = stepContext.timerInternals();
+
+            return new DoFnRunner<KV<String, String>, KV<String, String>>() {
+
+              @Override
+              public void startBundle() {
+                wrappedRunner.startBundle();
+              }
+
+              @Override
+              public void processElement(WindowedValue<KV<String, String>> 
elem) {
+                wrappedRunner.processElement(elem);
+                holdState.add(elem.getTimestamp());
+                timerInternals.setTimer(
+                    namespace,
+                    "timer",
+                    "family",
+                    elem.getTimestamp().plus(1),
+                    elem.getTimestamp().plus(1),
+                    TimeDomain.EVENT_TIME);
+                timerInternals.setTimer(
+                    namespace,
+                    "cleanup",
+                    "",
+                    GlobalWindow.INSTANCE.maxTimestamp(),
+                    GlobalWindow.INSTANCE.maxTimestamp(),
+                    TimeDomain.EVENT_TIME);
+              }
+
+              @Override
+              public <KeyT> void onTimer(
+                  String timerId,
+                  String timerFamilyId,
+                  KeyT key,
+                  BoundedWindow window,
+                  Instant timestamp,
+                  Instant outputTimestamp,
+                  TimeDomain timeDomain) {
+
+                if ("cleanup".equals(timerId)) {
+                  holdState.clear();
+                } else {
+                  holdState.add(outputTimestamp);
+                }
+              }
+
+              @Override
+              public void finishBundle() {
+                wrappedRunner.finishBundle();
+              }
+
+              @Override
+              public <KeyT> void onWindowExpiration(
+                  BoundedWindow window, Instant timestamp, KeyT key) {
+                wrappedRunner.onWindowExpiration(window, timestamp, key);
+              }
+
+              @Override
+              public DoFn<KV<String, String>, KV<String, String>> getFn() {
+                return doFn;
+              }
+            };
+          }
+
+          @Override
+          void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
+            
emittedWatermarkHolds.add(keyedStateInternals.minWatermarkHoldMs());
+          }
+        };
+
+    OneInputStreamOperatorTestHarness<
+            WindowedValue<KV<String, String>>, WindowedValue<KV<String, 
String>>>
+        testHarness =
+            new KeyedOneInputStreamOperatorTestHarness<>(
+                doFnOperator,
+                keySelector,
+                new CoderTypeInformation<>(
+                    FlinkKeyUtils.ByteBufferCoder.of(), 
FlinkPipelineOptions.defaults()));
+
+    testHarness.setup();
+
+    Instant now = Instant.now();
+
+    testHarness.open();
+
+    // process first element, set hold to `now', set up timer for `now + 1'
+    testHarness.processElement(
+        new StreamRecord<>(
+            WindowedValue.timestampedValueInGlobalWindow(KV.of("Key", 
"Hello"), now)));
+
+    assertThat(emittedWatermarkHolds, 
is(equalTo(Collections.singletonList(now.getMillis()))));
+
+    // advance watermark to `now + 2' to fire the timer; the hold moves to `now + 1'
+    testHarness.processWatermark(now.getMillis() + 2);
+
+    assertThat(
+        emittedWatermarkHolds, is(equalTo(Arrays.asList(now.getMillis(), 
now.getMillis() + 1))));
+
+    // process second element, verify we emitted changed hold
+    testHarness.processElement(
+        new StreamRecord<>(
+            WindowedValue.timestampedValueInGlobalWindow(KV.of("Key", 
"Hello"), now.plus(2))));
+
+    assertThat(
+        emittedWatermarkHolds,
+        is(equalTo(Arrays.asList(now.getMillis(), now.getMillis() + 1, 
now.getMillis() + 2))));
+
+    
testHarness.processWatermark(GlobalWindow.INSTANCE.maxTimestamp().plus(1).getMillis());
+    
testHarness.processWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+    testHarness.close();
+  }
+
+  @Test
   public void testLateDroppingForStatefulFn() throws Exception {
 
     WindowingStrategy<Object, IntervalWindow> windowingStrategy =

Reply via email to