[
https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-11848:
-----------------------------------
Labels: auto-unassigned stale-major (was: auto-unassigned)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Major but is unassigned and neither it nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this
ticket is indeed Major, please either assign yourself or give an update. Afterwards,
please remove the label, or in 7 days the issue will be deprioritized.
> Deleting outdated Kafka topics causes UNKNOWN_TOPIC_EXCEPTION
> -------------------------------------------------------------
>
> Key: FLINK-11848
> URL: https://issues.apache.org/jira/browse/FLINK-11848
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.6.4
> Reporter: Shengnan YU
> Priority: Major
> Labels: auto-unassigned, stale-major
>
> Recently we have been running some streaming jobs with Apache Flink. There are
> multiple Kafka topics named with a format like xxxxxx_yy-mm-dd. We used a topic
> regex pattern to let one consumer consume those topics. However, if we delete
> some older topics, the metadata in the consumer does not update properly, so it
> still remembers those outdated topics in its topic list, which leads to
> *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recover. The
> same seems to occur in the producer as well. Any idea how to solve this
> problem? Thank you very much!
>
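> As an aside, a minimal, self-contained sketch of matching dated topic names
> (here simplified to a hypothetical "testYYYYMMDD" form) with a
> java.util.regex.Pattern, the same mechanism the consumer's topic regex uses:
> {code:java}
> import java.util.regex.Pattern;
>
> public class TopicPatternDemo {
>     public static void main(String[] args) {
>         // Hypothetical pattern for dated topic names such as "test20190310"
>         Pattern datedTopics = Pattern.compile("^test\\d{8}$");
>         System.out.println(datedTopics.matcher("test20190310").matches()); // true
>         System.out.println(datedTopics.matcher("prod20190310").matches()); // false
>     }
> }
> {code}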
> Example to reproduce the problem:
> There are multiple Kafka topics, e.g. "test20190310", "test20190311", and
> "test20190312". I run the job and everything is OK. Then if I delete topic
> "test20190310", the consumer does not perceive that the topic is deleted and
> will still fetch metadata for that topic. Unknown-topic errors appear in the
> TaskManager's log.
> {code:java}
> import java.util.Properties;
> import java.util.regex.Pattern;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>
> public class Test {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092"); // was "localhost:9092\n" (stray newline)
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest"); // was misspelled "auto.offset.rest"
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         // Re-discover topics/partitions matching the pattern every 20 minutes
>         props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
>                 "1200000");
>         Pattern topics = Pattern.compile("^test.*$");
>         FlinkKafkaConsumer011<String> consumer =
>                 new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
>         DataStream<String> stream = env.addSource(consumer);
>         stream.writeToSocket("localhost", 44444, new SimpleStringSchema());
>         env.execute("test");
>     }
> }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)