This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 121df46 KAFKA-9582: Do not abort transaction in unclean close (#8143)
121df46 is described below
commit 121df465fad0bc062537c027bf9ed4755112d10c
Author: Boyang Chen <[email protected]>
AuthorDate: Fri Feb 21 10:27:57 2020 -0800
KAFKA-9582: Do not abort transaction in unclean close (#8143)
In order to avoid hitting the fatal exception during unclean close, we
should avoid calling abortTransaction().
Reviewers: John Roesler <[email protected]>, Guozhang Wang
<[email protected]>
---
.../streams/processor/internals/StreamTask.java | 49 ++++++----------------
.../processor/internals/StreamTaskTest.java | 12 +++---
2 files changed, 18 insertions(+), 43 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 54da00d..9aa8e79 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -94,7 +94,6 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
private long idleStartTime;
private Producer<byte[], byte[]> producer;
private boolean commitRequested = false;
- private boolean transactionInFlight = false;
private final String threadId;
@@ -294,7 +293,6 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
} catch (final ProducerFencedException |
UnknownProducerIdException e) {
throw new TaskMigratedException(this, e);
}
- transactionInFlight = true;
}
processorContext.initialize();
@@ -522,10 +520,8 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
if (eosEnabled) {
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata,
applicationId);
producer.commitTransaction();
- transactionInFlight = false;
if (startNewTransaction) {
producer.beginTransaction();
- transactionInFlight = true;
}
} else {
consumer.commitSync(consumedOffsetsAndMetadata);
@@ -602,7 +598,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
*/
public void suspend() {
log.debug("Suspending");
- suspend(true, false);
+ suspend(true);
}
/**
@@ -618,8 +614,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
* or if the task producer got fenced (EOS)
*/
// visible for testing
- void suspend(final boolean clean,
- final boolean isZombie) {
+ void suspend(final boolean clean) {
// this is necessary because all partition times are reset to -1
during close
// we need to preserve the original partitions times before calling
commit
final Map<TopicPartition, Long> partitionTimes =
extractPartitionTimes();
@@ -640,14 +635,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
if (eosEnabled) {
stateMgr.checkpoint(activeTaskCheckpointableOffsets());
-
- try {
- recordCollector.close();
- } catch (final RecoverableClientException e) {
- taskMigratedException = new
TaskMigratedException(this, e);
- } finally {
- producer = null;
- }
+ taskMigratedException = closeRecordCollector();
}
}
if (taskMigratedException != null) {
@@ -662,37 +650,26 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
}
if (eosEnabled) {
- maybeAbortTransactionAndCloseRecordCollector(isZombie);
+ // Ignore any exceptions while closing the record collector,
i.e. the task producer.
+ closeRecordCollector();
}
}
}
- private void maybeAbortTransactionAndCloseRecordCollector(final boolean
isZombie) {
- if (!isZombie) {
- try {
- if (transactionInFlight) {
- producer.abortTransaction();
- }
- transactionInFlight = false;
- } catch (final ProducerFencedException ignore) {
- /* TODO
- * this should actually never happen atm as we guard the call
to #abortTransaction
- * -> the reason for the guard is a "bug" in the Producer --
it throws IllegalStateException
- * instead of ProducerFencedException atm. We can remove the
isZombie flag after KAFKA-5604 got
- * fixed and fall-back to this catch-and-swallow code
- */
-
- // can be ignored: transaction got already aborted by
brokers/transactional-coordinator if this happens
- }
- }
+ private TaskMigratedException closeRecordCollector() {
+ TaskMigratedException taskMigratedException = null;
try {
recordCollector.close();
+ } catch (final RecoverableClientException e) {
+ taskMigratedException = new TaskMigratedException(this, e);
} catch (final Throwable e) {
log.error("Failed to close producer due to the following error:",
e);
} finally {
producer = null;
}
+
+ return taskMigratedException;
}
private void closeTopology() {
@@ -742,7 +719,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
/**
* <pre>
- * - {@link #suspend(boolean, boolean) suspend(clean)}
+ * - {@link #suspend(boolean) suspend(clean)}
* - close topology
* - if (clean) {@link #commit()}
* - flush state and producer
@@ -765,7 +742,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
RuntimeException firstException = null;
try {
- suspend(clean, isZombie);
+ suspend(clean);
} catch (final RuntimeException e) {
clean = false;
firstException = e;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1d0ca4f..2832291 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1317,26 +1317,25 @@ public class StreamTaskTest {
}
@Test
- public void
shouldAbortTransactionAndCloseProducerOnUncleanCloseWithEosEnabled() {
+ public void shouldCloseProducerOnUncleanCloseWithEosEnabled() {
task = createStatelessTask(createConfig(true),
StreamsConfig.METRICS_LATEST);
task.initializeTopology();
task.close(false, false);
task = null;
- assertTrue(producer.transactionAborted());
- assertFalse(producer.transactionInFlight());
+ // Make sure no method call is made on the producer during an unclean close
(such as abort).
+ assertTrue(producer.transactionInFlight());
assertTrue(producer.closed());
}
@Test
- public void
shouldAbortTransactionAndCloseProducerOnErrorDuringUncleanCloseWithEosEnabled()
{
+ public void shouldCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
task = createTaskThatThrowsException(true);
task.initializeTopology();
task.close(false, false);
- assertTrue(producer.transactionAborted());
assertTrue(producer.closed());
}
@@ -1553,7 +1552,7 @@ public class StreamTaskTest {
}
@Test
- public void
shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled()
{
+ public void
shouldNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
task = createStatelessTask(createConfig(true),
StreamsConfig.METRICS_LATEST);
task.initializeTopology();
producer.fenceProducerOnClose();
@@ -1561,7 +1560,6 @@ public class StreamTaskTest {
task.close(false, false);
task = null;
- assertTrue(producer.transactionAborted());
assertFalse(producer.closed());
}