This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch release-2.27.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.27.0 by this push:
new bc90cc8 [BEAM-11481] emit output watermark on watermark hold change
new 3b0efb5 Merge pull request #13582 from [BEAM-11481] emit output watermark on watermark hold change
bc90cc8 is described below
commit bc90cc881d06d57b48a145135c94f4d3f2c54560
Author: Jan Lukavsky <[email protected]>
AuthorDate: Thu Dec 17 11:43:10 2020 +0100
[BEAM-11481] emit output watermark on watermark hold change
---
.../wrappers/streaming/DoFnOperator.java | 23 ++-
.../streaming/KeyedPushedBackElementsHandler.java | 5 +-
.../streaming/state/FlinkStateInternals.java | 4 +-
.../wrappers/streaming/DoFnOperatorTest.java | 169 ++++++++++++++++++++-
4 files changed, 194 insertions(+), 7 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index fcea37b..9c1e437 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -641,8 +641,10 @@ public class DoFnOperator<InputT, OutputT>
@Override
public final void processElement(StreamRecord<WindowedValue<InputT>>
streamRecord) {
checkInvokeStartBundle();
+ long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs()
: -1L;
doFnRunner.processElement(streamRecord.getValue());
checkInvokeFinishBundleByCount();
+ emitWatermarkIfHoldChanged(oldHold);
}
@Override
@@ -739,9 +741,12 @@ public class DoFnOperator<InputT, OutputT>
}
currentInputWatermark = mark.getTimestamp();
+ processInputWatermark(true);
+ }
+ private void processInputWatermark(boolean advanceInputWatermark) throws
Exception {
long inputWatermarkHold =
applyInputWatermarkHold(getEffectiveInputWatermark());
- if (keyCoder != null) {
+ if (keyCoder != null && advanceInputWatermark) {
timeServiceManagerCompat.advanceWatermark(new
Watermark(inputWatermarkHold));
}
@@ -992,7 +997,23 @@ public class DoFnOperator<InputT, OutputT>
// allow overriding this in ExecutableStageDoFnOperator to set the key
context
protected void fireTimerInternal(ByteBuffer key, TimerData timerData) {
+ long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs()
: -1L;
fireTimer(timerData);
+ emitWatermarkIfHoldChanged(oldHold);
+ }
+
+ void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
+ if (keyCoder != null) {
+ long newWatermarkHold = keyedStateInternals.minWatermarkHoldMs();
+ if (newWatermarkHold > currentWatermarkHold) {
+ try {
+ processInputWatermark(false);
+ } catch (Exception ex) {
+ // should not happen
+ throw new IllegalStateException(ex);
+ }
+ }
+ }
}
// allow overriding this in WindowDoFnOperator
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java
index 82229d9..e51f30b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.flink.translation.wrappers.streaming;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -53,8 +54,8 @@ class KeyedPushedBackElementsHandler<K, T> implements
PushedBackElementsHandler<
KeyedStateBackend<K> backend,
ListStateDescriptor<T> stateDescriptor)
throws Exception {
- this.keySelector = keySelector;
- this.backend = backend;
+ this.keySelector = Objects.requireNonNull(keySelector);
+ this.backend = Objects.requireNonNull(backend);
this.stateName = stateDescriptor.getName();
// Eagerly retrieve the state to work around
https://jira.apache.org/jira/browse/FLINK-12653
this.state =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index f7b65a9..aa9c614 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -155,8 +155,8 @@ public class FlinkStateInternals<K> implements
StateInternals {
Coder<K> keyCoder,
SerializablePipelineOptions pipelineOptions)
throws Exception {
- this.flinkStateBackend = flinkStateBackend;
- this.keyCoder = keyCoder;
+ this.flinkStateBackend = Objects.requireNonNull(flinkStateBackend);
+ this.keyCoder = Objects.requireNonNull(keyCoder);
watermarkHoldStateDescriptor =
new MapStateDescriptor<>(
"watermark-holds",
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 234202c..0045178 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -21,6 +21,7 @@ import static
org.apache.beam.runners.flink.translation.wrappers.streaming.Strea
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@@ -31,6 +32,8 @@ import static org.junit.Assert.assertThrows;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.util.LRUMap;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -38,7 +41,14 @@ import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
@@ -57,6 +67,7 @@ import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.testing.PCollectionViewTesting;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -68,6 +79,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -251,8 +263,8 @@ public class DoFnOperatorTest {
* timestamp {@code <= T} in the future. We have to make sure to take this
into account when
* firing timers.
*
- * <p>This not test the timer API in general or processing-time timers
because there are generic
- * tests for this in {@code ParDoTest}.
+ * <p>This does not test the timer API in general or processing-time timers
because there are
+ * generic tests for this in {@code ParDoTest}.
*/
@Test
public void testWatermarkContract() throws Exception {
@@ -419,6 +431,159 @@ public class DoFnOperatorTest {
}
@Test
+ public void testWatermarkUpdateAfterWatermarkHoldRelease() throws Exception {
+
+ Coder<WindowedValue<KV<String, String>>> coder =
+ WindowedValue.getValueOnlyCoder(KvCoder.of(StringUtf8Coder.of(),
StringUtf8Coder.of()));
+
+ TupleTag<KV<String, String>> outputTag = new TupleTag<>("main-output");
+ List<Long> emittedWatermarkHolds = new ArrayList<>();
+ KeySelector<WindowedValue<KV<String, String>>, ByteBuffer> keySelector =
+ e -> FlinkKeyUtils.encodeKey(e.getValue().getKey(),
StringUtf8Coder.of());
+
+ DoFnOperator<KV<String, String>, KV<String, String>> doFnOperator =
+ new DoFnOperator<KV<String, String>, KV<String, String>>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ Collections.emptyMap(),
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ outputTag, coder, new
SerializablePipelineOptions(FlinkPipelineOptions.defaults())),
+ WindowingStrategy.globalDefault(),
+ new HashMap<>(), /* side-input mapping */
+ Collections.emptyList(), /* side inputs */
+ FlinkPipelineOptions.defaults(),
+ StringUtf8Coder.of(),
+ keySelector,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap()) {
+
+ @Override
+ protected DoFnRunner<KV<String, String>, KV<String, String>>
createWrappingDoFnRunner(
+ DoFnRunner<KV<String, String>, KV<String, String>> wrappedRunner,
+ StepContext stepContext) {
+
+ StateNamespace namespace =
+ StateNamespaces.window(GlobalWindow.Coder.INSTANCE,
GlobalWindow.INSTANCE);
+ StateTag<WatermarkHoldState> holdTag =
+ StateTags.watermarkStateInternal("hold",
TimestampCombiner.LATEST);
+ WatermarkHoldState holdState =
stepContext.stateInternals().state(namespace, holdTag);
+ TimerInternals timerInternals = stepContext.timerInternals();
+
+ return new DoFnRunner<KV<String, String>, KV<String, String>>() {
+
+ @Override
+ public void startBundle() {
+ wrappedRunner.startBundle();
+ }
+
+ @Override
+ public void processElement(WindowedValue<KV<String, String>>
elem) {
+ wrappedRunner.processElement(elem);
+ holdState.add(elem.getTimestamp());
+ timerInternals.setTimer(
+ namespace,
+ "timer",
+ "family",
+ elem.getTimestamp().plus(1),
+ elem.getTimestamp().plus(1),
+ TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(
+ namespace,
+ "cleanup",
+ "",
+ GlobalWindow.INSTANCE.maxTimestamp(),
+ GlobalWindow.INSTANCE.maxTimestamp(),
+ TimeDomain.EVENT_TIME);
+ }
+
+ @Override
+ public <KeyT> void onTimer(
+ String timerId,
+ String timerFamilyId,
+ KeyT key,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+
+ if ("cleanup".equals(timerId)) {
+ holdState.clear();
+ } else {
+ holdState.add(outputTimestamp);
+ }
+ }
+
+ @Override
+ public void finishBundle() {
+ wrappedRunner.finishBundle();
+ }
+
+ @Override
+ public <KeyT> void onWindowExpiration(
+ BoundedWindow window, Instant timestamp, KeyT key) {
+ wrappedRunner.onWindowExpiration(window, timestamp, key);
+ }
+
+ @Override
+ public DoFn<KV<String, String>, KV<String, String>> getFn() {
+ return doFn;
+ }
+ };
+ }
+
+ @Override
+ void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
+
emittedWatermarkHolds.add(keyedStateInternals.minWatermarkHoldMs());
+ }
+ };
+
+ OneInputStreamOperatorTestHarness<
+ WindowedValue<KV<String, String>>, WindowedValue<KV<String,
String>>>
+ testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ doFnOperator,
+ keySelector,
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
FlinkPipelineOptions.defaults()));
+
+ testHarness.setup();
+
+ Instant now = Instant.now();
+
+ testHarness.open();
+
+ // process first element, set hold to `now', setup timer for `now + 1'
+ testHarness.processElement(
+ new StreamRecord<>(
+ WindowedValue.timestampedValueInGlobalWindow(KV.of("Key",
"Hello"), now)));
+
+ assertThat(emittedWatermarkHolds,
is(equalTo(Collections.singletonList(now.getMillis()))));
+
+ // fire timer, change hold to `now + 2'
+ testHarness.processWatermark(now.getMillis() + 2);
+
+ assertThat(
+ emittedWatermarkHolds, is(equalTo(Arrays.asList(now.getMillis(),
now.getMillis() + 1))));
+
+ // process second element, verify we emitted changed hold
+ testHarness.processElement(
+ new StreamRecord<>(
+ WindowedValue.timestampedValueInGlobalWindow(KV.of("Key",
"Hello"), now.plus(2))));
+
+ assertThat(
+ emittedWatermarkHolds,
+ is(equalTo(Arrays.asList(now.getMillis(), now.getMillis() + 1,
now.getMillis() + 2))));
+
+
testHarness.processWatermark(GlobalWindow.INSTANCE.maxTimestamp().plus(1).getMillis());
+
testHarness.processWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+ testHarness.close();
+ }
+
+ @Test
public void testLateDroppingForStatefulFn() throws Exception {
WindowingStrategy<Object, IntervalWindow> windowingStrategy =