Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5200#discussion_r160598888 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -559,6 +565,7 @@ public void onException(Throwable cause) { // the fetchers 'snapshotCurrentState()' method return at least // the restored offsets this.kafkaFetcher = fetcher; + this.kafkaOffsetCommitter = createOffsetCommitter(); --- End diff -- This is a very valid argument. Will address this with a factory perhaps, as you suggested.
---