[
https://issues.apache.org/jira/browse/BEAM-3909?focusedWorklogId=85744&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85744
]
ASF GitHub Bot logged work on BEAM-3909:
----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Mar/18 18:05
Start Date: 29/Mar/18 18:05
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #4931:
[BEAM-3909] Add tests for Flink DoFnOperator side-input checkpointing
URL: https://github.com/apache/beam/pull/4931#discussion_r178137435
##########
File path:
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
##########
@@ -610,6 +620,252 @@ public void testSideInputs(boolean keyed) throws
Exception {
}
+ @Test
+ public void nonKeyedParDoSideInputCheckpointing() throws Exception {
+ sideInputCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ null);
+
+ return new TwoInputStreamOperatorTestHarness<>(doFnOperator);
+ });
+ }
+
+ @Test
+ public void keyedParDoSideInputCheckpointing() throws Exception {
+ sideInputCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ StringUtf8Coder keyCoder = StringUtf8Coder.of();
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ keyCoder);
+
+ return new KeyedTwoInputStreamOperatorTestHarness<>(
+ doFnOperator,
+ new StringKeySelector(),
+ // we use a dummy key for the second input since it is considered to
be broadcast
+ new DummyKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+ });
+ }
+
+ void sideInputCheckpointing(
+ TestHarnessFactory<
+ TwoInputStreamOperatorTestHarness<
+ WindowedValue<String>,
+ RawUnionValue,
+ WindowedValue<String>>> harnessFactory) throws Exception {
+
+ TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue,
WindowedValue<String>>
+ testHarness = harnessFactory.create();
+
+ testHarness.open();
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new
Instant(100));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new
Instant(500));
+
+ // push in some side inputs for both windows
+ testHarness.processElement2(
+ new StreamRecord<>(
+ new RawUnionValue(
+ 1,
+ valuesInWindow(
+ ImmutableList.of(KV.of((Void) null, "hello"), KV.of((Void)
null, "ciao")),
+ new Instant(0),
+ firstWindow))));
+ testHarness.processElement2(
+ new StreamRecord<>(
+ new RawUnionValue(
+ 2,
+ valuesInWindow(
+ ImmutableList.of(KV.of((Void) null, "foo"), KV.of((Void)
null, "bar")),
+ new Instant(0),
+ secondWindow))));
+
+ // snapshot state, throw away the operator, then restore and verify that
we still match
+ // main-input elements to the side-inputs that we sent earlier
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ testHarness = harnessFactory.create();
+
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ // push in main-input elements
+ WindowedValue<String> helloElement = valueInWindow("Hello", new
Instant(0), firstWindow);
+ WindowedValue<String> worldElement = valueInWindow("World", new
Instant(1000), firstWindow);
+ testHarness.processElement1(new StreamRecord<>(helloElement));
+ testHarness.processElement1(new StreamRecord<>(worldElement));
+
+ assertThat(
+
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+ contains(helloElement, worldElement));
+
+ testHarness.close();
+ }
+
+ @Test
+ public void nonKeyedParDoPushbackDataCheckpointing() throws Exception {
+ pushbackDataCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ null);
+
+ return new TwoInputStreamOperatorTestHarness<>(doFnOperator);
+ });
+ }
+
+ @Test
+ public void keyedParDoPushbackDataCheckpointing() throws Exception {
+ pushbackDataCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ StringUtf8Coder keyCoder = StringUtf8Coder.of();
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ keyCoder);
+
+ return new KeyedTwoInputStreamOperatorTestHarness<>(
+ doFnOperator,
+ new StringKeySelector(),
+ // we use a dummy key for the second input since it is considered to
be broadcast
+ new DummyKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+ });
+ }
+
+ void pushbackDataCheckpointing(
+ TestHarnessFactory<
+ TwoInputStreamOperatorTestHarness<
+ WindowedValue<String>,
+ RawUnionValue,
+ WindowedValue<String>>> harnessFactory) throws Exception {
+
+ TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue,
WindowedValue<String>>
+ testHarness = harnessFactory.create();
+
+ testHarness.open();
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new
Instant(100));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new
Instant(500));
+
+ // push in main-input elements
+ WindowedValue<String> helloElement = valueInWindow("Hello", new
Instant(0), firstWindow);
+ WindowedValue<String> worldElement = valueInWindow("World", new
Instant(1000), firstWindow);
+ testHarness.processElement1(new StreamRecord<>(helloElement));
+ testHarness.processElement1(new StreamRecord<>(worldElement));
+
+ // snapshot state, throw away the operator, then restore and verify that
we still match
+ // main-input elements to the side-inputs that we sent earlier
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ testHarness = harnessFactory.create();
+
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ // push in some side inputs for both windows
+ testHarness.processElement2(
+ new StreamRecord<>(
+ new RawUnionValue(
+ 1,
+ valuesInWindow(
+ ImmutableList.of(KV.of((Void) null, "hello"), KV.of((Void)
null, "ciao")),
+ new Instant(0),
+ firstWindow))));
+ testHarness.processElement2(
Review comment:
Just a clarification for myself: the use of 2 side inputs vs. one isn't
relevant for the new tests; that's just a byproduct of using the existing
setup?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 85744)
Time Spent: 0.5h (was: 20m)
> Add tests for Flink DoFnOperator side-input checkpointing
> ---------------------------------------------------------
>
> Key: BEAM-3909
> URL: https://issues.apache.org/jira/browse/BEAM-3909
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)