[ https://issues.apache.org/jira/browse/FLINK-17638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-17638:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized.

> FlinkKafkaConsumerBase restore from empty state will be set consume from earliest forced
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-17638
>                 URL: https://issues.apache.org/jira/browse/FLINK-17638
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0, 1.9.3, 1.10.0
>         Environment: Flink 1.9.0
> kafka 1.1.0
> jdk 1.8
>            Reporter: chenchuangchuang
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> My goal and data look like this:
> # I need to count the number of posts each user created in the last 30 days in my system.
> # The complete, real-time data is in MySQL.
> # I can get the incremental MySQL binlog from kafka-1.1.1 (it only stores the last 7 days of binlog); the topic name is "binlog_post_topic".
> # So I have to combine the MySQL data and the binlog data.
>
> I do it this way:
> # First, I write a snapshot of the MySQL data to a Kafka topic ordered by create_time (topic name "init-post-topic"), and consume "init-post-topic" as a Flink data stream with SlidingEventTimeWindows.
> # Second, after the job has processed all the data in "init-post-topic", I create a savepoint for the job; call it save-point-a.
> # Third, I modify my code:
> ## the data source is now the Kafka topic "binlog_post_topic",
> ## the other operators do not change,
> ## and "binlog_post_topic" is set to start consuming from a specific timestamp (the time the MySQL snapshot was created).
> # Fourth, I restart my job from save-point-a.
>
> But I find that the Kafka consumer for "binlog_post_topic" does not consume from the timestamp I set; it consumes from the earliest offset instead. I found this log in the task manager:
> {code:java}
> // code placeholder
> 2020-05-11 17:20:47,228 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 restored state: {}.
> ...
> 2020-05-12 20:14:52,641 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading 1 partitions with offsets in restored state: {KafkaTopicPartition{topic='binlog_post_topic', partition=0}=-915623761775}
> 2020-05-11 17:20:47,414 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='binlog_post_topic', partition=0}=-915623761775}.
> {code}
> I guess this is caused by FlinkKafkaConsumerBase. I found code like this in the method FlinkKafkaConsumerBase.initializeState():
> {code:java}
> // code placeholder
> if (context.isRestored() && !restoredFromOldState) {
>     restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
>     ....{code}
> This code means that if a job is restarted from a savepoint, restoredState will not be null; it will at least be an empty TreeMap.
> And in FlinkKafkaConsumerBase.open():
> {code:java}
> // code placeholder
> if (restoredState != null) {
>     for (KafkaTopicPartition partition : allPartitions) {
>         if (!restoredState.containsKey(partition)) {
>             restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
>         }
>     }
> {code}
> This is where the consumer is initialized. If a job is restarted from a savepoint, restoredState is at least an empty TreeMap, so this code sets every partition that is missing from the restored state to consume from KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET, and the configured start timestamp is ignored.
> I changed the code like this:
> {code:java}
> // code placeholder
> if (restoredState != null && !restoredState.isEmpty()) {
> ....
> {code}
>
> and this works well for me.
>
> (Just noticed this is a Chinese Jira, crying my eyes out.)
>
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
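
Note: the restore behaviour described in the issue can be modelled without any Flink dependency. The following is a minimal sketch under stated assumptions, not the connector's actual code: the class RestoreSketch, the method startOffsets, the string partition keys and the offset 4200 are all hypothetical, and the sentinel value is simply the number that appears in the task manager log quoted in the issue. It only illustrates why an empty (but non-null) restored map forces the earliest offset, and how the reporter's isEmpty() guard changes the outcome.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the logic described in the issue.
final class RestoreSketch {

    // Stand-in for KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET;
    // the value is the one shown in the log quoted in the issue.
    static final long EARLIEST_OFFSET = -915623761775L;

    /**
     * @param restoredState     offsets read from the savepoint; null when the job
     *                          was not restored, empty when the savepoint holds no
     *                          state for this source
     * @param allPartitions     partitions discovered for the subscribed topic
     * @param configuredStart   start positions the user configured (assumed to be
     *                          already resolved from the timestamp to offsets)
     * @param treatEmptyAsFresh true applies the guard proposed by the reporter
     */
    static Map<String, Long> startOffsets(
            Map<String, Long> restoredState,
            List<String> allPartitions,
            Map<String, Long> configuredStart,
            boolean treatEmptyAsFresh) {

        boolean useRestored = restoredState != null
                && !(treatEmptyAsFresh && restoredState.isEmpty());

        if (!useRestored) {
            // Fresh start: honour the configured start positions.
            return new TreeMap<>(configuredStart);
        }

        // Restored start: partitions missing from the state fall back to earliest.
        Map<String, Long> result = new TreeMap<>(restoredState);
        for (String partition : allPartitions) {
            result.putIfAbsent(partition, EARLIEST_OFFSET);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("binlog_post_topic-0");
        // 4200 is a made-up offset standing in for the timestamp-based start.
        Map<String, Long> configured = Map.of("binlog_post_topic-0", 4200L);

        // Behaviour reported in the issue: empty restored state, no guard.
        System.out.println(startOffsets(new TreeMap<>(), partitions, configured, false));

        // Behaviour with the reporter's isEmpty() guard.
        System.out.println(startOffsets(new TreeMap<>(), partitions, configured, true));
    }
}
```

One design point to keep in mind: the fallback to the earliest offset for partitions missing from a non-empty restored state is left unchanged in this sketch, so the guard only affects the case where the savepoint holds no offsets at all for the source.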