rmahindra123 commented on a change in pull request #4021:
URL: https://github.com/apache/hudi/pull/4021#discussion_r756242828



##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
##########
@@ -215,13 +215,17 @@ private void writeRecords() {
         try {
           SinkRecord record = buffer.peek();
           if (record != null
-              && record.kafkaOffset() >= ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
+              && record.kafkaOffset() == ongoingTransactionInfo.getExpectedKafkaOffset()) {

Review comment:
       They will normally come in order. But it's possible that Kafka may send
them out of order if we do not commit an offset for some time. For instance,
it may send 12, 13, 14, 15, ... but then, realizing that the consumer has not
committed an offset (and the last committed offset was 5), it will start
sending 5, 6, 7, ...; from then on they will be in order. That's why we have
an expected offset: if we do not receive the expected offset, we force an
offset reset, and Kafka will resend messages from that earlier offset. This
ensures no data loss or duplication. There may be scope for some
optimization, but I'm trying to be conservative for now.
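A minimal, hypothetical sketch of the expected-offset check described above, with Kafka replaced by a plain array of offsets so it runs without a broker. The class and method names (`ExpectedOffsetSketch`, `onRecord`) are illustrative assumptions, not Hudi's actual code. In a real Kafka Connect sink the "seek back" would go through something like `SinkTaskContext.offset(partition, offset)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of the "expected offset" rule: write a record only if its
 * offset is exactly the next one we expect; otherwise request an offset reset.
 * Not Hudi's ConnectTransactionParticipant; names are illustrative only.
 */
class ExpectedOffsetSketch {

    // Offsets handed to the (hypothetical) writer, in the order they were written.
    final List<Long> written = new ArrayList<>();
    // Next offset we are willing to write for this partition.
    long expectedOffset;

    ExpectedOffsetSketch(long lastCommittedOffset) {
        this.expectedOffset = lastCommittedOffset;
    }

    /**
     * Returns an empty OptionalLong if the record was written, or the offset the
     * consumer should be reset to if the record was rejected.
     */
    OptionalLong onRecord(long kafkaOffset) {
        if (kafkaOffset == expectedOffset) {
            written.add(kafkaOffset);
            expectedOffset = kafkaOffset + 1;
            return OptionalLong.empty();
        }
        // Gap (offset ahead) or replay (offset behind): drop the record and ask
        // for a reset to the expected offset so Kafka redelivers from there.
        return OptionalLong.of(expectedOffset);
    }

    public static void main(String[] args) {
        ExpectedOffsetSketch p = new ExpectedOffsetSketch(5);
        // Kafka first delivers 12..15 (past the last committed offset 5),
        // then, after the reset, redelivers 5..7 in order.
        long[] delivered = {12, 13, 14, 15, 5, 6, 7};
        for (long off : delivered) {
            OptionalLong seek = p.onRecord(off);
            if (seek.isPresent()) {
                System.out.println("offset " + off + " rejected, seek to " + seek.getAsLong());
            }
        }
        System.out.println("written=" + p.written + " next=" + p.expectedOffset);
    }
}
```

Running `main` prints four rejections (offsets 12 to 15, each asking for a seek to 5) followed by `written=[5, 6, 7] next=8`. Rejecting every mismatch, rather than tolerating gaps, is the conservative choice the comment describes: nothing is written twice or skipped, at the cost of re-reading some records.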



