Exidex created FLINK-31186:
------------------------------

             Summary: Removing topic from kafka source does nothing
                 Key: FLINK-31186
                 URL: https://issues.apache.org/jira/browse/FLINK-31186
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.15.3
            Reporter: Exidex


As far as I can tell, there is no good way to remove a topic from the list of 
topics that the Kafka source consumes from. 

We use the {{StreamExecutionEnvironment.fromSource}} API with 
{{KafkaSource.setTopics}}, which accepts a list of topics. But when we later 
remove a topic from that list, the Flink Kafka source still consumes from it. 

My guess is that this is related to the following TODO in the code:
[GitHub|https://github.com/apache/flink/blob/cc66d4855e6f8ee9986809a18f68a458bcfe3c12/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305]
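
To illustrate that guess, here is a minimal, self-contained Java sketch of how partition discovery could behave after a restore if removed partitions are detected but never acted on. It is a simplified model under that assumption, not Flink's actual enumerator code; the class {{PartitionChangeSketch}}, its method names, and the topic names are all hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Simplified model of partition discovery after a restore; NOT Flink's actual code. */
final class PartitionChangeSketch {

    /** Partitions that were just fetched from Kafka but are not assigned yet. */
    static Set<String> newPartitions(Set<String> assigned, Set<String> fetched) {
        Set<String> added = new LinkedHashSet<>(fetched);
        added.removeAll(assigned);
        return added;
    }

    /** Partitions restored from state that the current topic list no longer covers. */
    static Set<String> removedPartitions(Set<String> assigned, Set<String> fetched) {
        Set<String> removed = new LinkedHashSet<>(assigned);
        removed.removeAll(fetched);
        return removed;
    }

    /** Assumed behavior: new partitions are added, removed ones are never dropped. */
    static Set<String> assignedAfterDiscovery(Set<String> assigned, Set<String> fetched) {
        Set<String> result = new LinkedHashSet<>(assigned);
        result.addAll(newPartitions(assigned, fetched));
        // removedPartitions(assigned, fetched) is deliberately ignored here,
        // modelling the unhandled case that the linked TODO appears to describe.
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical state: the job was checkpointed while reading two topics.
        Set<String> restored = new LinkedHashSet<>(List.of("orders-0", "payments-0"));
        // The topic list now drops "payments" and adds "refunds".
        Set<String> fetched = new LinkedHashSet<>(List.of("orders-0", "refunds-0"));

        System.out.println("detected as removed: " + removedPartitions(restored, fetched));
        System.out.println("still assigned:      " + assignedAfterDiscovery(restored, fetched));
    }
}
```

In this model the partition of the removed topic is recognized as removed, yet it stays in the assigned set, which would match the behavior we observe.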

You can kind of work around this by removing the whole job state or changing 
the uid of the Kafka source, but that affects either the whole job or the whole 
source. The other way is to use the State Processor API, but it doesn't expose 
source operator state, which in turn can be worked around using reflection and 
copying code from SourceCoordinator. None of these options is satisfactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
