Repository: kafka
Updated Branches:
  refs/heads/trunk 45f226176 -> d662b09c9
KAFKA-5268; Fix bounce test transient failure by clearing partitions before writing Complete state to transaction log

Author: Jason Gustafson <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #3089 from hachikuji/KAFKA-5268

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d662b09c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d662b09c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d662b09c

Branch: refs/heads/trunk
Commit: d662b09c9f32d7d4dcfc18522a4d2789b43d319c
Parents: 45f2261
Author: Jason Gustafson <[email protected]>
Authored: Thu May 18 11:17:30 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Thu May 18 11:17:30 2017 -0700

----------------------------------------------------------------------
 .../coordinator/transaction/TransactionCoordinator.scala | 2 +-
 .../kafka/coordinator/transaction/TransactionMetadata.scala | 2 +-
 .../coordinator/transaction/TransactionCoordinatorTest.scala | 8 +++++++-
 3 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/d662b09c/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index ebfbde5..8de6dbd 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -243,7 +243,7 @@ class TransactionCoordinator(brokerId: Int,
             Left(Errors.CONCURRENT_TRANSACTIONS)
           } else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
             Left(Errors.CONCURRENT_TRANSACTIONS)
-          } else if (partitions.subsetOf(txnMetadata.topicPartitions)) {
+          } else if (txnMetadata.state == Ongoing && partitions.subsetOf(txnMetadata.topicPartitions)) {
             // this is an optimization: if the partitions are already in the metadata reply OK immediately
             Left(Errors.NONE)
           } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d662b09c/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index d739b9a..6e29308 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -185,7 +185,7 @@ private[transaction] class TransactionMetadata(val producerId: Long,
 
   def prepareComplete(updateTimestamp: Long): TransactionMetadataTransition = {
     val newState = if (state == PrepareCommit) CompleteCommit else CompleteAbort
-    prepareTransitionTo(newState, producerEpoch, txnTimeoutMs, topicPartitions.toSet, txnStartTimestamp, updateTimestamp)
+    prepareTransitionTo(newState, producerEpoch, txnTimeoutMs, Set.empty[TopicPartition], txnStartTimestamp, updateTimestamp)
   }
 
   // visible for testing only

http://git-wip-us.apache.org/repos/asf/kafka/blob/d662b09c/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 7271edd..04f76bd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -672,7 +672,13 @@ class TransactionCoordinatorTest {
       .andReturn(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, prepareMetadata)))
       .once()
 
-    val newMetadata = prepareMetadata.copy().prepareComplete(now)
+    val newMetadata = TransactionMetadataTransition(producerId = pid,
+      producerEpoch = epoch,
+      txnTimeoutMs = txnTimeoutMs,
+      txnState = finalState,
+      topicPartitions = Set.empty[TopicPartition],
+      txnStartTimestamp = prepareMetadata.txnStartTimestamp,
+      txnLastUpdateTimestamp = now)
     EasyMock.expect(transactionMarkerChannelManager.addTxnMarkersToSend(
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
