Tan-JiaLiang commented on PR #52:
URL: https://github.com/apache/flink-connector-kafka/pull/52#issuecomment-1767610043

   > the PR description was confusing and I thought the issue was that the Kafka API was returning -1 offset for empty partition.
   
   Sorry about that; I have already rewritten it.
   
   
   > So basically, with this change, we should be storing the initial offset of 0 in these cases (would be good to add unit test with the new offset initializer that verifies this).
   
   Agreed, and the test case has already been added.
   See `org.apache.flink.streaming.connectors.kafka.table.KafkaTableITCase#testLatestOffsetStrategyResume`.
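   To make "storing the initial offset of 0" concrete, here is a minimal, stdlib-only sketch of resolving a startup marker into a real offset before the split is created. It is a hypothetical model, not the connector's `OffsetsInitializer` API: the class name, `resolveStartingOffset`, and the marker values (-1 for latest, -2 for earliest) are assumptions for illustration. For a partition that has never received data, the beginning and end offsets are both 0, so the resolved offset is 0 rather than the latest marker.

```java
// Hypothetical model: marker values and method names are assumptions for
// illustration, not the connector's actual OffsetsInitializer API.
final class StartingOffsetSketch {
    static final long LATEST_OFFSET = -1L;   // assumed "start from latest" marker
    static final long EARLIEST_OFFSET = -2L; // assumed "start from earliest" marker

    /** Replaces a startup marker with a concrete offset; concrete offsets pass through. */
    static long resolveStartingOffset(long requested, long beginningOffset, long endOffset) {
        if (requested == LATEST_OFFSET) {
            return endOffset; // next offset that will be written
        }
        if (requested == EARLIEST_OFFSET) {
            return beginningOffset; // first offset still retained
        }
        return requested; // already a concrete offset
    }

    public static void main(String[] args) {
        // A partition that has never received data: beginning == end == 0.
        long stored = resolveStartingOffset(LATEST_OFFSET, 0L, 0L);
        System.out.println("offset stored in split state: " + stored);
    }
}
```

   Because the checkpointed state then holds 0 instead of the marker, a restore resumes reading from 0, and records appended after the checkpoint are not skipped by a second "latest" lookup.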
   
   
   > I thought of an alternative solution on the reader side to set offsets to consumer.position(tp) - 1 (I also realized this is how the old implementation handles it). But I guess you could have an edge case, in which your checkpoint starts before offsets are resolved by the reader...
   
   I took a look at the old implementation, and it seems to work differently from the current one.
   
   The old implementation can look up the consumer position, replace the startup marker (e.g. LATEST_OFFSET/EARLIEST_OFFSET) with the resolved offset, and set it into the `KafkaTopicPartitionState`. All of this happens inside `KafkaConsumerThread`.
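   A rough, stdlib-only model of that old flow, where the thread that owns the consumer also owns the partition state, so it can overwrite the marker with `position - 1` directly. Everything here is hypothetical: `OldThreadSketch`, `PositionLookup` (a stand-in for `KafkaConsumer#position`), and the sentinel value are illustrative, and only the "latest" marker is modeled. The sentinel is deliberately chosen far from -1, because for an empty partition `position - 1` is itself -1, a legitimate "nothing processed yet" value that must not be mistaken for a marker.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the old consumer-thread approach; names and the
// sentinel value are illustrative, not the FlinkKafkaConsumer API.
final class OldThreadSketch {
    static final long LATEST_OFFSET_MARKER = Long.MIN_VALUE; // assumed sentinel, far from -1

    /** Stand-in for KafkaConsumer#position: next offset the consumer would fetch. */
    interface PositionLookup {
        long position(String partition);
    }

    /** Replaces the sentinel with position - 1, i.e. the "last processed" offset convention. */
    static void resolveMarkers(Map<String, Long> state, PositionLookup consumer) {
        for (Map.Entry<String, Long> e : state.entrySet()) {
            if (e.getValue() == LATEST_OFFSET_MARKER) {
                e.setValue(consumer.position(e.getKey()) - 1);
            }
        }
    }

    /** On restore, reading resumes right after the last processed offset. */
    static long resumeFrom(long lastProcessed) {
        return lastProcessed + 1;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state.put("topic-0", LATEST_OFFSET_MARKER); // empty partition, latest startup
        resolveMarkers(state, partition -> 0L);     // empty partition: position is 0
        long stored = state.get("topic-0");
        System.out.println("stored=" + stored + ", resume at " + resumeFrom(stored));
    }
}
```

   Ordering still matters in this model: a snapshot copied from `state` before `resolveMarkers` runs would still contain the sentinel, which is the kind of edge case raised in the review for a reader-side variant.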
   
   The new implementation can look up the position in `KafkaPartitionSplitReader`, but there is no way to write it back into the `KafkaPartitionSplitState`. In the current design, `KafkaPartitionSplitState#currentOffset` can only be updated from `KafkaRecordEmitter`.
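   The constraint above can be sketched as a small stdlib-only model: the split state is advanced only when the emitter handles a record, so a split over an empty partition keeps its startup marker in every snapshot, even if the reader already knows the position. `SplitStateSketch`, its nested `SplitState`, `emit`, `emitAll`, and the marker value are hypothetical stand-ins rather than the connector's real signatures, and the "record offset + 1" convention is an assumption of this sketch.

```java
import java.util.List;

// Hypothetical model of the split-state flow described above; class and
// method names are illustrative stand-ins, not the connector's signatures.
final class SplitStateSketch {
    static final long LATEST_OFFSET = -1L; // assumed "start from latest" marker

    /** Mutable per-split state that gets snapshotted on checkpoint. */
    static final class SplitState {
        private long currentOffset;

        SplitState(long startingOffset) {
            this.currentOffset = startingOffset;
        }

        long snapshot() {
            return currentOffset;
        }

        void setCurrentOffset(long offset) {
            this.currentOffset = offset;
        }
    }

    /** Emitter: the only place that advances the state, once per emitted record. */
    static void emit(long recordOffset, SplitState state) {
        state.setCurrentOffset(recordOffset + 1); // next offset to read
    }

    /** Feeds a batch of fetched record offsets through the emitter. */
    static void emitAll(List<Long> fetchedOffsets, SplitState state) {
        for (long offset : fetchedOffsets) {
            emit(offset, state);
        }
    }

    public static void main(String[] args) {
        // Empty partition: the reader may know position == 0, but it has no handle
        // to the state, and with no records the emitter never runs.
        SplitState empty = new SplitState(LATEST_OFFSET);
        emitAll(List.of(), empty);
        System.out.println("empty partition snapshot: " + empty.snapshot());

        SplitState busy = new SplitState(LATEST_OFFSET);
        emitAll(List.of(10L, 11L, 12L), busy);
        System.out.println("non-empty partition snapshot: " + busy.snapshot());
    }
}
```

   In this model the only way to get a concrete offset into an empty split's state is to resolve it before the split is created, rather than from the reader.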
   


-- 
This is an automated message from the Apache Git Service.