Tan-JiaLiang commented on PR #52: URL: https://github.com/apache/flink-connector-kafka/pull/52#issuecomment-1741714161
@tzulitai Sorry for the late reply. In your example, before this PR, every partition's `currentOffset` is still -1 at tp=1. With this PR, every partition's `currentOffset` is assigned a meaningful value by tp=1.

Take the test case `KafkaTableITCase#testLatestOffsetStrategyResume` as an example. Assume the topic has 3 partitions, A, B and C:

- tp=0: Start the job with the `latest-offsets` offsets initializer.
- tp=1: Write a record to partitions A and B; C stays empty.
- tp=2: Stop the job with a savepoint.
- tp=3: Write a record to each partition, so every partition gets a new record.
- tp=4: Restore the job from the savepoint.

Before this PR:

- tp=0: The `currentOffset` of A/B/C is -1. A `currentOffset` of -1 means `KafkaSourceReader` will call `seekToEnd`.
- tp=1: The `currentOffset` values are A=1, B=1, C=-1.
- tp=2: The savepoint stores `currentOffset` A=1, B=1, C=-1.
- tp=3: After the writes, the `endOffset` values are A=2, B=2, C=1.
- tp=4: After restore, A and B resume correctly. Because C's `currentOffset` is -1, the reader calls `seekToEnd` for C, so the record written to C at tp=3 (offset 0) is lost.

After this PR:

- tp=0: The `currentOffset` of A/B/C is 0. I query the end offset in `KafkaSourceEnumerator` instead of calling `seekToEnd` in `KafkaSourceReader`.
- tp=1: The `currentOffset` values are A=1, B=1, C=0.
- tp=2: The savepoint stores `currentOffset` A=1, B=1, C=0.
- tp=3: After the writes, the `endOffset` values are A=2, B=2, C=1.
- tp=4: After restore, A, B and C all resume correctly. Because C's `currentOffset` is 0, the record at offset 0 is consumed and no data is lost.
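The timeline can be replayed with a toy model to make the difference concrete. This is a hypothetical simulation, not Flink code. `LatestOffsetResumeSim`, `skippedAfterRestore`, and the arrays standing in for split state and log end offsets are invented for illustration. Only the -1 "seek to end" sentinel and the offsets themselves come from the walkthrough. The model assumes that a -1 surviving in the savepoint is resolved against the partition's end offset at restore time.

```java
import java.util.Arrays;

/** Hypothetical model of the latest-offsets resume scenario (not Flink code). */
public class LatestOffsetResumeSim {
    // Sentinel meaning "seek to the end when the reader starts" (the -1 from the comment).
    static final long LATEST = -1L;

    /**
     * Replays tp=0..tp=4 for partitions A, B, C (indices 0, 1, 2) and returns,
     * per partition, how many records written while the job was down are skipped.
     */
    static long[] skippedAfterRestore(boolean resolveInEnumerator) {
        long[] endOffset = {0, 0, 0}; // log end offsets; all partitions are empty at tp=0
        long[] current = new long[3]; // split state: next offset to read

        // tp=0: start with latest-offsets.
        for (int p = 0; p < 3; p++) {
            // After the PR: the enumerator resolves the end offset up front.
            // Before the PR: the split keeps -1 and the reader calls seekToEnd.
            current[p] = resolveInEnumerator ? endOffset[p] : LATEST;
        }

        // tp=1: one record each to A and B; C stays empty, so its state is untouched.
        for (int p = 0; p < 2; p++) {
            endOffset[p]++;
            current[p] = endOffset[p]; // record consumed, state advances
        }

        // tp=2: stop with savepoint; also remember how far each partition was consumed.
        long[] saved = current.clone();
        long[] consumedUpTo = endOffset.clone();

        // tp=3: one record written to every partition while the job is down.
        for (int p = 0; p < 3; p++) {
            endOffset[p]++;
        }

        // tp=4: restore. A LATEST sentinel is resolved against the *new* end offset.
        long[] skipped = new long[3];
        for (int p = 0; p < 3; p++) {
            long start = saved[p] == LATEST ? endOffset[p] : saved[p];
            // Records between the consumed position and the restart position are never read.
            skipped[p] = start - consumedUpTo[p];
        }
        return skipped;
    }

    public static void main(String[] args) {
        System.out.println("before PR: " + Arrays.toString(skippedAfterRestore(false)));
        System.out.println("after PR:  " + Arrays.toString(skippedAfterRestore(true)));
    }
}
```

Running `main` prints `[0, 0, 1]` for the pre-PR behaviour and `[0, 0, 0]` with the change. The one skipped record is the one written to C at offset 0 during tp=3, which matches the walkthrough.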
