Repository: incubator-beam Updated Branches: refs/heads/master 0f137169e -> ef1e32dee
[flink] improvements to the Kafka Example - use timestamp extractor after ingestion - fix coder runtime exception - correct logging Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef1e32de Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef1e32de Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef1e32de Branch: refs/heads/master Commit: ef1e32deefb9886584556c7125e87b2873c63ebf Parents: 0f13716 Author: Maximilian Michels <[email protected]> Authored: Thu Mar 17 14:49:09 2016 +0100 Committer: Maximilian Michels <[email protected]> Committed: Thu Mar 17 14:53:41 2016 +0100 ---------------------------------------------------------------------- .../examples/streaming/KafkaWindowedWordCountExample.java | 2 +- .../flink/translation/FlinkStreamingTransformTranslators.java | 3 ++- .../wrappers/streaming/io/UnboundedFlinkSource.java | 7 +++---- runners/flink/runner/src/main/resources/log4j.properties | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java index 3942d0d..8fca1d4 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java @@ -103,7 +103,7 @@ public class KafkaWindowedWordCountExample { public static void main(String[] args) { PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); - options.setJobName("KafkaExample"); + options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds"); options.setStreaming(true); options.setCheckpointingInterval(1000L); options.setNumberOfExecutionRetries(5); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index bdefeaf..2b9b0ee 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -44,6 +44,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.datastream.*; +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; import org.apache.flink.util.Collector; import org.joda.time.Instant; import org.slf4j.Logger; @@ -179,7 +180,7 @@ public class FlinkStreamingTransformTranslators { public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception { collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } - }); + }).assignTimestampsAndWatermarks(new IngestionTimeExtractor()); } else { source = context.getExecutionEnvironment() .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java index 05a8956..82984cb 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -17,12 +17,10 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; -import org.apache.beam.runners.flink.FlinkPipelineRunner; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.io.UnboundedSource; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.common.base.Preconditions; -import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -30,7 +28,7 @@ import javax.annotation.Nullable; import java.util.List; /** - * A wrapper translating Flink Sources implementing the {@link RichParallelSourceFunction} interface, into + * A wrapper translating Flink Sources implementing the {@link SourceFunction} interface, into * unbounded Beam sources (see {@link UnboundedSource}). * */ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { @@ -68,7 +66,8 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource. @Override public Coder<T> getDefaultOutputCoder() { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + // The coder is specified in the Flink source + return null; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/resources/log4j.properties b/runners/flink/runner/src/main/resources/log4j.properties index 4daaad1..4b6a708 100644 --- a/runners/flink/runner/src/main/resources/log4j.properties +++ b/runners/flink/runner/src/main/resources/log4j.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=INFO,console +log4j.rootLogger=OFF,console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout
