[
https://issues.apache.org/jira/browse/BEAM-3909?focusedWorklogId=96617&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96617
]
ASF GitHub Bot logged work on BEAM-3909:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Apr/18 09:26
Start Date: 30/Apr/18 09:26
Worklog Time Spent: 10m
Work Description: aljoscha closed pull request #4931: [BEAM-3909] Add
tests for Flink DoFnOperator side-input checkpointing
URL: https://github.com/apache/beam/pull/4931
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a9e95d980f5..7a55bc51974 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -483,6 +483,11 @@ public final void processElement1(
@Override
public final void processElement2(
StreamRecord<RawUnionValue> streamRecord) throws Exception {
+ // we finish the bundle because the newly arrived side-input might
+ // make a view available that was previously not ready.
+ // The PushbackSideInputRunner will only reset its cache of non-ready
windows when
+ // finishing a bundle.
+ invokeFinishBundle();
checkInvokeStartBundle();
@SuppressWarnings("unchecked")
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 02179ffe6fb..34b4befcb2d 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -526,7 +526,17 @@ public void onTimer(OnTimerContext context,
@StateId(stateId) ValueState<String>
testHarness.close();
}
- public void testSideInputs(boolean keyed) throws Exception {
+ @Test
+ public void testNormalParDoSideInputs() throws Exception {
+ testSideInputs(false);
+ }
+
+ @Test
+ public void testKeyedParDoSideInputs() throws Exception {
+ testSideInputs(true);
+ }
+
+ void testSideInputs(boolean keyed) throws Exception {
Coder<WindowedValue<String>> coder =
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
@@ -687,6 +697,252 @@ public void processElement(ProcessContext context,
testHarness.close();
}
+ @Test
+ public void nonKeyedParDoSideInputCheckpointing() throws Exception {
+ sideInputCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ null);
+
+ return new TwoInputStreamOperatorTestHarness<>(doFnOperator);
+ });
+ }
+
+ @Test
+ public void keyedParDoSideInputCheckpointing() throws Exception {
+ sideInputCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ StringUtf8Coder keyCoder = StringUtf8Coder.of();
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ keyCoder);
+
+ return new KeyedTwoInputStreamOperatorTestHarness<>(
+ doFnOperator,
+ new StringKeySelector(),
+ // we use a dummy key for the second input since it is considered to
be broadcast
+ new DummyKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+ });
+ }
+
+ void sideInputCheckpointing(
+ TestHarnessFactory<
+ TwoInputStreamOperatorTestHarness<
+ WindowedValue<String>,
+ RawUnionValue,
+ WindowedValue<String>>> harnessFactory) throws Exception {
+
+ TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue,
WindowedValue<String>>
+ testHarness = harnessFactory.create();
+
+ testHarness.open();
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new
Instant(100));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new
Instant(500));
+
+ // push in some side inputs for both windows
+ testHarness.processElement2(
+ new StreamRecord<>(
+ new RawUnionValue(
+ 1,
+ valuesInWindow(
+ ImmutableList.of(KV.of((Void) null, "hello"), KV.of((Void)
null, "ciao")),
+ new Instant(0),
+ firstWindow))));
+ testHarness.processElement2(
+ new StreamRecord<>(
+ new RawUnionValue(
+ 2,
+ valuesInWindow(
+ ImmutableList.of(KV.of((Void) null, "foo"), KV.of((Void)
null, "bar")),
+ new Instant(0),
+ secondWindow))));
+
+ // snapshot state, throw away the operator, then restore and verify that
we still match
+ // main-input elements to the side-inputs that we sent earlier
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ testHarness = harnessFactory.create();
+
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ // push in main-input elements
+ WindowedValue<String> helloElement = valueInWindow("Hello", new
Instant(0), firstWindow);
+ WindowedValue<String> worldElement = valueInWindow("World", new
Instant(1000), firstWindow);
+ testHarness.processElement1(new StreamRecord<>(helloElement));
+ testHarness.processElement1(new StreamRecord<>(worldElement));
+
+ assertThat(
+ stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+ contains(helloElement, worldElement));
+
+ testHarness.close();
+ }
+
+ @Test
+ public void nonKeyedParDoPushbackDataCheckpointing() throws Exception {
+ pushbackDataCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ null);
+
+ return new TwoInputStreamOperatorTestHarness<>(doFnOperator);
+ });
+ }
+
+ @Test
+ public void keyedParDoPushbackDataCheckpointing() throws Exception {
+ pushbackDataCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ StringUtf8Coder keyCoder = StringUtf8Coder.of();
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ keyCoder);
+
+ return new KeyedTwoInputStreamOperatorTestHarness<>(
+ doFnOperator,
+ new StringKeySelector(),
+ // we use a dummy key for the second input since it is considered to
be broadcast
+ new DummyKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+ });
+ }
+
+ void pushbackDataCheckpointing(
+ TestHarnessFactory<
+ TwoInputStreamOperatorTestHarness<
+ WindowedValue<String>,
+ RawUnionValue,
+ WindowedValue<String>>> harnessFactory) throws Exception {
+
+ TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue,
WindowedValue<String>>
+ testHarness = harnessFactory.create();
+
+ testHarness.open();
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new
Instant(100));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new
Instant(500));
+
+ // push in main-input elements
+ WindowedValue<String> helloElement = valueInWindow("Hello", new
Instant(0), firstWindow);
+ WindowedValue<String> worldElement = valueInWindow("World", new
Instant(1000), firstWindow);
+ testHarness.processElement1(new StreamRecord<>(helloElement));
+ testHarness.processElement1(new StreamRecord<>(worldElement));
+
+ // snapshot state, throw away the operator, then restore and verify that
we still match
+ // main-input elements to the side-inputs that we sent earlier
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ testHarness = harnessFactory.create();
+
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ // push in some side inputs for both windows
+ testHarness.processElement2(
+ new StreamRecord<>(
+ new RawUnionValue(
+ 1,
+ valuesInWindow(
+ ImmutableList.of(KV.of((Void) null, "hello"), KV.of((Void)
null, "ciao")),
+ new Instant(0),
+ firstWindow))));
+ testHarness.processElement2(
+ new StreamRecord<>(
+ new RawUnionValue(
+ 2,
+ valuesInWindow(
+ ImmutableList.of(KV.of((Void) null, "foo"), KV.of((Void)
null, "bar")),
+ new Instant(0),
+ secondWindow))));
+
+ assertThat(
+ stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+ contains(helloElement, worldElement));
+
+ testHarness.close();
+ }
+
@Test
public void testTimersRestore() throws Exception {
final Instant timerTimestamp = new Instant(1000);
@@ -795,21 +1051,6 @@ public void onEventTime(OnTimerContext context) {
return new KeyedOneInputStreamOperatorTestHarness<>(doFnOperator,
keySelector, keyCoderInfo);
}
- /**
- * {@link TwoInputStreamOperatorTestHarness} support OperatorStateBackend,
- * but don't support KeyedStateBackend. So we just test sideInput of normal
ParDo.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testNormalParDoSideInputs() throws Exception {
- testSideInputs(false);
- }
-
- @Test
- public void testKeyedSideInputs() throws Exception {
- testSideInputs(true);
- }
-
@Test
@SuppressWarnings("unchecked")
public void testBundle() throws Exception {
@@ -984,4 +1225,8 @@ public String getKey(WindowedValue<String>
stringWindowedValue) throws Exception
return stringWindowedValue.getValue();
}
}
+
+ private interface TestHarnessFactory<T> {
+ T create() throws Exception;
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 96617)
Time Spent: 1h 10m (was: 1h)
> Add tests for Flink DoFnOperator side-input checkpointing
> ---------------------------------------------------------
>
> Key: BEAM-3909
> URL: https://issues.apache.org/jira/browse/BEAM-3909
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)