Repository: kafka Updated Branches: refs/heads/0.11.0 4424534e9 -> 07cfcc53b
http://git-wip-us.apache.org/repos/asf/kafka/blob/07cfcc53/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 2094528..54246c4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -98,13 +98,13 @@ class TransactionStateManagerTest { def testAddGetPids() { transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) - assertEquals(Right(None), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertEquals(Right(None), transactionManager.getTransactionState(transactionalId1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), - transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1))) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), - transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + transactionManager.getTransactionState(transactionalId1)) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), - transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata2))) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata2)) } @Test @@ -160,11 +160,11 @@ class TransactionStateManagerTest { prepareTxnLog(topicPartition, startOffset, records) // this partition should not be part of the owned partitions - transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold( + transactionManager.getTransactionState(transactionalId1).fold( err => assertEquals(Errors.NOT_COORDINATOR, err), _ => fail(transactionalId1 + "'s transaction state is already in the cache") ) - transactionManager.getAndMaybeAddTransactionState(transactionalId2).fold( + transactionManager.getTransactionState(transactionalId2).fold( err => assertEquals(Errors.NOT_COORDINATOR, err), _ => fail(transactionalId2 + "'s transaction state is already in the cache") ) @@ -174,16 +174,16 @@ class TransactionStateManagerTest { // let the time advance to trigger the background thread loading scheduler.tick() - transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold( + transactionManager.getTransactionState(transactionalId1).fold( err => fail(transactionalId1 + "'s transaction state access returns error " + err), entry => entry.getOrElse(fail(transactionalId1 + "'s transaction state was not loaded into the cache")) ) - val cachedPidMetadata1 = transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold( + val cachedPidMetadata1 = transactionManager.getTransactionState(transactionalId1).fold( err => fail(transactionalId1 + "'s transaction state access returns error " + err), entry => entry.getOrElse(fail(transactionalId1 + "'s transaction state was not loaded into the cache")) ) - val cachedPidMetadata2 = transactionManager.getAndMaybeAddTransactionState(transactionalId2).fold( + val cachedPidMetadata2 = transactionManager.getTransactionState(transactionalId2).fold( err => fail(transactionalId2 + "'s transaction state access returns error " + err), entry => entry.getOrElse(fail(transactionalId2 + "'s transaction state was not loaded into the cache")) ) @@ -197,11 +197,11 @@ class TransactionStateManagerTest { // let the time advance to trigger the background thread removing scheduler.tick() - transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold( + transactionManager.getTransactionState(transactionalId1).fold( err => assertEquals(Errors.NOT_COORDINATOR, err), _ => fail(transactionalId1 + "'s transaction state is still in the cache") ) - transactionManager.getAndMaybeAddTransactionState(transactionalId2).fold( + transactionManager.getTransactionState(transactionalId2).fold( err => assertEquals(Errors.NOT_COORDINATOR, err), _ => fail(transactionalId2 + "'s transaction state is still in the cache") ) @@ -212,7 +212,7 @@ class TransactionStateManagerTest { transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) // first insert the initial transaction metadata - transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1)) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) prepareForTxnMessageAppend(Errors.NONE) expectedError = Errors.NONE @@ -224,7 +224,7 @@ class TransactionStateManagerTest { // append the new metadata into log transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback) - assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) // append to log again with expected failures @@ -236,22 +236,22 @@ class TransactionStateManagerTest { prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) - assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) - assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) - assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) - assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) // test NOT_COORDINATOR cases @@ -259,7 +259,7 @@ class TransactionStateManagerTest { prepareForTxnMessageAppend(Errors.NOT_LEADER_FOR_PARTITION) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) - assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) // test Unknown cases @@ -267,12 +267,12 @@ class TransactionStateManagerTest { prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) - assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) - assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) } @@ -281,7 +281,7 @@ class TransactionStateManagerTest { transactionManager.addLoadedTransactionsToCache(partitionId, 0, new Pool[String, TransactionMetadata]()) // first insert the initial transaction metadata - transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1)) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) prepareForTxnMessageAppend(Errors.NONE) expectedError = Errors.NOT_COORDINATOR @@ -300,7 +300,7 @@ class TransactionStateManagerTest { def testAppendTransactionToLogWhilePendingStateChanged() = { // first insert the initial transaction metadata transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) - transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1)) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) prepareForTxnMessageAppend(Errors.NONE) expectedError = Errors.INVALID_PRODUCER_EPOCH @@ -317,7 +317,7 @@ class TransactionStateManagerTest { @Test def shouldReturnNotCooridnatorErrorIfTransactionIdPartitionNotOwned(): Unit = { - transactionManager.getAndMaybeAddTransactionState(transactionalId1).fold( + transactionManager.getTransactionState(transactionalId1).fold( err => assertEquals(Errors.NOT_COORDINATOR, err), _ => fail(transactionalId1 + "'s transaction state is already in the cache") ) @@ -329,12 +329,12 @@ class TransactionStateManagerTest { transactionManager.addLoadedTransactionsToCache(partitionId, 0, new Pool[String, TransactionMetadata]()) } - transactionManager.getAndMaybeAddTransactionState("ongoing", Some(transactionMetadata("ongoing", producerId = 0, state = Ongoing))) - transactionManager.getAndMaybeAddTransactionState("not-expiring", Some(transactionMetadata("not-expiring", producerId = 1, state = Ongoing, txnTimeout = 10000))) - transactionManager.getAndMaybeAddTransactionState("prepare-commit", Some(transactionMetadata("prepare-commit", producerId = 2, state = PrepareCommit))) - transactionManager.getAndMaybeAddTransactionState("prepare-abort", Some(transactionMetadata("prepare-abort", producerId = 3, state = PrepareAbort))) - transactionManager.getAndMaybeAddTransactionState("complete-commit", Some(transactionMetadata("complete-commit", producerId = 4, state = CompleteCommit))) - transactionManager.getAndMaybeAddTransactionState("complete-abort", Some(transactionMetadata("complete-abort", producerId = 5, state = CompleteAbort))) + transactionManager.putTransactionStateIfNotExists("ongoing", transactionMetadata("ongoing", producerId = 0, state = Ongoing)) + transactionManager.putTransactionStateIfNotExists("not-expiring", transactionMetadata("not-expiring", producerId = 1, state = Ongoing, txnTimeout = 10000)) + transactionManager.putTransactionStateIfNotExists("prepare-commit", transactionMetadata("prepare-commit", producerId = 2, state = PrepareCommit)) + transactionManager.putTransactionStateIfNotExists("prepare-abort", transactionMetadata("prepare-abort", producerId = 3, state = PrepareAbort)) + transactionManager.putTransactionStateIfNotExists("complete-commit", transactionMetadata("complete-commit", producerId = 4, state = CompleteCommit)) + transactionManager.putTransactionStateIfNotExists("complete-abort", transactionMetadata("complete-abort", producerId = 5, state = CompleteAbort)) time.sleep(2000) val expiring = transactionManager.timedOutTransactions() @@ -401,7 +401,7 @@ class TransactionStateManagerTest { } private def verifyMetadataDoesExist(transactionalId: String) = { - transactionManager.getAndMaybeAddTransactionState(transactionalId, None) match { + transactionManager.getTransactionState(transactionalId) match { case Left(errors) => fail("shouldn't have been any errors") case Right(None) => fail("metadata should have been removed") case Right(Some(metadata)) => // ok @@ -409,7 +409,7 @@ class TransactionStateManagerTest { } private def verifyMetadataDoesntExist(transactionalId: String) = { - transactionManager.getAndMaybeAddTransactionState(transactionalId, None) match { + transactionManager.getTransactionState(transactionalId) match { case Left(errors) => fail("shouldn't have been any errors") case Right(Some(metdata)) => fail("metadata should have been removed") case Right(None) => // ok @@ -453,10 +453,10 @@ class TransactionStateManagerTest { txnMetadata1.txnLastUpdateTimestamp = time.milliseconds() - txnConfig.transactionalIdExpirationMs txnMetadata1.state = txnState - transactionManager.getAndMaybeAddTransactionState(transactionalId1, Some(txnMetadata1)) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) txnMetadata2.txnLastUpdateTimestamp = time.milliseconds() - transactionManager.getAndMaybeAddTransactionState(transactionalId2, Some(txnMetadata2)) + transactionManager.putTransactionStateIfNotExists(transactionalId2, txnMetadata2) transactionManager.enableTransactionalIdExpiration() time.sleep(txnConfig.removeExpiredTransactionalIdsIntervalMs) http://git-wip-us.apache.org/repos/asf/kafka/blob/07cfcc53/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 7227671..ac1d623 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -79,6 +79,33 @@ class ProducerStateManagerTest extends JUnitSuite { } @Test + def testProducerSequenceWrapAround(): Unit = { + val epoch = 15.toShort + val sequence = Int.MaxValue + val offset = 735L + append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true) + + append(stateManager, producerId, epoch, 0, offset + 500) + + val maybeLastEntry = stateManager.lastEntry(producerId) + assertTrue(maybeLastEntry.isDefined) + + val lastEntry = maybeLastEntry.get + assertEquals(epoch, lastEntry.producerEpoch) + assertEquals(0, lastEntry.firstSeq) + assertEquals(0, lastEntry.lastSeq) + } + + @Test(expected = classOf[OutOfOrderSequenceException]) + def testProducerSequenceInvalidWrapAround(): Unit = { + val epoch = 15.toShort + val sequence = Int.MaxValue + val offset = 735L + append(stateManager, producerId, epoch, sequence, offset, isLoadingFromLog = true) + append(stateManager, producerId, epoch, 1, offset + 500) + } + + @Test def testNoValidationOnFirstEntryWhenLoadingLog(): Unit = { val epoch = 5.toShort val sequence = 16
