curcur edited a comment on issue #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-617021982


   Listing some of the points here, just in case I forget :-). We can chat about the details offline.
   
   1. If I understand correctly (please ignore this if I do not), the original idea is to make use of data that is already partitioned, so that users do not need an extra "reinterpret".
   
   The problem is that a KeyedStream relies on a specific mapping from key to partition ID, `KeyGroupRangeAssignment.assignKeyToParallelOperator`. If the keyed data in Kafka does not follow this hash function, a repartition (keyBy) is still required.
   
   In other words, there is no easy way to guarantee that an external system partitions data the same way Flink does. I think that is why Stephan suggested providing a Kafka writer as well in the Jira; that way, how the data is partitioned is controlled internally by us.
   
   `reinterpretAsKeyedStream` itself does not need an extra shuffle (if that's 
the concern).
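   
   Below is a minimal sketch (mine, not code from this PR) of the mismatch: Flink picks the target subtask via `KeyGroupRangeAssignment`, while an external partitioner such as Kafka's default uses a different hash, so the two indices generally disagree unless the writer side is controlled by us.
   
```java
// Minimal sketch, not from this PR: compares the subtask index Flink expects for a
// key with the partition an external system would typically pick for the same key.
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyToSubtaskSketch {
    public static void main(String[] args) {
        int maxParallelism = 128;  // Flink's default max parallelism
        int parallelism = 4;       // operator parallelism
        int kafkaPartitions = 4;   // assume the topic has the same number of partitions

        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            // Subtask index Flink's KeyedStream expects this key to live on.
            int flinkSubtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);

            // Illustrative stand-in for an external partitioner (Kafka's real default
            // uses murmur2 over the serialized key bytes, which differs even more).
            int externalPartition = (key.hashCode() & Integer.MAX_VALUE) % kafkaPartitions;

            // Unless the writer uses Flink's assignment, these usually differ, so the
            // data in partition i cannot simply be reinterpreted as key group range i.
            System.out.printf("%s -> Flink subtask %d, external partition %d%n",
                    key, flinkSubtask, externalPartition);
        }
    }
}
```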
   
   2. How people can reuse the data
   We can provide a read API for reading the data back; that should not be difficult to do (without making users worry about watermarks), as they only need to provide a data schema. A rough sketch follows below.
   The original persist API had separate write and read calls, but I guess Stephan is more interested in the "shuffle idea", so I wrapped them together to make the code a bit cleaner.
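   
   As a rough sketch of what the read side could look like (the names here are hypothetical, not the API in this PR): the user passes only a deserialization schema and gets a DataStream back, with watermark handling kept internal.
   
```java
// Hypothetical sketch of a read API, not the interface in this PR: the user only
// supplies a deserialization schema; watermarks are recovered internally.
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public interface PersistedStreamReader {

    /** Reads back a previously persisted (already partitioned) stream. */
    <T> DataStream<T> read(
            StreamExecutionEnvironment env,
            String topic,                       // where the stream was persisted
            DeserializationSchema<T> schema);   // the only thing the user provides
}
```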
   
   3. Why the watermark is needed
   If we treat this as a pure sink/source problem, then you are right, the watermark is not needed. But that does not seem very useful. I think Stephan is more interested in treating this as a shuffle problem, and so am I, as a starting point for single-task failure recovery (using the regional failover idea). Since downstream tasks normally receive watermarks interleaved with the records from upstream, a shuffle cannot avoid carrying the watermark through the persisted stream as well.
   
   4. Why is the watermark designed in this way?
   This is one way to transmit the watermark from upstream. I agree that there are a lot of different ways (some of which I have actually tried, for example, using timestamps). My guideline is to keep the dependency on Kafka as small as possible, and this way is actually similar to how shuffle data is organized.
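   
   As a rough illustration of the general idea only (not the exact format in this PR): in Flink's network shuffle, watermarks travel interleaved with records as stream elements, so a persisted channel can carry them the same way with a small tagged envelope per element.
   
```java
// Rough illustration only, not the format used in this PR: interleave watermarks
// with data records in the persisted stream, similar to how Flink's network shuffle
// interleaves StreamRecords and Watermarks.
public final class PersistedElement<T> {

    enum Kind { RECORD, WATERMARK }

    final Kind kind;
    final T record;        // set when kind == RECORD
    final long watermark;  // set when kind == WATERMARK

    private PersistedElement(Kind kind, T record, long watermark) {
        this.kind = kind;
        this.record = record;
        this.watermark = watermark;
    }

    static <T> PersistedElement<T> record(T record) {
        return new PersistedElement<>(Kind.RECORD, record, Long.MIN_VALUE);
    }

    static <T> PersistedElement<T> watermark(long timestamp) {
        return new PersistedElement<>(Kind.WATERMARK, null, timestamp);
    }
}
```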
   
   5. Why I have an extra sink function and operator class:
   To avoid affecting or changing the current interfaces. SinkFunctions and Operators are broadly used, and I do not want to cause confusion or introduce potential risks for our users.
   

