This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bea8849 Merge pull request #15994: [BEAM-13263] Support
OnWindowExpiration in (non-portable) Flink runner
bea8849 is described below
commit bea88490629e458175fed5ebfe5da6d74794375b
Author: reuvenlax <[email protected]>
AuthorDate: Tue Dec 14 08:14:21 2021 -0800
Merge pull request #15994: [BEAM-13263] Support OnWindowExpiration in
(non-portable) Flink runner
---
.../org/apache/beam/runners/core/DoFnRunners.java | 2 -
.../apache/beam/runners/core/ProcessFnRunner.java | 5 ++
.../runners/core/PushbackSideInputDoFnRunner.java | 3 +
.../core/SimplePushbackSideInputDoFnRunner.java | 5 ++
.../beam/runners/core/StatefulDoFnRunner.java | 3 +-
runners/flink/flink_runner.gradle | 1 -
.../functions/FlinkStatefulDoFnFunction.java | 21 +++++
.../wrappers/streaming/DoFnOperator.java | 10 ++-
.../org/apache/beam/sdk/transforms/ParDoTest.java | 95 +++++++++++++++-------
9 files changed, 111 insertions(+), 34 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 8a16d48..c2ae8cf 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -112,7 +112,6 @@ public class DoFnRunners {
WindowingStrategy<?, ?> windowingStrategy,
CleanupTimer<InputT> cleanupTimer,
StateCleaner<W> stateCleaner) {
-
return defaultStatefulDoFnRunner(
fn,
inputCoder,
@@ -144,7 +143,6 @@ public class DoFnRunners {
CleanupTimer<InputT> cleanupTimer,
StateCleaner<W> stateCleaner,
boolean requiresTimeSortedInputSupported) {
-
boolean doFnRequiresTimeSortedInput =
DoFnSignatures.signatureForDoFn(doFnRunner.getFn())
.processElement()
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index 4b6ed65..0aa61eb 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -94,6 +94,11 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT>
throw new UnsupportedOperationException("User timers unsupported in
ProcessFn");
}
+ @Override
+ public <KeyT> void onWindowExpiration(BoundedWindow window, Instant
outputTimestamp, KeyT key) {
+ throw new UnsupportedOperationException("OnWindowExpiration unsupported in
ProcessFn");
+ }
+
private static <T> void checkTrivialOuterWindows(
WindowedValue<KeyedWorkItem<byte[], T>> windowedKWI) {
// In practice it will be in 0 or 1 windows (ValueInEmptyWindows or
ValueInGlobalWindow)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 1fd9eb2..1b2d90e 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -52,6 +52,9 @@ public interface PushbackSideInputDoFnRunner<InputT, OutputT>
{
Instant outputTimestamp,
TimeDomain timeDomain);
+ /** Calls the underlying {@link DoFn.OnWindowExpiration} method. */
+ <KeyT> void onWindowExpiration(BoundedWindow window, Instant
outputTimestamp, KeyT key);
+
/** Calls the underlying {@link DoFn.FinishBundle} method. */
void finishBundle();
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
index 5004337..31cfa6b 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -120,6 +120,11 @@ public class SimplePushbackSideInputDoFnRunner<InputT,
OutputT>
}
@Override
+ public <KeyT> void onWindowExpiration(BoundedWindow window, Instant
outputTimestamp, KeyT key) {
+ underlying.onWindowExpiration(window, outputTimestamp, key);
+ }
+
+ @Override
public void finishBundle() {
notReadyWindows = null;
underlying.finishBundle();
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 6e4f1c3..39eac1d 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -141,7 +141,6 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends
BoundedWindow>
@Override
public void processElement(WindowedValue<InputT> input) {
-
// StatefulDoFnRunner always observes windows, so we need to explode
for (WindowedValue<InputT> value : input.explodeWindows()) {
BoundedWindow window = value.getWindows().iterator().next();
@@ -220,7 +219,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends
BoundedWindow>
if (requiresTimeSortedInput) {
onSortFlushTimer(window, BoundedWindow.TIMESTAMP_MAX_VALUE);
}
- doFnRunner.onWindowExpiration(window, outputTimestamp, key);
+ onWindowExpiration(window, outputTimestamp, key);
stateCleaner.clearForWindow(window);
} else {
// An event-time timer can never be late because we don't allow setting
timers after GC time.
diff --git a/runners/flink/flink_runner.gradle
b/runners/flink/flink_runner.gradle
index f400974..97d59a4 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -224,7 +224,6 @@ def createValidatesRunnerTask(Map m) {
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
- excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
excludeCategories 'org.apache.beam.sdk.testing.UsesLoopingTimer'
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index d007ce1..c03b27a 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -19,10 +19,12 @@ package org.apache.beam.runners.flink.translation.functions;
import static org.apache.flink.util.Preconditions.checkArgument;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.InMemoryStateInternals;
@@ -45,6 +47,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -56,6 +59,7 @@ import
org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
+import org.joda.time.Duration;
import org.joda.time.Instant;
/** A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Flink
Batch Runner. */
@@ -67,6 +71,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
extends RichGroupReduceFunction<WindowedValue<KV<K, V>>,
WindowedValue<RawUnionValue>> {
private final DoFn<KV<K, V>, OutputT> dofn;
+ private final boolean usesOnWindowExpiration;
private String stepName;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
@@ -95,6 +100,8 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
Map<String, PCollectionView<?>> sideInputMapping) {
this.dofn = dofn;
+ this.usesOnWindowExpiration =
+ DoFnSignatures.signatureForDoFn(dofn).onWindowExpiration() != null;
this.stepName = stepName;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
@@ -137,6 +144,8 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+ final Set<BoundedWindow> windowsSeen = new HashSet<>();
+
List<TupleTag<?>> additionalOutputTags =
Lists.newArrayList(outputMap.keySet());
DoFnRunner<KV<K, V>, OutputT> doFnRunner =
@@ -172,8 +181,14 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
doFnRunner.startBundle();
doFnRunner.processElement(currentValue);
+ if (usesOnWindowExpiration) {
+ windowsSeen.addAll(currentValue.getWindows());
+ }
while (iterator.hasNext()) {
currentValue = iterator.next();
+ if (usesOnWindowExpiration) {
+ windowsSeen.addAll(currentValue.getWindows());
+ }
doFnRunner.processElement(currentValue);
}
@@ -186,6 +201,12 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
fireEligibleTimers(key, timerInternals, doFnRunner);
+ if (usesOnWindowExpiration) {
+ for (BoundedWindow window : windowsSeen) {
+ doFnRunner.onWindowExpiration(window,
window.maxTimestamp().minus(Duration.millis(1)), key);
+ }
+ }
+
doFnRunner.finishBundle();
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 4f4f6c1..f78f621 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -199,6 +199,8 @@ public class DoFnOperator<InputT, OutputT>
/** If true, we must process elements only after a checkpoint is finished. */
private final boolean requiresStableInput;
+ private final boolean usesOnWindowExpiration;
+
private final boolean finishBundleBeforeCheckpointing;
/** Stores new finalizations being gathered. */
@@ -303,6 +305,8 @@ public class DoFnOperator<InputT, OutputT>
// WindowDoFnOperator does not use a DoFn
doFn != null
&&
DoFnSignatures.getSignature(doFn.getClass()).processElement().requiresStableInput();
+ this.usesOnWindowExpiration =
+ doFn != null &&
DoFnSignatures.getSignature(doFn.getClass()).onWindowExpiration() != null;
if (requiresStableInput) {
Preconditions.checkState(
@@ -340,11 +344,12 @@ public class DoFnOperator<InputT, OutputT>
timerInternals, windowingStrategy) {
@Override
public void setForWindow(InputT input, BoundedWindow window) {
- if (!window.equals(GlobalWindow.INSTANCE)) {
+ if (!window.equals(GlobalWindow.INSTANCE) ||
usesOnWindowExpiration) {
// Skip setting a cleanup timer for the global window as these
timers
// lead to potentially unbounded state growth in the runner,
depending on key
// cardinality. Cleanup for global window will be performed
upon arrival of the
// final watermark.
+ // In the case of OnWindowExpiration, we still set the timer.
super.setForWindow(input, window);
}
}
@@ -796,11 +801,14 @@ public class DoFnOperator<InputT, OutputT>
if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
invokeFinishBundle();
}
+
LOG.debug("Emitting watermark {}", watermark);
currentOutputWatermark = watermark;
output.emitWatermark(new Watermark(watermark));
// Check if the final watermark was triggered to perform state cleanup
for global window
+ // TODO: Do we need to do this when OnWindowExpiration is set, since in
that case we have a
+ // cleanup timer?
if (keyedStateInternals != null
&& currentOutputWatermark
>
adjustTimestampForFlink(GlobalWindow.INSTANCE.maxTimestamp().getMillis())) {
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 9473318..9018987 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -76,6 +76,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.state.BagState;
@@ -1769,14 +1770,12 @@ public class ParDoTest implements Serializable {
@Timestamp Instant timestamp,
OutputReceiver<String> r) {
r.output(element);
- System.out.println("Process: " + element + ":" +
timestamp.getMillis());
}
@FinishBundle
public void finishBundle(FinishBundleContext c) {
Instant ts = new Instant(3);
c.output("finish", ts, windowFn.assignWindow(ts));
- System.out.println("Finish: 3");
}
}))
.apply(ParDo.of(new PrintingDoFn()));
@@ -2546,13 +2545,23 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class,
UsesOrderedListState.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesOrderedListState.class,
+ UsesOnWindowExpiration.class
+ })
public void testOrderedListStateBounded() {
testOrderedListStateImpl(false);
}
@Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class,
UsesOrderedListState.class})
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesOrderedListState.class,
+ UsesOnWindowExpiration.class
+ })
public void testOrderedListStateUnbounded() {
testOrderedListStateImpl(true);
}
@@ -5730,8 +5739,6 @@ public class ParDoTest implements Serializable {
@Timestamp Instant ts,
@TimerFamily(timerFamilyId) TimerMap timerMap,
OutputReceiver<String> r) {
- System.out.println("timer Id : " + timerId);
- System.out.println("timerMap : " + timerMap.toString());
r.output(timerId);
}
};
@@ -6059,7 +6066,18 @@ public class ParDoTest implements Serializable {
UsesOnWindowExpiration.class
})
public void testOnWindowExpirationSimpleBounded() {
- runOnWindowExpirationSimple(false);
+ runOnWindowExpirationSimple(false, false);
+ }
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesTimersInParDo.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testOnWindowExpirationSimpleBoundedGlobal() {
+ runOnWindowExpirationSimple(false, true);
}
@Test
@@ -6071,10 +6089,22 @@ public class ParDoTest implements Serializable {
UsesUnboundedPCollections.class
})
public void testOnWindowExpirationSimpleUnbounded() {
- runOnWindowExpirationSimple(true);
+ runOnWindowExpirationSimple(true, false);
+ }
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesTimersInParDo.class,
+ UsesOnWindowExpiration.class,
+ UsesUnboundedPCollections.class
+ })
+ public void testOnWindowExpirationSimpleUnboundedGlobal() {
+ runOnWindowExpirationSimple(true, true);
}
- public void runOnWindowExpirationSimple(boolean useStreaming) {
+ public void runOnWindowExpirationSimple(boolean useStreaming, boolean
globalWindow) {
final String stateId = "foo";
final String timerId = "bar";
IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new
Instant(10));
@@ -6112,31 +6142,40 @@ public class ParDoTest implements Serializable {
Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
// verify state
assertEquals(1, (int) currentValue);
+ System.err.println("KEY " + key + " VALUE " + currentValue);
// To check output is received from OnWindowExpiration
r.output(currentValue);
}
};
- PCollection<Integer> output =
- pipeline
- .apply(
- Create.timestamped(
- // first window
- TimestampedValue.of(KV.of("hello", 7), new Instant(3)),
-
- // second window
- TimestampedValue.of(KV.of("hi", 35), new Instant(13))))
- .apply(Window.into(FixedWindows.of(Duration.millis(10))))
- .setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED :
IsBounded.BOUNDED)
- .apply(ParDo.of(fn));
+ if (useStreaming) {
+ pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+ }
- PAssert.that(output)
- .inWindow(firstWindow)
- // verify output
- .containsInAnyOrder(1)
- .inWindow(secondWindow)
- // verify output
- .containsInAnyOrder(1);
+ PCollection<KV<String, Integer>> intermediate =
+ pipeline.apply(
+ Create.timestamped(
+ // first window
+ TimestampedValue.of(KV.of("hello", 7), new Instant(3)),
+
+ // second window
+ TimestampedValue.of(KV.of("hi", 35), new Instant(13))));
+ if (!globalWindow) {
+ intermediate =
intermediate.apply(Window.into(FixedWindows.of(Duration.millis(10))));
+ }
+ PCollection<Integer> output = intermediate.apply(ParDo.of(fn));
+
+ if (!globalWindow) {
+ PAssert.that(output)
+ .inWindow(firstWindow)
+ // verify output
+ .containsInAnyOrder(1)
+ .inWindow(secondWindow)
+ // verify output
+ .containsInAnyOrder(1);
+ } else {
+
PAssert.that(output).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(1, 1);
+ }
pipeline.run();
}
}