[ 
https://issues.apache.org/jira/browse/FLINK-15670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056757#comment-17056757
 ] 

Yuan Mei commented on FLINK-15670:
----------------------------------

Some updates since last sync up:

*Watermark*

For watermarks, Stephan proposes keeping each subtask's watermark in each 
partition and merging/aligning them on the consumer side. This is an improved 
version of the second solution proposed above. The benefit is avoiding an 
exponential number of partitions.
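
A minimal sketch of what consumer-side merging/alignment could look like, assuming the consumer tracks the latest watermark seen per producer subtask and emits the minimum across all of them (class and method names here are illustrative, not Flink's actual API):

{code:java}
import java.util.Arrays;

// Hypothetical consumer-side aligner: each producer subtask writes its own
// watermark into every partition; the consumer keeps the latest watermark per
// upstream subtask and only advances when the minimum across subtasks advances.
public class WatermarkAligner {
    private final long[] subtaskWatermarks; // latest watermark per producer subtask
    private long lastEmitted = Long.MIN_VALUE;

    public WatermarkAligner(int numProducerSubtasks) {
        subtaskWatermarks = new long[numProducerSubtasks];
        Arrays.fill(subtaskWatermarks, Long.MIN_VALUE);
    }

    /** Returns the new aligned watermark if it advanced, or Long.MIN_VALUE otherwise. */
    public long onWatermark(int subtaskIndex, long watermark) {
        subtaskWatermarks[subtaskIndex] =
                Math.max(subtaskWatermarks[subtaskIndex], watermark);
        long min = Arrays.stream(subtaskWatermarks).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return Long.MIN_VALUE;
    }
}
{code}

Taking the minimum is what keeps the aligned watermark safe: no subtask can still emit an element with a timestamp below it.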

 

+_*Producer Side*_+

*Stored Data Format*

 
{code:java}
TAG_RECORD, DATA<T>
TAG_WATERMARK, SUB_TASK_INDEX, WATERMARK(LONG)
{code}
 

The original idea was to store the `StreamElement` directly. However, there are 
two problems:
 # As stated above, a watermark needs to be associated with a subtask index; 
StreamElement does not include a subtask number.
 # There are type constraints to respect. For example, DataStreamSink requires 
the input stream and the StreamSink to have the same type. This same type is 
also the type of element the invokable sink function operates on. Breaking 
these constraints would require many API changes.

 

Instead, I add a class `FlinkKafkaShuffleProducer` that extends 
`FlinkKafkaProducer` and overrides its `invoke` function. The function uses an 
internal serializer that can handle both `Record` and `Watermark`.
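
A rough sketch of what such a tagged serializer could look like, following the stored data format above (the `TAG_*` values and helper names are assumptions for illustration, not the actual `FlinkKafkaShuffleProducer` code):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative serializer handling records and watermarks in one byte layout:
//   TAG_RECORD, DATA<T>
//   TAG_WATERMARK, SUB_TASK_INDEX, WATERMARK(LONG)
public class ShuffleSerializer {
    static final byte TAG_RECORD = 0;
    static final byte TAG_WATERMARK = 1;

    // A record is written as TAG_RECORD followed by its serialized payload.
    static byte[] serializeRecord(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(TAG_RECORD);
        out.write(payload);
        return bytes.toByteArray();
    }

    // A watermark is written as TAG_WATERMARK, the producing subtask's index,
    // and the watermark timestamp, so the consumer can align per subtask.
    static byte[] serializeWatermark(int subtaskIndex, long watermark) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(TAG_WATERMARK);
        out.writeInt(subtaskIndex);
        out.writeLong(watermark);
        return bytes.toByteArray();
    }
}
{code}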

 

*How a Watermark is propagated*

The producer (sink function) is only invoked when processing an element 
(StreamRecord), so I use the context associated with the element to get the 
watermark. This is a bit tricky, since a watermark always comes `behind` an 
element: I won't get the final watermark at the end of the stream.

I also considered invoking the producer (sink function) from `processWatermark`. 
However, that would also require a lot of API changes, and I would like to 
avoid such interface changes if possible.

 

+_*Consumer Side*_+

*How data is consumed and a watermark is retrieved*

Currently, data is deserialized in `KafkaFetcher.runFetchLoop`, and a watermark 
is regenerated in 
`AbstractFetcher.emitRecordWithTimestampAndPunctuatedWatermark`. Hence, I am 
going to override these two functions.
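
As a hedged sketch of the consumer-side counterpart, the overridden fetch path could peek at the tag byte to decide whether a Kafka message carries a record or a per-subtask watermark (the `Element` wrapper and field names are hypothetical, introduced only for illustration):

{code:java}
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Illustrative consumer-side deserializer for the tagged byte layout.
public class ShuffleDeserializer {
    static final byte TAG_RECORD = 0;
    static final byte TAG_WATERMARK = 1;

    // Result of deserializing one Kafka message: either a record payload
    // or a (subtaskIndex, watermark) pair.
    static class Element {
        final byte[] record;    // non-null for records
        final int subtaskIndex; // valid only for watermarks
        final long watermark;   // valid only for watermarks

        Element(byte[] record, int subtaskIndex, long watermark) {
            this.record = record;
            this.subtaskIndex = subtaskIndex;
            this.watermark = watermark;
        }

        boolean isWatermark() { return record == null; }
    }

    static Element deserialize(byte[] message) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
        byte tag = in.readByte();
        if (tag == TAG_WATERMARK) {
            // Read the producing subtask's index and the watermark timestamp,
            // so they can be fed into per-subtask alignment downstream.
            return new Element(null, in.readInt(), in.readLong());
        }
        // Everything after the tag byte is the record payload.
        byte[] payload = new byte[message.length - 1];
        in.readFully(payload);
        return new Element(payload, -1, Long.MIN_VALUE);
    }
}
{code}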

 

The producer side is almost done, and the consumer side is under construction.

> Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's 
> KeyGroups
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-15670
>                 URL: https://issues.apache.org/jira/browse/FLINK-15670
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream, Connectors / Kafka
>            Reporter: Stephan Ewen
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>
> This Source/Sink pair would serve two purposes:
> 1. You can read topics that are already partitioned by key and process them 
> without partitioning them again (avoid shuffles)
> 2. You can use this to shuffle through Kafka, thereby decomposing the job 
> into smaller jobs and independent pipelined regions that fail over 
> independently.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
