[ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617172#comment-16617172
 ] 

Oleksandr Nitavskyi edited comment on FLINK-10342 at 9/17/18 11:29 AM:
-----------------------------------------------------------------------

Thanks for the comment. I agree that, from a Flink developer's point of view, 
it works as expected: the old TopicPartitions are restored from the state and 
the newly requested TopicPartitions are discovered. If the two sets do not 
intersect, we consume from both of them, since the state in the KafkaSource 
operator never expires.

At the same time, this behavior is counterintuitive for Flink users. When a 
KafkaSource is created to consume "topic 1", it is expected that "topic 1" 
will be consumed.
{code}
new KafkaSource("topic 1")
{code}

If, after a refactoring, the KafkaSource starts to consume another topic, "topic 2":
{code}
new KafkaSource("topic 2")
{code}
For us it sounds intuitive that data will come from "topic 2" and only from 
"topic 2", so the current behavior is a hole in the abstraction.

I believe it is worth taking at least one of the following actions:
* Add a small check to the state-restoring method that skips topics which 
were not passed via the class constructor.
* Log a warning if the topics in the state and in the constructor differ.
* Document this behavior. This could also be a good exercise that clarifies 
how state is managed and helps users start thinking in a slightly different 
paradigm.
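
As a rough sketch of the first two options combined: the class and method 
names below are hypothetical and are not existing Flink API, and the restored 
state is modelled as a plain map from topic name to per-partition offsets 
rather than Flink's actual state types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

/** Hypothetical helper, not part of Flink: filters restored offsets by configured topics. */
public class RestoredTopicFilter {
    private static final Logger LOG =
            Logger.getLogger(RestoredTopicFilter.class.getName());

    /**
     * Keeps only the restored topics that were also passed to the constructor.
     *
     * @param restoredOffsets  assumed shape of the restored state: topic -> (partition -> offset)
     * @param configuredTopics topics passed via the source constructor
     * @return a new map containing only the configured topics
     */
    public static Map<String, Map<Integer, Long>> filterRestoredOffsets(
            Map<String, Map<Integer, Long>> restoredOffsets,
            Set<String> configuredTopics) {
        Map<String, Map<Integer, Long>> kept = new HashMap<>();
        for (Map.Entry<String, Map<Integer, Long>> entry : restoredOffsets.entrySet()) {
            String topic = entry.getKey();
            if (configuredTopics.contains(topic)) {
                kept.put(topic, entry.getValue());
            } else {
                // Option 2: make the mismatch visible instead of silently consuming.
                LOG.warning("Topic '" + topic + "' found in restored state but not in "
                        + "the configured topics " + configuredTopics + "; skipping it.");
            }
        }
        return kept;
    }
}
```

With state restored for both "topic 1" and "topic 2" and only "topic 2" 
configured, this would keep the offsets of "topic 2" and log a warning for 
"topic 1".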

If you think any of these actions is worthwhile, let me know and I will 
contribute.
Thank you


> Kafka duplicate topic consumption when topic name is changed
> ------------------------------------------------------------
>
>                 Key: FLINK-10342
>                 URL: https://issues.apache.org/jira/browse/FLINK-10342
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Oleksandr Nitavskyi
>            Assignee: Oleksandr Nitavskyi
>            Priority: Major
>
> If the topic name is simply changed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead 
> to unexpected behavior.
> Here is a PR that reproduces the issue: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
