Repository: beam
Updated Branches:
  refs/heads/master f1ea8f951 -> eaf4450f2


[BEAM-1255] Fix java.io.NotSerializableException in Flink on UnboundedSource
Fix javadoc for BoundedSourceWrapper


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2344e94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2344e94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2344e94

Branch: refs/heads/master
Commit: c2344e944b25d884f25375ea7fce3a9c203cdb9a
Parents: a93e218
Author: Alexey Diomin <[email protected]>
Authored: Thu Jan 12 10:44:43 2017 +0400
Committer: Alexey Diomin <[email protected]>
Committed: Thu Jan 12 15:40:37 2017 +0400

----------------------------------------------------------------------
 .../streaming/io/BoundedSourceWrapper.java      |   2 +-
 .../streaming/io/UnboundedSourceWrapper.java    |   2 +-
 .../streaming/UnboundedSourceWrapperTest.java   | 464 ++++++++++---------
 3 files changed, 250 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index df49a49..909cb0e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink 
Source.
+ * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink 
Source.
  */
 public class BoundedSourceWrapper<OutputT>
     extends RichParallelSourceFunction<WindowedValue<OutputT>>

http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index af955ba..68746b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -143,7 +143,7 @@ public class UnboundedSourceWrapper<
     } else {
 
       Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
-          SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, 
CheckpointMarkT>>() {
+          (Coder) SerializableCoder.of(new TypeDescriptor<UnboundedSource>() {
           });
 
       checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, 
checkpointMarkCoder));

http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 9e8261a..b0be98b 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -46,259 +46,291 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 /**
  * Tests for {@link UnboundedSourceWrapper}.
  */
-@RunWith(Parameterized.class)
+@RunWith(Enclosed.class)
 public class UnboundedSourceWrapperTest {
 
-  private final int numTasks;
-  private final int numSplits;
+  /**
+   * Parameterized tests.
+   */
+  @RunWith(Parameterized.class)
+  public static class UnboundedSourceWrapperTestWithParams {
+    private final int numTasks;
+    private final int numSplits;
+
+    public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) {
+      this.numTasks = numTasks;
+      this.numSplits = numSplits;
+    }
 
-  public UnboundedSourceWrapperTest(int numTasks, int numSplits) {
-    this.numTasks = numTasks;
-    this.numSplits = numSplits;
-  }
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+      /*
+       * Parameters for initializing the tests:
+       * {numTasks, numSplits}
+       * The test currently assumes powers of two for some assertions.
+       */
+      return Arrays.asList(new Object[][]{
+          {1, 1}, {1, 2}, {1, 4},
+          {2, 1}, {2, 2}, {2, 4},
+          {4, 1}, {4, 2}, {4, 4}
+      });
+    }
 
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() {
-    /*
-     * Parameters for initializing the tests:
-     * {numTasks, numSplits}
-     * The test currently assumes powers of two for some assertions.
+    /**
+     * Creates a {@link UnboundedSourceWrapper} that has one or multiple 
readers per source.
+     * If numSplits > numTasks the source has one source will manage multiple 
readers.
      */
-    return Arrays.asList(new Object[][] {
-      {1, 1}, {1, 2}, {1, 4},
-      {2, 1}, {2, 2}, {2, 4},
-      {4, 1}, {4, 2}, {4, 4}
-    });
-  }
+    @Test
+    public void testReaders() throws Exception {
+      final int numElements = 20;
+      final Object checkpointLock = new Object();
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      // this source will emit exactly NUM_ELEMENTS across all parallel 
readers,
+      // afterwards it will stall. We check whether we also receive 
NUM_ELEMENTS
+      // elements later.
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, numSplits);
+
+      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+      StreamSource<WindowedValue<
+          KV<Integer, Integer>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> sourceOperator = new 
StreamSource<>(flinkWrapper);
+
+      setupSourceOperator(sourceOperator, numTasks);
+
+      try {
+        sourceOperator.open();
+        sourceOperator.run(checkpointLock,
+            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+              private int count = 0;
+
+              @Override
+              public void emitWatermark(Watermark watermark) {
+              }
 
-  /**
-   * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers 
per source.
-   * If numSplits > numTasks the source has one source will manage multiple 
readers.
-   */
-  @Test
-  public void testReaders() throws Exception {
-    final int numElements = 20;
-    final Object checkpointLock = new Object();
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    // this source will emit exactly NUM_ELEMENTS across all parallel readers,
-    // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
-    // elements later.
-    TestCountingSource source = new TestCountingSource(numElements);
-    UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, numSplits);
-
-    assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
-    StreamSource<WindowedValue<
-        KV<Integer, Integer>>,
-        UnboundedSourceWrapper<
-            KV<Integer, Integer>,
-            TestCountingSource.CounterMark>> sourceOperator = new 
StreamSource<>(flinkWrapper);
-
-    setupSourceOperator(sourceOperator, numTasks);
-
-    try {
-      sourceOperator.open();
-      sourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-            private int count = 0;
-
-            @Override
-            public void emitWatermark(Watermark watermark) {
-            }
-
-            @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> 
windowedValueStreamRecord) {
-
-              count++;
-              if (count >= numElements) {
-                throw new SuccessException();
+              @Override
+              public void collect(
+                  StreamRecord<WindowedValue<KV<Integer, Integer>>> 
windowedValueStreamRecord) {
+
+                count++;
+                if (count >= numElements) {
+                  throw new SuccessException();
+                }
               }
-            }
 
-            @Override
-            public void close() {
+              @Override
+              public void close() {
 
-            }
-          });
-    } catch (SuccessException e) {
+              }
+            });
+      } catch (SuccessException e) {
 
-      assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
+        assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
 
-      // success
-      return;
+        // success
+        return;
+      }
+      fail("Read terminated without producing expected number of outputs");
     }
