Repository: flink
Updated Branches:
  refs/heads/master d34bdaf7f -> 35b4da273


[FLINK-4121] Add timeunit (ms) to docs for timestamps and watermarks

This closes #2165


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35b4da27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35b4da27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35b4da27

Branch: refs/heads/master
Commit: 35b4da273f755308b82468ced539c380444efd37
Parents: d34bdaf
Author: Jonas Traub <[email protected]>
Authored: Mon Jun 27 11:06:16 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Jun 27 12:09:06 2016 +0200

----------------------------------------------------------------------
 .../streaming/event_timestamps_watermarks.md    | 33 ++++++++++----------
 .../streaming/api/watermark/Watermark.java      |  4 +--
 2 files changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35b4da27/docs/apis/streaming/event_timestamps_watermarks.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/event_timestamps_watermarks.md 
b/docs/apis/streaming/event_timestamps_watermarks.md
index 493e11a..05c9f51 100644
--- a/docs/apis/streaming/event_timestamps_watermarks.md
+++ b/docs/apis/streaming/event_timestamps_watermarks.md
@@ -48,14 +48,13 @@ 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 </div>
 </div>
 
-
 ## Assigning Timestamps
 
 In order to work with *Event Time*, Flink needs to know the events' 
*timestamps*, meaning each element in the
 stream needs to get its event timestamp *assigned*. That happens usually by 
accessing/extracting the
 timestamp from some field in the element.
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell 
the system about 
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell 
the system about
 the progress in event time.
 
 There are two ways to assign timestamps and generate Watermarks:
@@ -63,6 +62,8 @@ There are two ways to assign timestamps and generate 
Watermarks:
   1. Directly in the data stream source
   2. Via a timestamp assigner / watermark generator: in Flink timestamp 
assigners also define the watermarks to be emitted
 
+<span class="label label-danger">Attention</span> Both timestamps and 
watermarks are specified as
+millliseconds since the Java epoch of 1970-01-01T00:00:00Z.
 
 ### Source Functions with Timestamps and Watermarks
 
@@ -116,18 +117,18 @@ those timestamps will be overwritten by the 
TimestampAssigner. Similarly, Waterm
 Timestamp Assigners take a stream and produce a new stream with timestamped 
elements and watermarks. If the
 original stream had timestamps and/or watermarks already, the timestamp 
assigner overwrites them.
 
-The timestamp assigners usually are specified immediately after the data 
source but it is not strictly required to do so. 
+The timestamp assigners usually are specified immediately after the data 
source but it is not strictly required to do so.
 A common pattern is, for example, to parse (*MapFunction*) and filter 
(*FilterFunction*) before the timestamp assigner.
 In any case, the timestamp assigner needs to be specified before the first 
operation on event time
-(such as the first window operation). As a special case, when using Kafka as 
the source of a streaming job, 
-Flink allows the specification of a timestamp assigner / watermark emitter 
inside 
-the source (or consumer) itself. More information on how to do so can be found 
in the 
-[Kafka Connector documentation]({{ site.baseurl 
}}/apis/streaming/connectors/kafka.html). 
+(such as the first window operation). As a special case, when using Kafka as 
the source of a streaming job,
+Flink allows the specification of a timestamp assigner / watermark emitter 
inside
+the source (or consumer) itself. More information on how to do so can be found 
in the
+[Kafka Connector documentation]({{ site.baseurl 
}}/apis/streaming/connectors/kafka.html).
 
 
 **NOTE:** The remainder of this section presents the main interfaces a 
programmer has
-to implement in order to create her own timestamp extractors/watermark 
emitters. 
-To see the pre-implemented extractors that ship with Flink, please refer to 
the 
+to implement in order to create her own timestamp extractors/watermark 
emitters.
+To see the pre-implemented extractors that ship with Flink, please refer to the
 [Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl 
}}/apis/streaming/event_timestamp_extractors.html) page.
 
 <div class="codetabs" markdown="1">
@@ -137,7 +138,7 @@ final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEn
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
 DataStream<MyEvent> stream = env.readFile(
-        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, 
+        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter(), typeInfo);
 
 DataStream<MyEvent> withTimestampsAndWatermarks = stream
@@ -157,7 +158,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
 val stream: DataStream[MyEvent] = env.readFile(
-         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, 
+         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
          FilePathFilter.createDefaultFilter());
 
 val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
@@ -176,7 +177,7 @@ withTimestampsAndWatermarks
 
 #### **With Periodic Watermarks**
 
-The `AssignerWithPeriodicWatermarks` assigns timestamps and generates 
watermarks periodically (possibly depending 
+The `AssignerWithPeriodicWatermarks` assigns timestamps and generates 
watermarks periodically (possibly depending
 on the stream elements, or purely based on processing time).
 
 The interval (every *n* milliseconds) in which the watermark will be generated 
is defined via
@@ -202,7 +203,7 @@ public class BoundedOutOfOrdernessGenerator extends 
AssignerWithPeriodicWatermar
 
     @Override
     public long extractTimestamp(MyEvent element, long 
previousElementTimestamp) {
-        long timestamp = element.getCreationTime(); 
+        long timestamp = element.getCreationTime();
         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
         return timestamp;
     }
@@ -229,7 +230,7 @@ public class TimeLagWatermarkGenerator extends 
AssignerWithPeriodicWatermarks<My
 
        @Override
        public Watermark getCurrentWatermark() {
-               // return the watermark as current time minus the maximum time 
lag 
+               // return the watermark as current time minus the maximum time 
lag
                return new Watermark(System.currentTimeMillis() - maxTimeLag);
        }
 }
@@ -249,7 +250,7 @@ class BoundedOutOfOrdernessGenerator extends 
AssignerWithPeriodicWatermarks[MyEv
     var currentMaxTimestamp: Long;
 
     override def extractTimestamp(element: MyEvent, previousElementTimestamp: 
Long): Long = {
-        val timestamp = element.getCreationTime() 
+        val timestamp = element.getCreationTime()
         currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
         timestamp;
     }
@@ -273,7 +274,7 @@ class TimeLagWatermarkGenerator extends 
AssignerWithPeriodicWatermarks[MyEvent]
     }
 
     override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current time minus the maximum time lag 
+        // return the watermark as current time minus the maximum time lag
         new Watermark(System.currentTimeMillis() - maxTimeLag)
     }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/35b4da27/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
index cb9eb99..dc12d93 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
@@ -48,11 +48,11 @@ public final class Watermark extends StreamElement {
        
        // 
------------------------------------------------------------------------
        
-       /** The timestamp of the watermark */
+       /** The timestamp of the watermark in milliseconds*/
        private final long timestamp;
 
        /**
-        * Creates a new watermark with the given timestamp.
+        * Creates a new watermark with the given timestamp in milliseconds.
         */
        public Watermark(long timestamp) {
                this.timestamp = timestamp;

Reply via email to