Tan-JiaLiang commented on PR #52: URL: https://github.com/apache/flink-connector-kafka/pull/52#issuecomment-1767610043
> the PR description was confusing and I thought the issue was that the Kafka API was returning -1 offset for empty partition.

Sorry about that; I have already rewritten it.

> So basically, with this change, we should be storing the initial offset of 0 in these cases (would be good to add unit test with the new offset initializer that verifies this).

Agreed, and the test case has already been added. See `org.apache.flink.streaming.connectors.kafka.table.KafkaTableITCase#testLatestOffsetStrategyResume`.

> I thought of an alternative solution on the reader side to set offsets to consumer.position(tp) - 1 (I also realized this is how the old implementation handles it). But I guess you could have an edge case, in which your checkpoint starts before offsets are resolved by the reader...

I took a look at the old implementation, and it may differ from the current one. The old implementation could position the offset, replace the startup marker (e.g. LATEST_OFFSET/EARLIEST_OFFSET), and set the resolved offset into the `KafkaTopicPartitionState`, all within `KafkaConsumerThread`. The new implementation can position the offset in `KafkaPartitionSplitReader`, but it has no way to write that offset into the `KafkaPartitionSplitState`: in the current design, `KafkaPartitionSplitState#currentOffset` can only be updated from `KafkaRecordEmitter`.
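To make the resume edge case concrete, here is a toy model in plain Java. It is not Flink or Kafka code: `LatestOffsetResumeSketch`, `Partition`, `resolve`, `readFrom` and `LATEST_MARKER` are hypothetical names, and offsets are simply list indices. It assumes that, without the fix, a checkpoint taken while a partition is still empty keeps the unresolved "latest" marker, so on restore the marker is resolved again against the new end offset and records written in between are skipped. Storing the concrete initial offset (0 for an empty partition) avoids that.

```java
import java.util.ArrayList;
import java.util.List;

public class LatestOffsetResumeSketch {
    // Hypothetical sentinel standing in for a "latest offset" startup marker.
    static final long LATEST_MARKER = -1L;

    // Hypothetical in-memory partition; a record's offset is its list index.
    static final class Partition {
        final List<String> records = new ArrayList<>();

        long endOffset() {
            return records.size();
        }
    }

    // Turn a marker into a concrete offset using the partition's current end.
    static long resolve(long startingOffset, Partition p) {
        return startingOffset == LATEST_MARKER ? p.endOffset() : startingOffset;
    }

    // Simulate restoring from a checkpointed offset and reading to the end.
    static List<String> readFrom(long checkpointed, Partition p) {
        long start = resolve(checkpointed, p);
        return new ArrayList<>(p.records.subList((int) start, p.records.size()));
    }

    public static void main(String[] args) {
        Partition p = new Partition(); // empty when the job starts

        // Checkpoint taken before any record is read:
        long storedMarker = LATEST_MARKER;                // marker kept as-is
        long storedResolved = resolve(LATEST_MARKER, p);  // resolved eagerly to 0

        // Records are produced while the job is down.
        p.records.add("a");
        p.records.add("b");

        System.out.println(readFrom(storedMarker, p));   // prints "[]"
        System.out.println(readFrom(storedResolved, p)); // prints "[a, b]"
    }
}
```

The model only captures why the resolved offset has to be in the checkpointed state; it says nothing about where in the connector the resolution should happen, which is the open design question above.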