-    fail("Read terminated without producing expected number of outputs");
-  }
 
-  /**
-   * Verify that snapshot/restore work as expected. We bring up a source and 
cancel
-   * after seeing a certain number of elements. Then we snapshot that source,
-   * bring up a completely new source that we restore from the snapshot and 
verify
-   * that we see all expected elements in the end.
-   */
-  @Test
-  public void testRestore() throws Exception {
-    final int numElements = 20;
-    final Object checkpointLock = new Object();
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    // this source will emit exactly NUM_ELEMENTS across all parallel readers,
-    // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
-    // elements later.
-    TestCountingSource source = new TestCountingSource(numElements);
-    UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, numSplits);
-
-    assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
-    StreamSource<
-        WindowedValue<KV<Integer, Integer>>,
-        UnboundedSourceWrapper<
-            KV<Integer, Integer>,
-            TestCountingSource.CounterMark>> sourceOperator = new 
StreamSource<>(flinkWrapper);
-
-    setupSourceOperator(sourceOperator, numTasks);
-
-    final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
-
-    boolean readFirstBatchOfElements = false;
-
-    try {
-      sourceOperator.open();
-      sourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-            private int count = 0;
-
-            @Override
-            public void emitWatermark(Watermark watermark) {
-            }
-
-            @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> 
windowedValueStreamRecord) {
-
-              
emittedElements.add(windowedValueStreamRecord.getValue().getValue());
-              count++;
-              if (count >= numElements / 2) {
-                throw new SuccessException();
+    /**
+     * Verify that snapshot/restore work as expected. We bring up a source and 
cancel
+     * after seeing a certain number of elements. Then we snapshot that source,
+     * bring up a completely new source that we restore from the snapshot and 
verify
+     * that we see all expected elements in the end.
+     */
+    @Test
+    public void testRestore() throws Exception {
+      final int numElements = 20;
+      final Object checkpointLock = new Object();
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      // this source will emit exactly NUM_ELEMENTS across all parallel 
readers,
+      // afterwards it will stall. We check whether we also receive 
NUM_ELEMENTS
+      // elements later.
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, numSplits);
+
+      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+      StreamSource<
+          WindowedValue<KV<Integer, Integer>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> sourceOperator = new 
StreamSource<>(flinkWrapper);
+
+      setupSourceOperator(sourceOperator, numTasks);
+
+      final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
+
+      boolean readFirstBatchOfElements = false;
+
+      try {
+        sourceOperator.open();
+        sourceOperator.run(checkpointLock,
+            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+              private int count = 0;
+
+              @Override
+              public void emitWatermark(Watermark watermark) {
               }
-            }
 
-            @Override
-            public void close() {
+              @Override
+              public void collect(
+                  StreamRecord<WindowedValue<KV<Integer, Integer>>> 
windowedValueStreamRecord) {
 
-            }
-          });
-    } catch (SuccessException e) {
-      // success
-      readFirstBatchOfElements = true;
-    }
+                
emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+                count++;
+                if (count >= numElements / 2) {
+                  throw new SuccessException();
+                }
+              }
+
+              @Override
+              public void close() {
+
+              }
+            });
+      } catch (SuccessException e) {
+        // success
+        readFirstBatchOfElements = true;
+      }
+
+      assertTrue("Did not successfully read first batch of elements.", 
readFirstBatchOfElements);
+
+      // draw a snapshot
+      byte[] snapshot = flinkWrapper.snapshotState(0, 0);
+
+      // test that finalizeCheckpoint on CheckpointMark is called
+      final ArrayList<Integer> finalizeList = new ArrayList<>();
+      TestCountingSource.setFinalizeTracker(finalizeList);
+      flinkWrapper.notifyCheckpointComplete(0);
+      assertEquals(flinkWrapper.getLocalSplitSources().size(), 
finalizeList.size());
+
+      // create a completely new source but restore from the snapshot
+      TestCountingSource restoredSource = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<
+          KV<Integer, Integer>, TestCountingSource.CounterMark> 
restoredFlinkWrapper =
+          new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
+
+      assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
+
+      StreamSource<
+          WindowedValue<KV<Integer, Integer>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> restoredSourceOperator =
+          new StreamSource<>(restoredFlinkWrapper);
+
+      setupSourceOperator(restoredSourceOperator, numTasks);
+
+      // restore snapshot
+      restoredFlinkWrapper.restoreState(snapshot);
+
+      boolean readSecondBatchOfElements = false;
+
+      // run again and verify that we see the other elements
+      try {
+        restoredSourceOperator.open();
+        restoredSourceOperator.run(checkpointLock,
+            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+              private int count = 0;
+
+              @Override
+              public void emitWatermark(Watermark watermark) {
+              }
 
-    assertTrue("Did not successfully read first batch of elements.", 
readFirstBatchOfElements);
-
-    // draw a snapshot
-    byte[] snapshot = flinkWrapper.snapshotState(0, 0);
-
-    // test that finalizeCheckpoint on CheckpointMark is called
-    final ArrayList<Integer> finalizeList = new ArrayList<>();
-    TestCountingSource.setFinalizeTracker(finalizeList);
-    flinkWrapper.notifyCheckpointComplete(0);
-    assertEquals(flinkWrapper.getLocalSplitSources().size(), 
finalizeList.size());
-
-    // create a completely new source but restore from the snapshot
-    TestCountingSource restoredSource = new TestCountingSource(numElements);
-    UnboundedSourceWrapper<
-        KV<Integer, Integer>, TestCountingSource.CounterMark> 
restoredFlinkWrapper =
-        new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
-
-    assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
-
-    StreamSource<
-        WindowedValue<KV<Integer, Integer>>,
-        UnboundedSourceWrapper<
-            KV<Integer, Integer>,
-            TestCountingSource.CounterMark>> restoredSourceOperator =
-        new StreamSource<>(restoredFlinkWrapper);
-
-    setupSourceOperator(restoredSourceOperator, numTasks);
-
-    // restore snapshot
-    restoredFlinkWrapper.restoreState(snapshot);
-
-    boolean readSecondBatchOfElements = false;
-
-    // run again and verify that we see the other elements
-    try {
-      restoredSourceOperator.open();
-      restoredSourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-            private int count = 0;
-
-            @Override
-            public void emitWatermark(Watermark watermark) {
-            }
-
-            @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> 
windowedValueStreamRecord) {
-              
emittedElements.add(windowedValueStreamRecord.getValue().getValue());
-              count++;
-              if (count >= numElements / 2) {
-                throw new SuccessException();
+              @Override
+              public void collect(
+                  StreamRecord<WindowedValue<KV<Integer, Integer>>> 
windowedValueStreamRecord) {
+                
emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+                count++;
+                if (count >= numElements / 2) {
+                  throw new SuccessException();
+                }
               }
-            }
 
-            @Override
-            public void close() {
+              @Override
+              public void close() {
 
-            }
-          });
-    } catch (SuccessException e) {
-      // success
-      readSecondBatchOfElements = true;
-    }
+              }
+            });
+      } catch (SuccessException e) {
+        // success
+        readSecondBatchOfElements = true;
+      }
 
