[ https://issues.apache.org/jira/browse/BEAM-3909?focusedWorklogId=85741&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85741 ]
ASF GitHub Bot logged work on BEAM-3909:
----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Mar/18 18:00
Start Date: 29/Mar/18 18:00
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #4931:
[BEAM-3909] Add tests for Flink DoFnOperator side-input checkpointing
URL: https://github.com/apache/beam/pull/4931#discussion_r178136261
##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
##########
@@ -610,6 +620,252 @@ public void testSideInputs(boolean keyed) throws Exception {
}
+ @Test
+ public void nonKeyedParDoSideInputCheckpointing() throws Exception {
+ sideInputCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ null);
+
+ return new TwoInputStreamOperatorTestHarness<>(doFnOperator);
+ });
+ }
+
+ @Test
+ public void keyedParDoSideInputCheckpointing() throws Exception {
+ sideInputCheckpointing(() -> {
+ Coder<WindowedValue<String>> coder =
+ WindowedValue.getFullCoder(StringUtf8Coder.of(),
IntervalWindow.getCoder());
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+ StringUtf8Coder keyCoder = StringUtf8Coder.of();
+
+ ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+ ImmutableMap.<Integer, PCollectionView<?>>builder()
+ .put(1, view1)
+ .put(2, view2)
+ .build();
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new IdentityDoFn<>(),
+ "stepName",
+ coder,
+ outputTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+ WindowingStrategy.globalDefault(),
+ sideInputMapping, /* side-input mapping */
+ ImmutableList.of(view1, view2), /* side inputs */
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+ keyCoder);
+
+ return new KeyedTwoInputStreamOperatorTestHarness<>(
+ doFnOperator,
+ new StringKeySelector(),
+ // we use a dummy key for the second input since it is considered to
be broadcast
+ new DummyKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+ });
+ }
+
+ void sideInputCheckpointing(
+ TestHarnessFactory<
+ TwoInputStreamOperatorTestHarness<
+ WindowedValue<String>,
+ RawUnionValue,
+ WindowedValue<String>>> harnessFactory) throws Exception {
+
+ TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue,
WindowedValue<String>>
+ testHarness = harnessFactory.create();
+
+ testHarness.open();
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new
Instant(100));
Review comment:
nit: use WINDOW_MSECS_1 and WINDOW_MSECS_2 instead of the hard-coded window bounds?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 85741)
Time Spent: 20m (was: 10m)
> Add tests for Flink DoFnOperator side-input checkpointing
> ---------------------------------------------------------
>
> Key: BEAM-3909
> URL: https://issues.apache.org/jira/browse/BEAM-3909
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)