[
https://issues.apache.org/jira/browse/FLINK-31186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692598#comment-17692598
]
Martijn Visser commented on FLINK-31186:
----------------------------------------
Wouldn't this be considered by design, given that you're breaking the state of
your job by removing a topic, so it requires an action for the user to deal
with that (either start from a clean state or change the UID)?
> Removing topic from kafka source does nothing
> ---------------------------------------------
>
> Key: FLINK-31186
> URL: https://issues.apache.org/jira/browse/FLINK-31186
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.15.3
> Reporter: Exidex
> Priority: Major
>
> As far as I can tell, there is no good way to remove topic from the list of
> topic that kafka source consumes from.
> We use {{StreamExecutionEnvironment.fromSource}} api with
> {{KafkaSource.setTopics}} which accepts list of topics. but when we remove
> the topic from list after some time the flink kafka source still consumes
> from it.
> My guess is that it relates to this TODO in code:
> [GitHub|https://github.com/apache/flink/blob/cc66d4855e6f8ee9986809a18f68a458bcfe3c12/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305]
> You can kind of workaroud this by removing whole job state or changing uid of
> kafka source but that affects either whole job or whole source. The other way
> is to use state processor api but it doesn't expose source operator state,
> which in turn can be worked around using reflection and copying code from
> SourceCoordinator. None of those are satisfactory
--
This message was sent by Atlassian Jira
(v8.20.10#820010)