Repository: kafka Updated Branches: refs/heads/trunk d662b09c9 -> 34e379f10
KAFKA-5171; TC should not accept empty string transactional id. This is a rebased version of [PR#2973](https://github.com/apache/kafka/pull/2973). guozhangwang, please review this updated PR. Author: umesh chaudhary <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]> Closes #3086 from umesh9794/mylocal Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34e379f1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34e379f1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34e379f1 Branch: refs/heads/trunk Commit: 34e379f10779bfb17fb399fde0357d17dc34ab62 Parents: d662b09 Author: umesh chaudhary <[email protected]> Authored: Thu May 18 11:38:44 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Thu May 18 11:38:44 2017 -0700 ---------------------------------------------------------------------- .../coordinator/transaction/TransactionCoordinator.scala | 9 +++++++-- .../transaction/TransactionCoordinatorTest.scala | 6 +++--- 2 files changed, 10 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/34e379f1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 8de6dbd..491e16a 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -93,11 +93,16 @@ class TransactionCoordinator(brokerId: Int, def handleInitProducerId(transactionalId: String, transactionTimeoutMs: Int, responseCallback: InitProducerIdCallback): Unit = { - if (transactionalId == 
null || transactionalId.isEmpty) { - // if the transactional id is not specified, then always blindly accept the request + + if (transactionalId == null) { + // if the transactional id is null, then always blindly accept the request // and return a new producerId from the producerId manager val producerId = producerIdManager.generateProducerId() responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE)) + } else if (transactionalId.isEmpty) { + //If transactional id is empty then return error as invalid request. This is + // to make TransactionCoordinator's behavior consistent with producer client + responseCallback(initTransactionError(Errors.INVALID_REQUEST)) } else if (!txnManager.isCoordinatorFor(transactionalId)) { // check if it is the assigned coordinator for the transactional id responseCallback(initTransactionError(Errors.NOT_COORDINATOR)) http://git-wip-us.apache.org/repos/asf/kafka/blob/34e379f1/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala index 04f76bd..43ad7a7 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala @@ -86,14 +86,14 @@ class TransactionCoordinatorTest { } @Test - def shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsEmpty(): Unit = { + def shouldReturnInvalidRequestWhenTransactionalIdIsEmpty(): Unit = { mockPidManager() EasyMock.replay(pidManager) coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback) - assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result) + assertEquals(InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result) 
coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback) - assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result) + assertEquals(InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result) } @Test
