[ 
https://issues.apache.org/jira/browse/FLINK-31186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692598#comment-17692598
 ] 

Martijn Visser commented on FLINK-31186:
----------------------------------------

Wouldn't this be considered by design? Removing a topic breaks the state of 
your job, so the user has to take action to deal with that (either start 
from a clean state or change the UID). 

> Removing topic from kafka source does nothing
> ---------------------------------------------
>
>                 Key: FLINK-31186
>                 URL: https://issues.apache.org/jira/browse/FLINK-31186
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.3
>            Reporter: Exidex
>            Priority: Major
>
> As far as I can tell, there is no good way to remove a topic from the list of 
> topics that the Kafka source consumes from. 
> We use the {{StreamExecutionEnvironment.fromSource}} API with 
> {{KafkaSource.setTopics}}, which accepts a list of topics, but when we later 
> remove a topic from that list, the Flink Kafka source still consumes from it. 
> My guess is that this relates to the following TODO in the code:
> [GitHub|https://github.com/apache/flink/blob/cc66d4855e6f8ee9986809a18f68a458bcfe3c12/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305]
> You can partly work around this by removing the whole job state or by 
> changing the UID of the Kafka source, but that affects either the whole job 
> or the whole source. The other way is to use the State Processor API, but it 
> doesn't expose source operator state, which in turn can be worked around 
> using reflection and copying code from SourceCoordinator. None of those 
> options is satisfactory.
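
For illustration, the behaviour described in the quoted issue can be modelled in plain Java without any Flink dependency. This is a simplified sketch under stated assumptions, not Flink's implementation: the class TopicRemovalModel, its methods, and the "topic-partition" string encoding are all hypothetical. It assumes that partitions restored from state are kept as they are and that discovery only ever adds partitions for the configured topics, which matches the reported symptom.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of the reported behaviour; not Flink code. */
final class TopicRemovalModel {

    /**
     * Assumption: partitions restored from state are kept, and discovery only
     * adds partitions of the currently configured topics. Nothing is removed.
     */
    static Set<String> assignmentAfterRestore(
            Set<String> restoredPartitions,
            List<String> configuredTopics,
            int partitionsPerTopic) {
        Set<String> assignment = new LinkedHashSet<>(restoredPartitions);
        for (String topic : configuredTopics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                assignment.add(topic + "-" + p);
            }
        }
        // No step here drops partitions whose topic is no longer configured.
        return assignment;
    }

    /**
     * Hypothetical pruning step: what "handling removed partitions" could mean
     * in this model. Keeps only partitions whose topic is still configured.
     */
    static Set<String> pruneRemovedTopics(
            Set<String> assignment, List<String> configuredTopics) {
        Set<String> pruned = new LinkedHashSet<>();
        for (String partition : assignment) {
            // Partition ids are encoded as "<topic>-<index>" in this sketch.
            String topic = partition.substring(0, partition.lastIndexOf('-'));
            if (configuredTopics.contains(topic)) {
                pruned.add(partition);
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        // State taken while both topics were configured.
        Set<String> restored = new LinkedHashSet<>(List.of("orders-0", "payments-0"));
        // The job is restarted with "payments" removed from the topic list.
        List<String> configured = List.of("orders");

        Set<String> after = assignmentAfterRestore(restored, configured, 1);
        System.out.println("after restore: " + after);
        System.out.println("after pruning: " + pruneRemovedTopics(after, configured));
    }
}
```

Under this model, the partition of the removed topic stays assigned after a restore, so it only goes away if the restored state is discarded, which is what the workarounds mentioned above (clean state or a new UID) amount to.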



