rmahindra123 commented on a change in pull request #4021:
URL: https://github.com/apache/hudi/pull/4021#discussion_r756242828
##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
##########
@@ -215,13 +215,17 @@ private void writeRecords() {
try {
SinkRecord record = buffer.peek();
if (record != null
-            && record.kafkaOffset() >= ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
+            && record.kafkaOffset() == ongoingTransactionInfo.getExpectedKafkaOffset()) {
Review comment:
They will always come in order. However, Kafka may deliver records out of order if we do not commit an offset for some time. For instance, it may send 12, 13, 14, 15, ... and then, realizing that the consumer has not committed an offset (and the last committed offset was 5), start sending 5, 6, 7, ..., after which delivery stays in order. That's why we track an expected offset: if the next record does not match the expected offset, we force an offset reset, and Kafka starts resending messages from the last committed offset. This ensures no data loss or duplication. There may be scope for some optimization, but we are trying to be conservative for now.
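
To illustrate the pattern described above, here is a minimal sketch of an expected-offset check around a buffered write loop. The class and method names (ExpectedOffsetWriter, writeRecord, resetOffsets) are illustrative placeholders, not the actual ConnectTransactionParticipant API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.kafka.connect.sink.SinkRecord;

// Sketch only: shows the expected-offset bookkeeping, not Hudi's real implementation.
public class ExpectedOffsetWriter {

  private final Deque<SinkRecord> buffer = new ArrayDeque<>();
  // Next Kafka offset we expect to see for the assigned partition.
  private long expectedKafkaOffset;

  public ExpectedOffsetWriter(long startOffset) {
    this.expectedKafkaOffset = startOffset;
  }

  public void bufferRecord(SinkRecord record) {
    buffer.add(record);
  }

  // Drain the buffer, writing only records that arrive at the expected offset.
  public void writeRecords() {
    while (!buffer.isEmpty()) {
      SinkRecord record = buffer.peek();
      if (record.kafkaOffset() == expectedKafkaOffset) {
        // In-order record: write it and advance the expected offset.
        writeRecord(record);
        expectedKafkaOffset = record.kafkaOffset() + 1;
      } else if (record.kafkaOffset() < expectedKafkaOffset) {
        // Already-written record (Kafka rewound to the last committed offset):
        // skip it to avoid duplicates.
      } else {
        // Gap ahead of the expected offset: force a consumer offset reset so
        // Kafka redelivers from the last committed offset, then stop draining.
        resetOffsets(expectedKafkaOffset);
        break;
      }
      buffer.poll();
    }
  }

  private void writeRecord(SinkRecord record) {
    // Placeholder for handing the record to the underlying writer.
  }

  private void resetOffsets(long offset) {
    // Placeholder for a consumer seek back to the given offset.
  }
}
```

With this check, duplicates below the expected offset are silently skipped and gaps above it trigger a reset, which matches the "no data loss/duplication, but conservative" trade-off described above.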