[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429202#comment-15429202 ]

ASF GitHub Bot commented on FLINK-4035:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2369#discussion_r75572263
  
    --- Diff: docs/apis/streaming/connectors/kafka.md ---
    @@ -291,3 +301,35 @@ higher value.
     There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery
     into a Kafka topic.
     
    +### Using Kafka timestamps and Flink event time in Kafka 0.10
    +
    +Since Apache Kafka 0.10, Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
    +the time the event has occurred (see ["event time" in Apache Flink](../event_time.html)) or the time when the message
    +has been written to the Kafka broker.
    +
    +The `FlinkKafkaConsumer010` will emit records with the timestamp attached, if the time characteristic in Flink is 
    +set to `TimeCharacteristic.EventTime` (`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
    +
    +The Kafka consumer does not emit watermarks. To emit watermarks, the same mechanisms as described above in 
    +"Kafka Consumers and Timestamp Extraction/Watermark Emission" using the `assignTimestampsAndWatermarks` method are applicable.
    +
    +There is no need to define a timestamp extractor when using the timestamps from Kafka. The `previousElementTimestamp` argument of 
    +the `extractTimestamp()` method contains the timestamp carried by the Kafka message.
    +
    +A timestamp extractor for a Kafka consumer would look like this:
    +{% highlight java %}
    +public long extractTimestamp(Long element, long previousElementTimestamp) {
    +    return previousElementTimestamp;
    +}
    +{% endhighlight %}
    +
    +
    +
    +The `FlinkKafkaProducer010` only emits the record timestamp, if `setWriteTimestampToKafka(true)` is set.
    +
    +{% highlight java %}
    +FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafka(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
    +config.setWriteTimestampToKafka(true);
    +{% endhighlight %}
    --- End diff --
    
    I find the usage pattern of this a bit unfamiliar. I've explained this in
    inline comments of the `FlinkKafkaProducer010` class.
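
    For readers following the consumer-side part of the diff, here is a minimal sketch of the
    pass-through idea: `extractTimestamp()` returns the `previousElementTimestamp` it is handed, and a
    watermark is derived from the highest timestamp seen so far. The interface and class names below
    (`TimestampAssignerSketch`, `KafkaTimestampPassThrough`), the `currentWatermark()` method, and the
    out-of-orderness bound are hypothetical stand-ins declared locally so the example compiles without
    Flink on the classpath. They only mirror the method shape shown in the diff and are not the
    connector's actual classes.

```java
// Hypothetical stand-in with the same method shape as the extractor in the diff.
// Declared locally so this sketch compiles without any Flink dependency.
interface TimestampAssignerSketch<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
}

// Passes through the timestamp already attached to the record (for example the
// one carried by a Kafka 0.10 message) and remembers the highest value seen.
class KafkaTimestampPassThrough<T> implements TimestampAssignerSketch<T> {
    // Assumed bound on how late records may arrive, in milliseconds (illustrative).
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    KafkaTimestampPassThrough(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // No parsing of the element is needed: the timestamp is already attached.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    // Watermark candidate: highest timestamp seen minus the lateness bound.
    // Returns Long.MIN_VALUE until the first record has been observed.
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }
}
```

    In a real job the equivalent logic would live in an assigner passed to
    `assignTimestampsAndWatermarks`, since the consumer itself does not emit watermarks.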


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
>                 Key: FLINK-4035
>                 URL: https://issues.apache.org/jira/browse/FLINK-4035
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.
> Published messages now include timestamps and compressed messages now include
> relative offsets. As it is now, brokers must decompress publisher compressed
> messages, assign offsets to them, and recompress them, which is wasteful and
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
