[ 
https://issues.apache.org/jira/browse/BEAM-3909?focusedWorklogId=96617&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96617
 ]

ASF GitHub Bot logged work on BEAM-3909:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Apr/18 09:26
            Start Date: 30/Apr/18 09:26
    Worklog Time Spent: 10m 
      Work Description: aljoscha closed pull request #4931: [BEAM-3909] Add 
tests for Flink DoFnOperator side-input checkpointing
URL: https://github.com/apache/beam/pull/4931
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a9e95d980f5..7a55bc51974 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -483,6 +483,11 @@ public final void processElement1(
   @Override
   public final void processElement2(
       StreamRecord<RawUnionValue> streamRecord) throws Exception {
+    // we finish the bundle because the newly arrived side-input might
+    // make a view available that was previously not ready.
+    // The PushbackSideInputRunner will only reset its cache of non-ready 
windows when
+    // finishing a bundle.
+    invokeFinishBundle();
     checkInvokeStartBundle();
 
     @SuppressWarnings("unchecked")
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 02179ffe6fb..34b4befcb2d 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -526,7 +526,17 @@ public void onTimer(OnTimerContext context, 
@StateId(stateId) ValueState<String>
     testHarness.close();
   }
 
-  public void testSideInputs(boolean keyed) throws Exception {
+  @Test
+  public void testNormalParDoSideInputs() throws Exception {
+    testSideInputs(false);
+  }
+
+  @Test
+  public void testKeyedParDoSideInputs() throws Exception {
+    testSideInputs(true);
+  }
+
+  void testSideInputs(boolean keyed) throws Exception {
 
     Coder<WindowedValue<String>> coder = 
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
 
@@ -687,6 +697,252 @@ public void processElement(ProcessContext context,
     testHarness.close();
   }
 
+  @Test
+  public void nonKeyedParDoSideInputCheckpointing() throws Exception {
+    sideInputCheckpointing(() -> {
+      Coder<WindowedValue<String>> coder =
+          WindowedValue.getFullCoder(StringUtf8Coder.of(), 
IntervalWindow.getCoder());
+      TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+      ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+          ImmutableMap.<Integer, PCollectionView<?>>builder()
+              .put(1, view1)
+              .put(2, view2)
+              .build();
+
+      DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+          new IdentityDoFn<>(),
+          "stepName",
+          coder,
+          outputTag,
+          Collections.emptyList(),
+          new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+          WindowingStrategy.globalDefault(),
+          sideInputMapping, /* side-input mapping */
+          ImmutableList.of(view1, view2), /* side inputs */
+          PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+          null);
+
+      return new TwoInputStreamOperatorTestHarness<>(doFnOperator);
+    });
+  }
+
+  @Test
+  public void keyedParDoSideInputCheckpointing() throws Exception {
+    sideInputCheckpointing(() -> {
+      Coder<WindowedValue<String>> coder =
+          WindowedValue.getFullCoder(StringUtf8Coder.of(), 
IntervalWindow.getCoder());
+      TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+      StringUtf8Coder keyCoder = StringUtf8Coder.of();
+
+      ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+          ImmutableMap.<Integer, PCollectionView<?>>builder()
+              .put(1, view1)
+              .put(2, view2)
+              .build();
+
+      DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+          new IdentityDoFn<>(),
+          "stepName",
+          coder,
+          outputTag,
+          Collections.emptyList(),
+          new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+          WindowingStrategy.globalDefault(),
+          sideInputMapping, /* side-input mapping */
+          ImmutableList.of(view1, view2), /* side inputs */
+          PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+          keyCoder);
+
+      return new KeyedTwoInputStreamOperatorTestHarness<>(
+          doFnOperator,
+          new StringKeySelector(),
+          // we use a dummy key for the second input since it is considered to 
be broadcast
+          new DummyKeySelector(),
+          BasicTypeInfo.STRING_TYPE_INFO);
+    });
+  }
+
+  void sideInputCheckpointing(
+      TestHarnessFactory<
+          TwoInputStreamOperatorTestHarness<
+              WindowedValue<String>,
+              RawUnionValue,
+              WindowedValue<String>>> harnessFactory) throws Exception {
+
+    TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, 
WindowedValue<String>>
+        testHarness = harnessFactory.create();
+
+    testHarness.open();
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(100));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new 
Instant(500));
+
+    // push in some side inputs for both windows
+    testHarness.processElement2(
+        new StreamRecord<>(
+            new RawUnionValue(
+                1,
+                valuesInWindow(
+                    ImmutableList.of(KV.of((Void) null, "hello"), KV.of((Void) 
null, "ciao")),
+                    new Instant(0),
+                    firstWindow))));
+    testHarness.processElement2(
+        new StreamRecord<>(
+            new RawUnionValue(
+                2,
+                valuesInWindow(
+                    ImmutableList.of(KV.of((Void) null, "foo"), KV.of((Void) 
null, "bar")),
+                    new Instant(0),
+                    secondWindow))));
+
+    // snapshot state, throw away the operator, then restore and verify that 
we still match
+    // main-input elements to the side-inputs that we sent earlier
+    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+    testHarness = harnessFactory.create();
+
+    testHarness.initializeState(snapshot);
+    testHarness.open();
+
+    // push in main-input elements
+    WindowedValue<String> helloElement = valueInWindow("Hello", new 
Instant(0), firstWindow);
+    WindowedValue<String> worldElement = valueInWindow("World", new 
Instant(1000), firstWindow);
+    testHarness.processElement1(new StreamRecord<>(helloElement));
+    testHarness.processElement1(new StreamRecord<>(worldElement));
+
+    assertThat(
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(helloElement, worldElement));
+
+    testHarness.close();
+  }
+
+  @Test
+  public void nonKeyedParDoPushbackDataCheckpointing() throws Exception {
+    pushbackDataCheckpointing(() -> {
+      Coder<WindowedValue<String>> coder =
+          WindowedValue.getFullCoder(StringUtf8Coder.of(), 
IntervalWindow.getCoder());
+
+      TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+      ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+          ImmutableMap.<Integer, PCollectionView<?>>builder()
+              .put(1, view1)
+              .put(2, view2)
+              .build();
+
+      DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+          new IdentityDoFn<>(),
+          "stepName",
+          coder,
+          outputTag,
+          Collections.emptyList(),
+          new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+          WindowingStrategy.globalDefault(),
+          sideInputMapping, /* side-input mapping */
+          ImmutableList.of(view1, view2), /* side inputs */
+          PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+          null);
+
+      return new TwoInputStreamOperatorTestHarness<>(doFnOperator);
+    });
+  }
+
+  @Test
+  public void keyedParDoPushbackDataCheckpointing() throws Exception {
+    pushbackDataCheckpointing(() -> {
+      Coder<WindowedValue<String>> coder =
+          WindowedValue.getFullCoder(StringUtf8Coder.of(), 
IntervalWindow.getCoder());
+
+      TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+      StringUtf8Coder keyCoder = StringUtf8Coder.of();
+
+      ImmutableMap<Integer, PCollectionView<?>> sideInputMapping =
+          ImmutableMap.<Integer, PCollectionView<?>>builder()
+              .put(1, view1)
+              .put(2, view2)
+              .build();
+
+      DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+          new IdentityDoFn<>(),
+          "stepName",
+          coder,
+          outputTag,
+          Collections.emptyList(),
+          new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
+          WindowingStrategy.globalDefault(),
+          sideInputMapping, /* side-input mapping */
+          ImmutableList.of(view1, view2), /* side inputs */
+          PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+          keyCoder);
+
+      return new KeyedTwoInputStreamOperatorTestHarness<>(
+          doFnOperator,
+          new StringKeySelector(),
+          // we use a dummy key for the second input since it is considered to 
be broadcast
+          new DummyKeySelector(),
+          BasicTypeInfo.STRING_TYPE_INFO);
+    });
+  }
+
+  void pushbackDataCheckpointing(
+      TestHarnessFactory<
+          TwoInputStreamOperatorTestHarness<
+              WindowedValue<String>,
+              RawUnionValue,
+              WindowedValue<String>>> harnessFactory) throws Exception {
+
+    TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, 
WindowedValue<String>>
+        testHarness = harnessFactory.create();
+
+    testHarness.open();
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(100));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new 
Instant(500));
+
+    // push in main-input elements
+    WindowedValue<String> helloElement = valueInWindow("Hello", new 
Instant(0), firstWindow);
+    WindowedValue<String> worldElement = valueInWindow("World", new 
Instant(1000), firstWindow);
+    testHarness.processElement1(new StreamRecord<>(helloElement));
+    testHarness.processElement1(new StreamRecord<>(worldElement));
+
+    // snapshot state, throw away the operator, then restore and verify that 
we still match
+    // main-input elements to the side-inputs that we sent earlier
+    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+    testHarness = harnessFactory.create();
+
+    testHarness.initializeState(snapshot);
+    testHarness.open();
+
+    // push in some side inputs for both windows
+    testHarness.processElement2(
+        new StreamRecord<>(
+            new RawUnionValue(
+                1,
+                valuesInWindow(
+                    ImmutableList.of(KV.of((Void) null, "hello"), KV.of((Void) 
null, "ciao")),
+                    new Instant(0),
+                    firstWindow))));
+    testHarness.processElement2(
+        new StreamRecord<>(
+            new RawUnionValue(
+                2,
+                valuesInWindow(
+                    ImmutableList.of(KV.of((Void) null, "foo"), KV.of((Void) 
null, "bar")),
+                    new Instant(0),
+                    secondWindow))));
+
+    assertThat(
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(helloElement, worldElement));
+
+    testHarness.close();
+  }
+
   @Test
   public void testTimersRestore() throws Exception {
     final Instant timerTimestamp = new Instant(1000);
@@ -795,21 +1051,6 @@ public void onEventTime(OnTimerContext context) {
     return new KeyedOneInputStreamOperatorTestHarness<>(doFnOperator, 
keySelector, keyCoderInfo);
   }
 
-  /**
-   * {@link TwoInputStreamOperatorTestHarness} support OperatorStateBackend,
-   * but don't support KeyedStateBackend. So we just test sideInput of normal 
ParDo.
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testNormalParDoSideInputs() throws Exception {
-    testSideInputs(false);
-  }
-
-  @Test
-  public void testKeyedSideInputs() throws Exception {
-    testSideInputs(true);
-  }
-
   @Test
   @SuppressWarnings("unchecked")
   public void testBundle() throws Exception {
@@ -984,4 +1225,8 @@ public String getKey(WindowedValue<String> 
stringWindowedValue) throws Exception
       return stringWindowedValue.getValue();
     }
   }
+
+  private interface TestHarnessFactory<T> {
+    T create() throws Exception;
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 96617)
    Time Spent: 1h 10m  (was: 1h)

> Add tests for Flink DoFnOperator side-input checkpointing
> ---------------------------------------------------------
>
>                 Key: BEAM-3909
>                 URL: https://issues.apache.org/jira/browse/BEAM-3909
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to