Repository: incubator-beam
Updated Branches:
  refs/heads/master ee1a3bcfb -> 1a7cd4112


Allow for custom timestamp/watermark function in FlinkPipelineRunner

Added new "of" signature and constructor for UnboundedFlinkSource to
allow event timestamping


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9000d95d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9000d95d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9000d95d

Branch: refs/heads/master
Commit: 9000d95d2b34ab45f799aedb140710986ff19452
Parents: ee1a3bc
Author: David Desberg <david.desb...@uber.com>
Authored: Mon Jul 11 12:24:18 2016 -0700
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Jul 13 14:15:54 2016 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 10 ++++++---
 .../streaming/io/UnboundedFlinkSource.java      | 23 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9000d95d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 5d04068..fa6b387 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -70,7 +70,9 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -252,6 +254,8 @@ public class FlinkStreamingTransformTranslators {
       if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) 
{
         @SuppressWarnings("unchecked")
         UnboundedFlinkSource<T> flinkSourceFunction = 
(UnboundedFlinkSource<T>) transform.getSource();
+        final AssignerWithPeriodicWatermarks<T> flinkAssigner = 
flinkSourceFunction.getFlinkTimestampAssigner();
+
         DataStream<T> flinkSource = context.getExecutionEnvironment()
             .addSource(flinkSourceFunction.getFlinkSource());
 
@@ -260,17 +264,17 @@ public class FlinkStreamingTransformTranslators {
               context.getExecutionEnvironment().getConfig()));
 
         source = flinkSource
+            .assignTimestampsAndWatermarks(flinkAssigner)
             .flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
               @Override
               public void flatMap(T s, Collector<WindowedValue<T>> collector) 
throws Exception {
                 collector.collect(
                     WindowedValue.of(
                         s,
-                        Instant.now(),
+                        new Instant(flinkAssigner.extractTimestamp(s, -1)),
                         GlobalWindow.INSTANCE,
                         PaneInfo.NO_FIRING));
-              }
-            }).assignTimestampsAndWatermarks(new 
IngestionTimeExtractor<WindowedValue<T>>());
+              }});
       } else {
         try {
           transform.getSource();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9000d95d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 94b73ce..716ca30 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -23,6 +23,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.util.List;
@@ -40,14 +42,26 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
   /** Coder set during translation */
   private Coder<T> coder;
 
+  /** Timestamp / watermark assigner for source; defaults to ingestion time */
+  private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner = new 
IngestionTimeExtractor<T>();
+
   public UnboundedFlinkSource(SourceFunction<T> source) {
     flinkSource = checkNotNull(source);
   }
 
+  public UnboundedFlinkSource(SourceFunction<T> source, 
AssignerWithPeriodicWatermarks<T> timestampAssigner) {
+    flinkSource = checkNotNull(source);
+    flinkTimestampAssigner = checkNotNull(timestampAssigner);
+  }
+
   public SourceFunction<T> getFlinkSource() {
     return this.flinkSource;
   }
 
+  public AssignerWithPeriodicWatermarks<T> getFlinkTimestampAssigner() {
+    return flinkTimestampAssigner;
+  }
+
   @Override
   public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> 
generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws 
Exception {
     throw new RuntimeException("Flink Sources are supported only when running 
with the FlinkRunner.");
@@ -79,6 +93,10 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
     this.coder = coder;
   }
 
+  public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks<T> 
flinkTimestampAssigner) {
+    this.flinkTimestampAssigner = flinkTimestampAssigner;
+  }
+
   /**
    * Creates a new unbounded source from a Flink source.
    * @param flinkSource The Flink source function
@@ -88,4 +106,9 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
   public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> 
of(SourceFunction<T> flinkSource) {
     return new UnboundedFlinkSource<>(flinkSource);
   }
+
+  public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(
+          SourceFunction<T> flinkSource, AssignerWithPeriodicWatermarks<T> 
flinkTimestampAssigner) {
+    return new UnboundedFlinkSource<>(flinkSource, flinkTimestampAssigner);
+  }
 }

Reply via email to