Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2369#discussion_r75572263
  
    --- Diff: docs/apis/streaming/connectors/kafka.md ---
    @@ -291,3 +301,35 @@ higher value.
     There is currently no transactional producer for Kafka, so Flink can not 
guarantee exactly-once delivery
     into a Kafka topic.
     
    +### Using Kafka timestamps and Flink event time in Kafka 0.10
    +
    +Since Apache Kafka 0.10., Kafka's messages can carry 
[timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message),
 indicating
    +the time the event has occurred (see ["event time" in Apache 
Flink](../event_time.html)) or the time when the message
    +has been written to the Kafka broker.
    +
    +The `FlinkKafkaConsumer010` will emit records with the timestamp attached, 
if the time characteristic in Flink is 
    +set to `TimeCharacteristic.EventTime` 
(`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
    +
    +The Kafka consumer does not emit watermarks. To emit watermarks, the same 
mechanisms as described above in 
    +"Kafka Consumers and Timestamp Extraction/Watermark Emission"  using the 
`assignTimestampsAndWatermarks` method are applicable.
    +
    +There is no need to define a timestamp extractor when using the timestamps 
from Kafka. The `previousElementTimestamp` argument of 
    +the `extractTimestamp()` method contains the timestamp carried by the 
Kafka message.
    +
    +A timestamp extractor for a Kafka consumer would look like this:
    +{% highlight java %}
    +public long extractTimestamp(Long element, long previousElementTimestamp) {
    +    return previousElementTimestamp;
    +}
    +{% endhighlight %}
    +
    +
    +
    +The `FlinkKafkaProducer010` only emits the record timestamp, if 
`setWriteTimestampToKafka(true)` is set.
    +
    +{% highlight java %}
    +FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = 
FlinkKafkaProducer010.writeToKafka(streamWithTimestamps, topic, new 
SimpleStringSchema(), standardProps);
    +config.setWriteTimestampToKafka(true);
    +{% endhighlight %}
    --- End diff --
    
    I find the usage pattern of this a bit unfamiliar. I've explained this in 
inline comments of the `FlinkKafkaProducer010` class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to