This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 1c3d08f  KAFKA-8972: Need to flush state even on unclean close (#7589)
1c3d08f is described below

commit 1c3d08f4fca3c786fcaca1dee9ffd6e2dcd74713
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Oct 24 21:24:55 2019 -0700

    KAFKA-8972: Need to flush state even on unclean close (#7589)
    
    In the case of unclean close we still need to make sure all the stores are 
flushed before closing any.
    
    Reviewers: Matthias J. Sax <[email protected]>, Boyang Chen 
<[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../streams/processor/internals/StreamTask.java    | 23 ++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e859df8..f167e1a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -673,12 +673,17 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
                 throw taskMigratedException;
             }
         } else {
-            maybeAbortTransactionAndCloseRecordCollector(isZombie);
+            // In the case of unclean close we still need to make sure all the 
stores are flushed before closing any
+            super.flushState();
+
+            if (eosEnabled) {
+                maybeAbortTransactionAndCloseRecordCollector(isZombie);
+            }
         }
     }
 
     private void maybeAbortTransactionAndCloseRecordCollector(final boolean 
isZombie) {
-        if (eosEnabled && !isZombie) {
+        if (!isZombie) {
             try {
                 if (transactionInFlight) {
                     producer.abortTransaction();
@@ -696,14 +701,12 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
             }
         }
 
-        if (eosEnabled) {
-            try {
-                recordCollector.close();
-            } catch (final Throwable e) {
-                log.error("Failed to close producer due to the following 
error:", e);
-            } finally {
-                producer = null;
-            }
+        try {
+            recordCollector.close();
+        } catch (final Throwable e) {
+            log.error("Failed to close producer due to the following error:", 
e);
+        } finally {
+            producer = null;
         }
     }
 

Reply via email to