[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367948#comment-15367948
 ] 

ASF GitHub Bot commented on FLINK-4019:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2214#discussion_r70103527
  
    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
     Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
     Flink on YARN supports automatic restart of lost YARN containers.
     
    +#### Event Time for Consumed Records
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
    +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
    +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
    +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
    +ascending).
    +
    +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
    +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
    +it can be passed to the consumer in the following way:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    +    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
    +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
    --- End diff --
    
    One minor thing here: the result has to be assigned back, i.e. kinesis = kinesis.assignTimestampsAndWatermarks(...), for this to work properly.
    But I'll fix it while merging.
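
The reason the reassignment matters is that a transformation such as assignTimestampsAndWatermarks returns a new stream and leaves the stream it was called on unchanged. The sketch below illustrates that pattern with a small stand-in class. MiniStream, assignTimestamps and hasCustomTimestamps are hypothetical names made up for illustration and are not part of the Flink API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

public class ReassignSketch {

    /** Hypothetical stand-in for a stream handle; NOT the Flink DataStream class. */
    static final class MiniStream<T> {
        final List<T> records;
        // null means "no custom assigner, use the source's default timestamps"
        final ToLongFunction<T> timestampAssigner;

        MiniStream(List<T> records, ToLongFunction<T> timestampAssigner) {
            this.records = records;
            this.timestampAssigner = timestampAssigner;
        }

        /** Returns a NEW stream carrying the assigner; the receiver is not modified. */
        MiniStream<T> assignTimestamps(ToLongFunction<T> assigner) {
            return new MiniStream<>(records, assigner);
        }

        boolean hasCustomTimestamps() {
            return timestampAssigner != null;
        }
    }

    public static void main(String[] args) {
        MiniStream<String> kinesis = new MiniStream<>(Arrays.asList("a", "bb"), null);

        // Result discarded: 'kinesis' still refers to the stream without the assigner.
        kinesis.assignTimestamps(String::length);
        System.out.println(kinesis.hasCustomTimestamps());

        // Result assigned back: later operators now build on the timestamped stream.
        kinesis = kinesis.assignTimestamps(String::length);
        System.out.println(kinesis.hasCustomTimestamps());
    }
}
```

In the documentation snippet under review, the analogous change would be to write kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner()); so that downstream operators are applied to the returned stream.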


> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-4019
>                 URL: https://issues.apache.org/jira/browse/FLINK-4019
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when Kinesis successfully receives the record:
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the deserialization schema.


