Repository: flink
Updated Branches:
  refs/heads/master d34bdaf7f -> 35b4da273
[FLINK-4121] Add timeunit (ms) to docs for timestamps and watermarks

This closes #2165

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35b4da27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35b4da27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35b4da27

Branch: refs/heads/master
Commit: 35b4da273f755308b82468ced539c380444efd37
Parents: d34bdaf
Author: Jonas Traub <[email protected]>
Authored: Mon Jun 27 11:06:16 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Jun 27 12:09:06 2016 +0200

----------------------------------------------------------------------
 .../streaming/event_timestamps_watermarks.md | 33 ++++++++++----------
 .../streaming/api/watermark/Watermark.java | 4 +--
 2 files changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/35b4da27/docs/apis/streaming/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_timestamps_watermarks.md b/docs/apis/streaming/event_timestamps_watermarks.md
index 493e11a..05c9f51 100644
--- a/docs/apis/streaming/event_timestamps_watermarks.md
+++ b/docs/apis/streaming/event_timestamps_watermarks.md
@@ -48,14 +48,13 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
</div> </div> - ## Assigning Timestamps In order to work with *Event Time*, Flink needs to know the events' *timestamps*, meaning each element in the stream needs to get its event timestamp *assigned*. That happens usually by accessing/extracting the timestamp from some field in the element. -Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about +Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about the progress in event time.
There are two ways to assign timestamps and generate Watermarks: @@ -63,6 +62,8 @@ There are two ways to assign timestamps and generate Watermarks: 1. Directly in the data stream source 2. Via a timestamp assigner / watermark generator: in Flink timestamp assigners also define the watermarks to be emitted +<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as +millliseconds since the Java epoch of 1970-01-01T00:00:00Z. ### Source Functions with Timestamps and Watermarks @@ -116,18 +117,18 @@ those timestamps will be overwritten by the TimestampAssigner. Similarly, Waterm Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them. -The timestamp assigners usually are specified immediately after the data source but it is not strictly required to do so. +The timestamp assigners usually are specified immediately after the data source but it is not strictly required to do so. A common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner. In any case, the timestamp assigner needs to be specified before the first operation on event time -(such as the first window operation). As a special case, when using Kafka as the source of a streaming job, -Flink allows the specification of a timestamp assigner / watermark emitter inside -the source (or consumer) itself. More information on how to do so can be found in the -[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html). +(such as the first window operation). As a special case, when using Kafka as the source of a streaming job, +Flink allows the specification of a timestamp assigner / watermark emitter inside +the source (or consumer) itself. 
More information on how to do so can be found in the +[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html). **NOTE:** The remainder of this section presents the main interfaces a programmer has -to implement in order to create her own timestamp extractors/watermark emitters. -To see the pre-implemented extractors that ship with Flink, please refer to the +to implement in order to create her own timestamp extractors/watermark emitters. +To see the pre-implemented extractors that ship with Flink, please refer to the [Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html) page. <div class="codetabs" markdown="1"> @@ -137,7 +138,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<MyEvent> stream = env.readFile( - myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, + myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, FilePathFilter.createDefaultFilter(), typeInfo); DataStream<MyEvent> withTimestampsAndWatermarks = stream @@ -157,7 +158,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream: DataStream[MyEvent] = env.readFile( - myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, + myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, FilePathFilter.createDefaultFilter()); val withTimestampsAndWatermarks: DataStream[MyEvent] = stream @@ -176,7 +177,7 @@ withTimestampsAndWatermarks #### **With Periodic Watermarks** -The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending +The `AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time). 
The interval (every *n* milliseconds) in which the watermark will be generated is defined via @@ -202,7 +203,7 @@ public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermar @Override public long extractTimestamp(MyEvent element, long previousElementTimestamp) { - long timestamp = element.getCreationTime(); + long timestamp = element.getCreationTime(); currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } @@ -229,7 +230,7 @@ public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<My @Override public Watermark getCurrentWatermark() { - // return the watermark as current time minus the maximum time lag + // return the watermark as current time minus the maximum time lag return new Watermark(System.currentTimeMillis() - maxTimeLag); } } @@ -249,7 +250,7 @@ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEv var currentMaxTimestamp: Long; override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { - val timestamp = element.getCreationTime() + val timestamp = element.getCreationTime() currentMaxTimestamp = max(timestamp, currentMaxTimestamp) timestamp; } @@ -273,7 +274,7 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] } override def getCurrentWatermark(): Watermark = { - // return the watermark as current time minus the maximum time lag + // return the watermark as current time minus the maximum time lag new Watermark(System.currentTimeMillis() - maxTimeLag) } } http://git-wip-us.apache.org/repos/asf/flink/blob/35b4da27/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java index cb9eb99..dc12d93 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java @@ -48,11 +48,11 @@ public final class Watermark extends StreamElement { // ------------------------------------------------------------------------ - /** The timestamp of the watermark */ + /** The timestamp of the watermark in milliseconds*/ private final long timestamp; /** - * Creates a new watermark with the given timestamp. + * Creates a new watermark with the given timestamp in milliseconds. */ public Watermark(long timestamp) { this.timestamp = timestamp;
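The "Attention" note and the updated `Watermark` Javadoc fix the unit as milliseconds since 1970-01-01T00:00:00Z, the same origin that `System.currentTimeMillis()` uses. The following is a minimal sketch, not part of this commit or of Flink's API. It assumes an event stores its time either as an ISO-8601 UTC string or as whole seconds since the epoch (both formats are assumptions for illustration) and shows the conversion a timestamp extractor would need before returning the value from `extractTimestamp`. `EpochMillis` is a hypothetical helper that relies only on the JDK's `java.time.Instant` and `java.util.concurrent.TimeUnit`.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not a Flink class): converts event-time fields into
 * milliseconds since 1970-01-01T00:00:00Z, the unit Flink expects for
 * timestamps and watermarks.
 */
final class EpochMillis {

    private EpochMillis() {
        // static utility, no instances
    }

    /** Parses an ISO-8601 UTC instant such as "2016-06-27T09:06:16Z". */
    static long fromIsoInstant(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    /** Converts a field that holds whole seconds since the epoch. */
    static long fromEpochSeconds(long epochSeconds) {
        return TimeUnit.SECONDS.toMillis(epochSeconds);
    }
}
```

For example, this commit's author date, Mon Jun 27 11:06:16 2016 +0200, is `2016-06-27T09:06:16Z`, which converts to 1467018376000 ms. If a seconds-based value were passed through unconverted, Flink would read it as milliseconds. A window declared as 10 seconds (10,000 ms) would then span 10,000 seconds of actual event time.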

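The `BoundedOutOfOrdernessGenerator` in the diff's context lines tracks the highest event timestamp seen so far (`currentMaxTimestamp`). Judging by its name, it emits a watermark that trails that maximum by a fixed out-of-orderness bound. After this commit, both quantities are explicitly in milliseconds. The class below is a Flink-free sketch of that arithmetic only, written under that assumption. `BoundedOutOfOrdernessTracker`, `onEvent`, and `currentWatermarkMillis` are hypothetical names that stand in for `extractTimestamp` and `getCurrentWatermark`.

```java
/**
 * Hypothetical sketch (no Flink dependency) of a bounded-out-of-orderness
 * watermark: watermark = highest timestamp seen - allowed lateness.
 * Every value is in milliseconds since the epoch.
 */
final class BoundedOutOfOrdernessTracker {

    private final long maxOutOfOrderness; // allowed lateness, in ms
    private long currentMaxTimestamp;     // highest event timestamp seen, in ms

    BoundedOutOfOrdernessTracker(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("bound must be >= 0 ms");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start at MIN_VALUE + bound so the subtraction below cannot underflow
        // before the first event; the initial watermark is then Long.MIN_VALUE.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Like extractTimestamp: remember the maximum, return the timestamp unchanged. */
    long onEvent(long eventTimestampMillis) {
        currentMaxTimestamp = Math.max(eventTimestampMillis, currentMaxTimestamp);
        return eventTimestampMillis;
    }

    /** Like getCurrentWatermark: trail the maximum by the configured bound. */
    long currentWatermarkMillis() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

Suppose the bound is an arbitrary 3,500 ms. An event at 10,000 ms moves the watermark to 6,500 ms. A late event at 8,000 ms leaves it unchanged, because the maximum does not decrease. A later event at 12,000 ms advances the watermark to 8,500 ms.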