cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1114238733


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -255,6 +255,7 @@ public void completeRestoration(final java.util.function.Consumer<Set<TopicParti
                 resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
                 initializeTopology();
                 processorContext.initialize();
+                maybeCheckpoint(true); // enforce checkpoint upon completing restoration

Review Comment:
   As the ticket says, we can only write this checkpoint if we process under
the at-least-once (ALOS) processing guarantee; we must not write it when we
process under exactly-once (EOS). Under EOS, we need to rebuild the local state
when the Streams client crashed or was closed dirty. Otherwise, Streams would
reprocess the records from the last committed offset and add those same records
to the local state store a second time, violating EOS. We know that a Streams
client crashed or was closed dirty if there is no checkpoint file in the state
directory. Now, suppose we write a checkpoint right after restoration and the
Streams client processes some records, but crashes or is closed dirty before the
first commit. The checkpoint file would then exist, the local state would not be
rebuilt, and EOS would be violated for the reasons above.
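To make the failure mode concrete, here is a toy simulation. It is a hypothetical model, not Kafka code: the local store is a running sum of input values, the changelog is assumed to hold only committed state (as it does under EOS), and recovery wipes and rebuilds the store only when no checkpoint file exists. The client processes two records and crashes before its first commit.

```java
/**
 * Toy model (hypothetical, not Kafka code) of the failure mode described above:
 * a checkpoint written right after restoration, followed by a crash before the
 * first EOS commit.
 */
final class EosCheckpointSketch {

    /** Input record values, indexed by offset. */
    static final int[] INPUT = {1, 2, 3, 4};

    /**
     * Processes two records, "crashes" before committing, recovers, and
     * reprocesses from the last committed offset. Returns the final store value.
     */
    static long recoverAndReprocess(boolean checkpointAfterRestoration) {
        long committedStore = 0;           // store value in the changelog as of the last commit
        int committedOffset = 0;           // no commit has happened yet
        long localStore = committedStore;  // local store right after restoration
        boolean checkpointFileExists = checkpointAfterRestoration;

        // Process records at offsets 0 and 1, then crash before the first commit.
        for (int offset = committedOffset; offset < 2; offset++) {
            localStore += INPUT[offset];
        }

        // Restart: a missing checkpoint file signals a crash or dirty close,
        // so the local state is wiped and rebuilt from the changelog.
        if (!checkpointFileExists) {
            localStore = committedStore;
        }

        // Resume processing from the last committed offset.
        for (int offset = committedOffset; offset < INPUT.length; offset++) {
            localStore += INPUT[offset];
        }
        return localStore;
    }

    public static void main(String[] args) {
        System.out.println("without early checkpoint: " + recoverAndReprocess(false)); // 10
        System.out.println("with early checkpoint:    " + recoverAndReprocess(true));  // 13
    }
}
```

The correct total is 10. With the early checkpoint, the uncommitted writes from the two records processed before the crash (1 + 2) survive in the local store and are then applied again during reprocessing, giving 13.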
   So you need to differentiate between ALOS and EOS, similar to what is done
when a commit is executed:
https://github.com/apache/kafka/blob/98c2f88e1c605195ccfac19c49a83216e26146a1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L492
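A minimal sketch of the suggested guard, assuming a boolean `eosEnabled` derived from the configured processing guarantee. `RestorationCheckpointSketch`, its nested `Guarantee` enum, and the recording `maybeCheckpoint` are hypothetical stand-ins rather than the real `StreamTask` API; the exact EOS/ALOS check should be taken from the linked commit path.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not the real StreamTask): enforce a checkpoint on
 * completing restoration only when the task is not running under EOS.
 */
final class RestorationCheckpointSketch {

    /** Hypothetical stand-in for the configured processing guarantee. */
    enum Guarantee { AT_LEAST_ONCE, EXACTLY_ONCE }

    private final boolean eosEnabled;
    /** Records each checkpoint write so the behavior can be inspected. */
    final List<String> checkpointsWritten = new ArrayList<>();

    RestorationCheckpointSketch(Guarantee guarantee) {
        this.eosEnabled = guarantee == Guarantee.EXACTLY_ONCE;
    }

    void completeRestoration() {
        // ... reset offsets, initialize topology and processor context ...
        if (!eosEnabled) {
            // ALOS: safe to enforce a checkpoint; reprocessing after a crash
            // may add duplicates to the store, which ALOS permits.
            maybeCheckpoint(true);
        }
        // EOS: no checkpoint here, so a crash before the first commit leaves
        // no checkpoint file and forces the local state to be rebuilt.
    }

    private void maybeCheckpoint(boolean enforceCheckpoint) {
        checkpointsWritten.add("checkpoint(enforce=" + enforceCheckpoint + ")");
    }

    public static void main(String[] args) {
        RestorationCheckpointSketch alos = new RestorationCheckpointSketch(Guarantee.AT_LEAST_ONCE);
        alos.completeRestoration();
        RestorationCheckpointSketch eos = new RestorationCheckpointSketch(Guarantee.EXACTLY_ONCE);
        eos.completeRestoration();
        System.out.println("ALOS: " + alos.checkpointsWritten); // [checkpoint(enforce=true)]
        System.out.println("EOS:  " + eos.checkpointsWritten);  // []
    }
}
```

Under ALOS the task records one enforced checkpoint; under EOS it writes none, so a crash before the first commit leaves no checkpoint file behind.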



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.