[
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429202#comment-15429202
]
ASF GitHub Bot commented on FLINK-4035:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2369#discussion_r75572263
--- Diff: docs/apis/streaming/connectors/kafka.md ---
@@ -291,3 +301,35 @@ higher value.
There is currently no transactional producer for Kafka, so Flink can not
guarantee exactly-once delivery
into a Kafka topic.
+### Using Kafka timestamps and Flink event time in Kafka 0.10
+
+Since Apache Kafka 0.10., Kafka's messages can carry
[timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message),
indicating
+the time the event has occurred (see ["event time" in Apache
Flink](../event_time.html)) or the time when the message
+has been written to the Kafka broker.
+
+The `FlinkKafkaConsumer010` will emit records with the timestamp attached,
if the time characteristic in Flink is
+set to `TimeCharacteristic.EventTime`
(`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
+
+The Kafka consumer does not emit watermarks. To emit watermarks, the same
mechanisms as described above in
+"Kafka Consumers and Timestamp Extraction/Watermark Emission" using the
`assignTimestampsAndWatermarks` method are applicable.
+
+There is no need to define a timestamp extractor when using the timestamps
from Kafka. The `previousElementTimestamp` argument of
+the `extractTimestamp()` method contains the timestamp carried by the
Kafka message.
+
+A timestamp extractor for a Kafka consumer would look like this:
+{% highlight java %}
+public long extractTimestamp(Long element, long previousElementTimestamp) {
+ return previousElementTimestamp;
+}
+{% endhighlight %}
+
+
+
+The `FlinkKafkaProducer010` only emits the record timestamp, if
`setWriteTimestampToKafka(true)` is set.
+
+{% highlight java %}
+FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
FlinkKafkaProducer010.writeToKafka(streamWithTimestamps, topic, new
SimpleStringSchema(), standardProps);
+config.setWriteTimestampToKafka(true);
+{% endhighlight %}
--- End diff --
I find the usage pattern of this a bit unfamiliar. I've explained this in
inline comments of the `FlinkKafkaProducer010` class.
> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Elias Levy
> Assignee: Robert Metzger
> Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.
> Published messages now include timestamps and compressed messages now include
> relative offsets. As it is now, brokers must decompress publisher compressed
> messages, assign offset to them, and recompress them, which is wasteful and
> makes it less likely that compression will be used at all.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)