[BEAM-333][flink] make bounded/unbounded sources stoppable

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e2820b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e2820b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e2820b0

Branch: refs/heads/master
Commit: 7e2820b06c19d958cbf7316ae28def7fe796a360
Parents: be689df
Author: Maximilian Michels <m...@apache.org>
Authored: Tue Sep 6 16:38:43 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Fri Sep 9 16:06:42 2016 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/io/BoundedSourceWrapper.java         | 9 ++++++++-
 .../wrappers/streaming/io/UnboundedSourceWrapper.java       | 8 +++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 3cb93c0..df49a49 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -37,7 +38,8 @@ import org.slf4j.LoggerFactory;
  * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink 
Source.
  */
 public class BoundedSourceWrapper<OutputT>
-    extends RichParallelSourceFunction<WindowedValue<OutputT>> {
+    extends RichParallelSourceFunction<WindowedValue<OutputT>>
+    implements StoppableFunction {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BoundedSourceWrapper.class);
 
@@ -206,6 +208,11 @@ public class BoundedSourceWrapper<OutputT>
     isRunning = false;
   }
 
+  @Override
+  public void stop() {
+    this.isRunning = false;
+  }
+
   /**
    * Visible so that we can check this in tests. Must not be used for anything 
else.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 8647322..debf52f 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedSourceWrapper<
     OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     extends RichParallelSourceFunction<WindowedValue<OutputT>>
-    implements Triggerable, Checkpointed<byte[]> {
+    implements Triggerable, StoppableFunction, Checkpointed<byte[]> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
@@ -311,6 +312,11 @@ public class UnboundedSourceWrapper<
   }
 
   @Override
+  public void stop() {
+    isRunning = false;
+  }
+
+  @Override
   public byte[] snapshotState(long l, long l1) throws Exception {
 
     if (checkpointCoder == null) {

Reply via email to