[ 
https://issues.apache.org/jira/browse/FLINK-3375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148821#comment-15148821
 ] 

Zach Cox commented on FLINK-3375:
---------------------------------

Sure [~StephanEwen] - although after thinking it through, it's probably 
unnecessary. If the FlinkKafkaConsumer tracks a watermark per topic partition, 
as described in this JIRA ticket, that should be good enough.

My thinking went something like this: imagine n web servers sending page view 
events to a Kafka topic with m partitions. Each event has a UUID field which is 
used as the message key. So each web server is spreading its events across all 
m partitions approximately uniformly, and each partition contains events from 
all n web servers.

Each event also has a timestamp field generated by the web server using its 
system clock. Each web server will (likely) generate events with ascending 
timestamps, but the system clocks across web servers are independent, so each 
topic partition will contain events whose timestamps are not strictly 
ascending. In general, the events in a partition will be slightly out of order.

An alternative to inferring watermarks from the events in the Kafka topic 
partitions (i.e. the current `TimestampExtractor`) could be to have the event 
generators (e.g. web servers) also emit watermarks directly into the topic 
partitions. So the partitions would contain two types of messages: either an 
event or a watermark, just like how the internal Flink streams contain 
`StreamElement` which is either a `StreamRecord<T>` or `Watermark`. Each Kafka 
topic partition then contains both events and watermarks from multiple sources. 
The watermark for each partition would then be the min of the watermarks from 
all sources. I suppose each watermark would need to carry some kind of 
sourceId, and you would then either need to know all sourceIds up front, or 
use heuristics to infer them, in order to track the min. At that point you 
could just include a sourceId in the event itself, treat each event as a 
watermark, and skip the explicit watermarks altogether. 
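A minimal sketch of the min-over-sources bookkeeping described above (the class and method names are illustrative, not any Flink API; it assumes each source's watermarks are non-decreasing):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker for a single Kafka topic partition that receives
// explicit watermark messages from multiple sources. The partition's
// watermark is the minimum of the latest watermark seen from each source.
class PerSourceWatermarkTracker {
    private final Map<String, Long> latestBySource = new HashMap<>();

    // Record a watermark from one source and return the resulting
    // partition watermark (the min over all sources seen so far).
    long onWatermark(String sourceId, long timestamp) {
        // Keep only the highest watermark per source, in case messages
        // from one source arrive slightly out of order.
        latestBySource.merge(sourceId, timestamp, Math::max);
        return latestBySource.values().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }
}
```

Note that until every source has been observed at least once, the min is only over the sources seen so far, which is exactly why you either need to know all sourceIds or infer them.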

So instead of building anything new into Flink to support this, I think I would 
just include a sourceId (e.g. host+port of web server) along with the timestamp 
in the events, and then use that sourceId in my `TimestampExtractor` 
implementation to determine the watermark of the Kafka topic partition. Perhaps 
a generic `TimestampExtractor` implementation that tracks watermarks per event 
source could be created, but including both events and watermarks in the input 
Kafka topic partitions is probably not a good idea.
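A rough sketch of that per-event-source bookkeeping (not Flink's actual `TimestampExtractor` interface, just the core logic one such implementation could wrap; names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Treats every event as an implicit watermark: because each web server
// emits ascending timestamps, the highest timestamp seen from a source
// bounds that source's event-time progress, and the partition watermark
// is the minimum of those per-source bounds.
class PerSourceTimestampTracker {
    private final Map<String, Long> maxBySource = new HashMap<>();

    // Called once per event with its sourceId (e.g. host+port of the
    // web server) and the event's timestamp.
    void onEvent(String sourceId, long timestamp) {
        maxBySource.merge(sourceId, timestamp, Math::max);
    }

    // Watermark of the Kafka topic partition: min over all sources seen
    // so far; Long.MIN_VALUE until at least one source has been observed.
    long currentWatermark() {
        return maxBySource.values().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }
}
```

The design choice here is the same trade-off the comment notes: the watermark can only advance once every source has reported, so a slow or silent web server holds the partition watermark back until some heuristic (e.g. a timeout) retires it.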

> Allow Watermark Generation in the Kafka Source
> ----------------------------------------------
>
>                 Key: FLINK-3375
>                 URL: https://issues.apache.org/jira/browse/FLINK-3375
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>             Fix For: 1.0.0
>
>
> It is a common case that event timestamps are ascending inside one Kafka 
> partition. Ascending timestamps are easy for users, because they are handled 
> by ascending timestamp extraction.
> If the Kafka source has multiple partitions per source task, then the records 
> become out of order before timestamps can be extracted and watermarks can be 
> generated.
> If we make the FlinkKafkaConsumer an event time source function, it can 
> generate watermarks itself. It would internally implement the same logic as 
> the regular operators that merge streams, keeping track of event time 
> progress per partition and generating watermarks based on the current 
> guaranteed event time progress.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
