Repository: kafka Updated Branches: refs/heads/0.11.0 7dde914aa -> c62793eab
KAFKA-5147; Add missing synchronization to TransactionManager The basic idea is that exactly three collections, i.e., `pendingRequests`, `newPartitionsToBeAddedToTransaction`, and `partitionsInTransaction`, are accessed from the context of application threads. The first two are modified from the application threads, and the last is read from those threads. So to make the `TransactionManager` truly thread-safe, we have to ensure that all accesses to these three members are done in a synchronized block. I inspected the code, and I believe this patch puts the synchronization in all the correct places. Author: Apurva Mehta <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #3132 from apurvam/KAFKA-5147-transaction-manager-synchronization-fixes (cherry picked from commit 02c0c3b01730bdbff8a09d355c1b017715c7ce10) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c62793ea Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c62793ea Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c62793ea Branch: refs/heads/0.11.0 Commit: c62793eab35d166c33c018d931fa4a40024a2524 Parents: 7dde914 Author: Apurva Mehta <[email protected]> Authored: Thu May 25 16:17:18 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Thu May 25 16:25:46 2017 -0700 ---------------------------------------------------------------------- .../producer/internals/TransactionManager.java | 34 +++++++++++--------- .../internals/TransactionalRequestResult.java | 2 +- 2 files changed, 20 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c62793ea/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index d674697..e5c6ec2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -240,23 +240,23 @@ public class TransactionManager { return transactionalId != null; } - public boolean isCompletingTransaction() { + public synchronized boolean isCompletingTransaction() { return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION; } - public boolean isInTransaction() { - return currentState == State.IN_TRANSACTION || isCompletingTransaction(); + public synchronized boolean isInErrorState() { + return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR; } - public boolean isInErrorState() { - return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR; + synchronized boolean isInTransaction() { + return currentState == State.IN_TRANSACTION || isCompletingTransaction(); } - public synchronized void transitionToAbortableError(RuntimeException exception) { + synchronized void transitionToAbortableError(RuntimeException exception) { transitionTo(State.ABORTABLE_ERROR, exception); } - public synchronized void transitionToFatalError(RuntimeException exception) { + synchronized void transitionToFatalError(RuntimeException exception) { transitionTo(State.FATAL_ERROR, exception); } @@ -383,17 +383,17 @@ public class TransactionManager { } // visible for testing - boolean transactionContainsPartition(TopicPartition topicPartition) { + synchronized boolean transactionContainsPartition(TopicPartition topicPartition) { return isInTransaction() && partitionsInTransaction.contains(topicPartition); } // visible for testing - boolean hasPendingOffsetCommits() { + synchronized 
boolean hasPendingOffsetCommits() { return isInTransaction() && !pendingTxnOffsetCommits.isEmpty(); } // visible for testing - boolean isReadyForTransaction() { + synchronized boolean isReadyForTransaction() { return isTransactional() && currentState == State.READY; } @@ -443,7 +443,7 @@ public class TransactionManager { return false; } - private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) { + private synchronized void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) { switch (type) { case GROUP: consumerGroupCoordinator = null; @@ -459,7 +459,7 @@ public class TransactionManager { pendingRequests.add(new FindCoordinatorHandler(builder)); } - private void completeTransaction() { + private synchronized void completeTransaction() { transitionTo(State.READY); lastError = null; partitionsInTransaction.clear(); @@ -516,8 +516,10 @@ public class TransactionManager { } void reenqueue() { - this.isRetry = true; - pendingRequests.add(this); + synchronized (TransactionManager.this) { + this.isRetry = true; + pendingRequests.add(this); + } } @Override @@ -534,7 +536,9 @@ public class TransactionManager { fatalError(response.versionMismatch()); } else if (response.hasResponse()) { log.trace("Got transactional response for request:" + requestBuilder()); - handleResponse(response.responseBody()); + synchronized (TransactionManager.this) { + handleResponse(response.responseBody()); + } } else { fatalError(new KafkaException("Could not execute transactional request for unknown reasons")); } http://git-wip-us.apache.org/repos/asf/kafka/blob/c62793ea/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java index 840cb1e..ff93da8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java @@ -24,7 +24,7 @@ public final class TransactionalRequestResult { static final TransactionalRequestResult COMPLETE = new TransactionalRequestResult(new CountDownLatch(0)); private final CountDownLatch latch; - private RuntimeException error = null; + private volatile RuntimeException error = null; public TransactionalRequestResult() { this(new CountDownLatch(1));
