Repository: incubator-beam
Updated Branches:
  refs/heads/master a9c46057e -> 0f137169e
[flink] fix UnboundedFlinkSource wrapper - remove unnecessary PipelineOptions cache - use the correct interface types - improve Kafka example Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f137169 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f137169 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f137169 Branch: refs/heads/master Commit: 0f137169e4c2cd8d3e5a86c91bc2f401d276e8ed Parents: a9c4605 Author: Maximilian Michels <[email protected]> Authored: Thu Mar 17 12:26:03 2016 +0100 Committer: Maximilian Michels <[email protected]> Committed: Thu Mar 17 12:27:37 2016 +0100 ---------------------------------------------------------------------- .../KafkaWindowedWordCountExample.java | 7 ++-- .../streaming/io/UnboundedFlinkSource.java | 37 ++++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java index 55cdc22..3942d0d 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java @@ -22,7 +22,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded import com.google.cloud.dataflow.sdk.Pipeline; import 
com.google.cloud.dataflow.sdk.io.Read; import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.io.UnboundedSource; import com.google.cloud.dataflow.sdk.options.Default; import com.google.cloud.dataflow.sdk.options.Description; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; @@ -30,7 +29,7 @@ import com.google.cloud.dataflow.sdk.transforms.*; import com.google.cloud.dataflow.sdk.transforms.windowing.*; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.joda.time.Duration; @@ -121,12 +120,12 @@ public class KafkaWindowedWordCountExample { // this is the Flink consumer that reads the input to // the program from a kafka topic. - FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>( + FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>( options.getKafkaTopic(), new SimpleStringSchema(), p); PCollection<String> words = pipeline - .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount")) + .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer))) .apply(ParDo.of(new ExtractWordsFn())) .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java index 2857efd..05a8956 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -22,7 +22,9 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.io.UnboundedSource; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.common.base.Preconditions; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import javax.annotation.Nullable; import java.util.List; @@ -31,52 +33,51 @@ import java.util.List; * A wrapper translating Flink Sources implementing the {@link RichParallelSourceFunction} interface, into * unbounded Beam sources (see {@link UnboundedSource}). 
* */ -public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> { +public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { - private final PipelineOptions options; - private final RichParallelSourceFunction<T> flinkSource; + private final SourceFunction<T> flinkSource; - public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) { - if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } - options = Preconditions.checkNotNull(pipelineOptions); + public UnboundedFlinkSource(SourceFunction<T> source) { flinkSource = Preconditions.checkNotNull(source); - validate(); } - public RichParallelSourceFunction<T> getFlinkSource() { + public SourceFunction<T> getFlinkSource() { return this.flinkSource; } @Override - public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { + public List<? 
extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); } @Override - public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) { + public UnboundedReader<T> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) { throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); } @Nullable @Override - public Coder<C> getCheckpointMarkCoder() { + public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() { throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); } @Override public void validate() { - Preconditions.checkNotNull(options); - Preconditions.checkNotNull(flinkSource); - if(!options.getRunner().equals(FlinkPipelineRunner.class)) { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } } @Override public Coder<T> getDefaultOutputCoder() { throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); } + + /** + * Creates a new unbounded source from a Flink source. + * @param flinkSource The Flink source function + * @param <T> The type that the source function produces. + * @return The wrapped source function. + */ + public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(SourceFunction<T> flinkSource) { + return new UnboundedFlinkSource<>(flinkSource); + } }
