This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new c3da9ac  KAFKA-9562: part 1: ignore exceptions while flushing stores 
in close(dirty) (#8116)
c3da9ac is described below

commit c3da9ac86a21ff9bae459f8bc5d3d491360bc758
Author: John Roesler <[email protected]>
AuthorDate: Thu Feb 20 15:28:11 2020 -0600

    KAFKA-9562: part 1: ignore exceptions while flushing stores in close(dirty) 
(#8116)
    
    Reviewers: Guozhang Wang <[email protected]>
---
 .../apache/kafka/streams/processor/internals/StreamTask.java  |  6 +++++-
 .../kafka/streams/processor/internals/StreamThread.java       | 11 +++++++----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ae06bc6..54da00d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -655,7 +655,11 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
             }
         } else {
             // In the case of unclean close we still need to make sure all the 
stores are flushed before closing any
-            super.flushState();
+            try {
+                stateMgr.flush();
+            } catch (final ProcessorStateException e) {
+                // ignore any exceptions while flushing (all stores would have 
had a chance to flush anyway)
+            }
 
             if (eosEnabled) {
                 maybeAbortTransactionAndCloseRecordCollector(isZombie);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b6b3d83..0268c7a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -701,10 +701,13 @@ public class StreamThread extends Thread {
                     enforceRebalance();
                 }
             } catch (final TaskMigratedException ignoreAndRejoinGroup) {
-                log.warn("Detected task {} that got migrated to another 
thread. " +
-                        "This implies that this thread missed a rebalance and 
dropped out of the consumer group. " +
-                        "Will try to rejoin the consumer group. Below is the 
detailed description of the task:\n{}",
-                    ignoreAndRejoinGroup.migratedTask().id(), 
ignoreAndRejoinGroup.migratedTask().toString(">"));
+                log.warn("Detected task " + 
ignoreAndRejoinGroup.migratedTask().id() +
+                             " that got migrated to another thread. This 
implies that this thread missed" +
+                             " a rebalance and dropped out of the consumer 
group. Will try to rejoin the" +
+                             " consumer group. Below is the detailed 
description of the task:\n" +
+                             ignoreAndRejoinGroup.migratedTask().toString(">"),
+                         ignoreAndRejoinGroup
+                         );
 
                 enforceRebalance();
             }

Reply via email to