[ https://issues.apache.org/jira/browse/FLINK-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15344376#comment-15344376 ]

ASF GitHub Bot commented on FLINK-3752:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2142#discussion_r68061306
  
    --- Diff: docs/apis/streaming/connectors/kafka.md ---
    @@ -198,6 +198,63 @@ Flink on YARN supports automatic restart of lost YARN containers.
     
     If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
     
    +#### Kafka Consumers and Timestamp Extraction/Watermark Emission
    +
    +In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.
    +In addition, the user may want to emit watermarks either periodically or in an irregular fashion, e.g. based on
    +special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka
    +Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`.
    +
    +You can specify your custom timestamp extractor/watermark emitter as described
    +[here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), or use one of the
    +[predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, you
    +can pass it to your consumer in the following way:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +Properties properties = new Properties();
    +properties.setProperty("bootstrap.servers", "localhost:9092");
    +// only required for Kafka 0.8
    +properties.setProperty("zookeeper.connect", "localhost:2181");
    +properties.setProperty("group.id", "test");
    +
    +FlinkKafkaConsumer08<String> myConsumer =
    +    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
    +myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
    +
    +DataStream<String> stream = env.addSource(myConsumer);
    +stream.print();
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val properties = new Properties()
    +properties.setProperty("bootstrap.servers", "localhost:9092")
    +// only required for Kafka 0.8
    +properties.setProperty("zookeeper.connect", "localhost:2181")
    +properties.setProperty("group.id", "test")
    +
    +val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
    +myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
    +
    +val stream = env.addSource(myConsumer)
    +stream.print()
    +{% endhighlight %}
    +</div>
    +</div>
    + 
    +When such an assigner is specified, for each record read from Kafka, the
    +`extractTimestamp(T element, long previousElementTimestamp)` method is called to assign a timestamp to the record, and
    +the `Watermark getCurrentWatermark()` (for periodic) or the
    +`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) method is called to determine
    +whether a new watermark should be emitted and with which timestamp.
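    +
    +As an illustration only, a punctuated `CustomWatermarkEmitter` (as used in the snippets above) could be
    +sketched as follows. This sketch assumes each record is a `String` of the form `"<timestamp>,<payload>"` and
    +that records ending in `",WATERMARK"` carry the current event-time watermark; adapt the extraction logic to
    +your own record format:
    +
    +{% highlight java %}
    +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +
    +public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {
    +
    +    @Override
    +    public long extractTimestamp(String element, long previousElementTimestamp) {
    +        // assumed record format: "<timestamp>,<payload>"
    +        return Long.parseLong(element.substring(0, element.indexOf(",")));
    +    }
    +
    +    @Override
    +    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
    +        // emit a watermark only for the special marker records; returning null emits no watermark
    +        return lastElement.endsWith(",WATERMARK") ? new Watermark(extractedTimestamp) : null;
    +    }
    +}
    +{% endhighlight %}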
    +
    +Internally, an instance of the assigner is executed per Kafka partition, and when a single task is reading
    +from multiple partitions, watermark alignment is done as described
    +[here](http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/).
    --- End diff --
    
    Could you remove the link to the data Artisans blog here?
    The Flink documentation should be self-contained and independent of material from any company.


> Add Per-Kafka-Partition Watermark Generation to the docs
> --------------------------------------------------------
>
>                 Key: FLINK-3752
>                 URL: https://issues.apache.org/jira/browse/FLINK-3752
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>    Affects Versions: 1.1.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>             Fix For: 1.1.0
>
>
> The new methods that create watermarks per Kafka topic-partition, rather than 
> per Flink DataStream partition, should be documented under 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html



