[
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15368921#comment-15368921
]
ASF GitHub Bot commented on FLINK-4019:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2214#discussion_r70161839
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
Therefore, if the topology fails due to loss of a TaskManager, there must
still be enough slots available afterwards.
Flink on YARN supports automatic restart of lost YARN containers.
+#### Event Time for Consumed Records
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
+</div>
+
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --
Thanks Robert.
I'm not quite sure what the problem is with using
`assignTimestampsAndWatermarks()` here. Can you explain? I looked at the code,
and from my understanding it doesn't differ much from `assignTimestamps()`,
except that `assignTimestamps()` is deprecated.
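
For context, the docs text in the diff notes that the Kinesis arrival timestamps may not always be ascending, so whatever assigner is passed to the consumer has to tolerate out-of-order records. Below is a minimal sketch of that idea in plain Java. It is not Flink's API: the class `BoundedLatenessAssigner`, its method names, and the 5000 ms bound used in the usage note are hypothetical stand-ins chosen for illustration only.

```java
/**
 * Hypothetical stand-in, NOT a Flink interface: tracks the highest
 * timestamp seen so far and derives a watermark that lags behind it
 * by a fixed out-of-orderness bound.
 */
final class BoundedLatenessAssigner {
    // Assumed tolerance for late records, in milliseconds.
    private final long maxOutOfOrdernessMs;
    // Highest record timestamp observed so far; MIN_VALUE means "none yet".
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    BoundedLatenessAssigner(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Registers a record's timestamp and returns it unchanged. */
    long extractTimestamp(long recordTimestampMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, recordTimestampMs);
        return recordTimestampMs;
    }

    /**
     * Returns the current watermark: the highest timestamp seen minus the
     * allowed lateness, or Long.MIN_VALUE if no record has been seen yet.
     * The value never decreases, even when later records carry smaller
     * timestamps.
     */
    long currentWatermark() {
        if (maxSeenTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }
}
```

With a bound of 5000 ms, feeding the timestamps 10000 and then 8000 leaves the watermark at 5000, so the second, out-of-order record still falls after the watermark instead of being treated as late.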
> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema
> interface
> -------------------------------------------------------------------------------------
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also exposes the timestamp at which Kinesis
> successfully received the record:
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This is useful information for users and should be exposed through the
> deserialization schema.