-    assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
+      assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
 
-    assertTrue("Did not successfully read second batch of elements.", 
readSecondBatchOfElements);
+      assertTrue("Did not successfully read second batch of elements.", 
readSecondBatchOfElements);
 
-    // verify that we saw all NUM_ELEMENTS elements
-    assertTrue(emittedElements.size() == numElements);
-  }
+      // verify that we saw all NUM_ELEMENTS elements
+      assertTrue(emittedElements.size() == numElements);
+    }
 
-  @SuppressWarnings("unchecked")
-  private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int 
numSubTasks) {
-    ExecutionConfig executionConfig = new ExecutionConfig();
-    StreamConfig cfg = new StreamConfig(new Configuration());
+    @SuppressWarnings("unchecked")
+    private static <T> void setupSourceOperator(StreamSource<T, ?> operator, 
int numSubTasks) {
+      ExecutionConfig executionConfig = new ExecutionConfig();
+      StreamConfig cfg = new StreamConfig(new Configuration());
 
-    cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+      cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
-    Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
+      Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 
0);
 
-    StreamTask<?, ?> mockTask = mock(StreamTask.class);
-    when(mockTask.getName()).thenReturn("Mock Task");
-    when(mockTask.getCheckpointLock()).thenReturn(new Object());
-    when(mockTask.getConfiguration()).thenReturn(cfg);
-    when(mockTask.getEnvironment()).thenReturn(env);
-    when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-    when(mockTask.getAccumulatorMap())
-        .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+      StreamTask<?, ?> mockTask = mock(StreamTask.class);
+      when(mockTask.getName()).thenReturn("Mock Task");
+      when(mockTask.getCheckpointLock()).thenReturn(new Object());
+      when(mockTask.getConfiguration()).thenReturn(cfg);
+      when(mockTask.getEnvironment()).thenReturn(env);
+      when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+      when(mockTask.getAccumulatorMap())
+          .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
 
-    operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) 
mock(Output.class));
+      operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) 
mock(Output.class));
+    }
+
+    /**
+     * A special {@link RuntimeException} that we throw to signal that the 
test was successful.
+     */
+    private static class SuccessException extends RuntimeException {
+    }
   }
 
   /**
-   * A special {@link RuntimeException} that we throw to signal that the test 
was successful.
+   * Not parameterized tests.
    */
-  private static class SuccessException extends RuntimeException {}
+  public static class BasicTest {
+
+    /**
+     * Check serialization a {@link UnboundedSourceWrapper}.
+     */
+    @Test
+    public void testSerialization() throws Exception {
+      final int parallelism = 1;
+      final int numElements = 20;
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, parallelism);
+
+      InstantiationUtil.serializeObject(flinkWrapper);
+    }
+
+  }
 }

Reply via email to