This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 1c3d08f KAFKA-8972: Need to flush state even on unclean close (#7589)
1c3d08f is described below
commit 1c3d08f4fca3c786fcaca1dee9ffd6e2dcd74713
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Oct 24 21:24:55 2019 -0700
KAFKA-8972: Need to flush state even on unclean close (#7589)
In the case of unclean close we still need to make sure all the stores are
flushed before closing any.
Reviewers: Matthias J. Sax <[email protected]>, Boyang Chen
<[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang
<[email protected]>
---
.../streams/processor/internals/StreamTask.java | 23 ++++++++++++----------
1 file changed, 13 insertions(+), 10 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e859df8..f167e1a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -673,12 +673,17 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
throw taskMigratedException;
}
} else {
- maybeAbortTransactionAndCloseRecordCollector(isZombie);
+ // In the case of unclean close we still need to make sure all the
stores are flushed before closing any
+ super.flushState();
+
+ if (eosEnabled) {
+ maybeAbortTransactionAndCloseRecordCollector(isZombie);
+ }
}
}
private void maybeAbortTransactionAndCloseRecordCollector(final boolean
isZombie) {
- if (eosEnabled && !isZombie) {
+ if (!isZombie) {
try {
if (transactionInFlight) {
producer.abortTransaction();
@@ -696,14 +701,12 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
}
}
- if (eosEnabled) {
- try {
- recordCollector.close();
- } catch (final Throwable e) {
- log.error("Failed to close producer due to the following
error:", e);
- } finally {
- producer = null;
- }
+ try {
+ recordCollector.close();
+ } catch (final Throwable e) {
+ log.error("Failed to close producer due to the following error:",
e);
+ } finally {
+ producer = null;
}
}