[
https://issues.apache.org/jira/browse/FLINK-3375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15148821#comment-15148821
]
Zach Cox commented on FLINK-3375:
---------------------------------
Sure [~StephanEwen] - although after thinking it through, it's probably
unnecessary. If the FlinkKafkaConsumer tracks a watermark per topic partition,
as described in this jira ticket, that should be good enough.
My thinking went something like this: imagine n web servers sending page view
events to a Kafka topic with m partitions. Each event has a UUID field which is
used as the message key. So each web server is spreading its events across all
m partitions approximately uniformly, and each partition contains events from
all n web servers.
Each event also has a timestamp field generated by the web server using its
system clock. Each web server will (likely) be generating events with ascending
timestamps, but the system clocks across web servers are independent, so each
topic partition will contain events that are not strictly ascending. In general
the events in a partition will be slightly out-of-order.
An alternative to inferring watermarks from the events in the Kafka topic
partitions (i.e. the current `TimestampExtractor`) could be to have the event
generators (e.g. web servers) also emit watermarks directly into the topic
partitions. So the partitions would contain two types of messages: either an
event or a watermark, just like how the internal Flink streams contain
`StreamElement` which is either a `StreamRecord<T>` or `Watermark`. Each Kafka
topic partition then contains both events and watermarks from multiple sources.
The watermark for each partition would then be the min of the watermarks from
all sources - so the watermarks would need to carry some kind of sourceId, and
you would either need to know all sourceIds up front, or use heuristics to
infer them, in order to track the min. At that point you could simply include a
sourceId in the event itself, treat each event as a watermark, and skip the
explicit watermarks altogether.
So instead of building anything new into Flink to support this, I think I would
just include a sourceId (e.g. host+port of web server) along with the timestamp
in the events, and then use that sourceId in my `TimestampExtractor`
implementation to determine the watermark of the Kafka topic partition. Perhaps
a generic `TimestampExtractor` implementation that tracks watermarks per event
source could be created, but including both events and watermarks in the input
Kafka topic partitions is probably not a good idea.
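The per-source min tracking described above could be sketched roughly like
this (a hypothetical standalone tracker for illustration only - the class and
method names are made up, and this is not Flink's actual `TimestampExtractor`
interface):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: tracks the max timestamp seen per sourceId and
// derives the partition watermark as the min of those maxima. Note the
// caveat from the discussion: until an event from every source has been
// seen, the watermark only reflects the sources observed so far, which is
// why you need to know (or infer) all sourceIds.
public class PerSourceWatermarkTracker {

    private final Map<String, Long> maxTimestampPerSource = new HashMap<>();

    // Record an event's (sourceId, timestamp) and return the new watermark.
    public long onEvent(String sourceId, long timestamp) {
        maxTimestampPerSource.merge(sourceId, timestamp, Math::max);
        return currentWatermark();
    }

    // Watermark = min over all known sources of the max timestamp seen.
    public long currentWatermark() {
        return maxTimestampPerSource.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

For example, if web-1 has emitted events up to timestamp 110 but web-2 has
only reached 90, the partition watermark stays at 90: the slowest source's
clock bounds the guaranteed event time progress.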
> Allow Watermark Generation in the Kafka Source
> ----------------------------------------------
>
> Key: FLINK-3375
> URL: https://issues.apache.org/jira/browse/FLINK-3375
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.0.0
> Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> It is a common case that event timestamps are ascending inside one Kafka
> Partition. Ascending timestamps are easy for users, because they are handled
> by ascending timestamp extraction.
> If the Kafka source has multiple partitions per source task, then the records
> become out of order before timestamps can be extracted and watermarks can be
> generated.
> If we make the FlinkKafkaConsumer an event time source function, it can
> generate watermarks itself. It would internally implement the same logic as
> the regular operators that merge streams, keeping track of event time
> progress per partition and generating watermarks based on the current
> guaranteed event time progress.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)