Repository: kafka Updated Branches: refs/heads/trunk dc520d275 -> a8794d8a5
KAFKA-5260; Producer should not send AbortTxn unless transaction has actually begun Keep track of when a transaction has begun by setting a flag, `transactionStarted` when a successfull `AddPartitionsToTxnResponse` or `AddOffsetsToTxnResponse` had been received. If an `AbortTxnRequest` about to be sent and `transactionStarted` is false, don't send the request and transition the state to `READY` Author: Damian Guy <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]> Closes #3126 from dguy/kafka-5260 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a8794d8a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a8794d8a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a8794d8a Branch: refs/heads/trunk Commit: a8794d8a5d18bb4eaafceac1ef675243af945862 Parents: dc520d2 Author: Damian Guy <[email protected]> Authored: Mon May 29 22:52:59 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Mon May 29 22:52:59 2017 -0700 ---------------------------------------------------------------------- .../producer/internals/TransactionManager.java | 13 ++++ .../internals/TransactionManagerTest.java | 64 ++++++++++++++++++++ .../src/main/scala/kafka/server/KafkaApis.scala | 1 - .../kafka/api/AuthorizerIntegrationTest.scala | 25 +++++++- 4 files changed, 99 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a8794d8a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index e5c6ec2..ec7ced2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -79,6 +79,7 @@ public class TransactionManager { private volatile State currentState = State.UNINITIALIZED; private volatile RuntimeException lastError = null; private volatile ProducerIdAndEpoch producerIdAndEpoch; + private volatile boolean transactionStarted = false; private enum State { UNINITIALIZED, @@ -343,7 +344,16 @@ public class TransactionManager { return null; } + if (nextRequestHandler != null && nextRequestHandler.isEndTxn() && !transactionStarted) { + ((EndTxnHandler) nextRequestHandler).result.done(); + if (currentState != State.FATAL_ERROR) { + completeTransaction(); + } + return pendingRequests.poll(); + } + return nextRequestHandler; + } synchronized void retry(TxnRequestHandler request) { @@ -462,6 +472,7 @@ public class TransactionManager { private synchronized void completeTransaction() { transitionTo(State.READY); lastError = null; + transactionStarted = false; partitionsInTransaction.clear(); } @@ -686,6 +697,7 @@ public class TransactionManager { } else { partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction); pendingPartitionsToBeAddedToTransaction.clear(); + transactionStarted = true; result.done(); } } @@ -831,6 +843,7 @@ public class TransactionManager { if (error == Errors.NONE) { // note the result is not completed until the TxnOffsetCommit returns pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId())); + transactionStarted = true; } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); reenqueue(); http://git-wip-us.apache.org/repos/asf/kafka/blob/a8794d8a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index e9363d0..a1bd970 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -737,6 +737,70 @@ public class TransactionManagerTest { verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED); } + @Test + public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws Exception { + client.setNode(brokerNode); + // This is called from the initTransactions method in the producer as the first order of business. + // It finds the coordinator and then gets a PID. + final long pid = 13131L; + final short epoch = 1; + transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + + prepareInitPidResponse(Errors.NONE, false, pid, epoch); + sender.run(time.milliseconds()); // get pid. + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); + + prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, epoch, pid); + sender.run(time.milliseconds()); // Send AddPartitionsRequest + assertFalse(abortResult.isCompleted()); + + sender.run(time.milliseconds()); + assertTrue(abortResult.isCompleted()); + assertTrue(abortResult.isSuccessful()); + } + + @Test + public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() throws Exception { + client.setNode(brokerNode); + // This is called from the initTransactions method in the producer as the first order of business. + // It finds the coordinator and then gets a PID. + final long pid = 13131L; + final short epoch = 1; + transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + + prepareInitPidResponse(Errors.NONE, false, pid, epoch); + sender.run(time.milliseconds()); // get pid. + + transactionManager.beginTransaction(); + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + offsets.put(tp1, new OffsetAndMetadata(1)); + final String consumerGroupId = "myconsumergroup"; + + transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId); + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); + + prepareAddOffsetsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch); + sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest + assertFalse(abortResult.isCompleted()); + + sender.run(time.milliseconds()); + assertTrue(abortResult.isCompleted()); + assertTrue(abortResult.isSuccessful()); + } + private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { final long pid = 1L; final short epoch = 1; http://git-wip-us.apache.org/repos/asf/kafka/blob/a8794d8a/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fb69c50..dd6f18d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1550,7 +1550,6 @@ class KafkaApis(val requestChannel: RequestChannel, val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] val transactionalId = addPartitionsToTxnRequest.transactionalId val partitionsToAdd = addPartitionsToTxnRequest.partitions - if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) sendResponseMaybeThrottle(request, requestThrottleMs => addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) http://git-wip-us.apache.org/repos/asf/kafka/blob/a8794d8a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index c464834..017c57f 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -14,7 +14,7 @@ package kafka.api import java.nio.ByteBuffer import java.util -import java.util.concurrent.ExecutionException +import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} @@ -24,7 +24,7 @@ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.consumer._ -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.clients.producer._ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} @@ -1037,7 +1037,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { producer.initTransactions() producer.beginTransaction() producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get - producer.flush() removeAllAcls() try { producer.commitTransaction() @@ -1048,6 +1047,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test + def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic)) + val producer = buildTransactionalProducer() + producer.initTransactions() + producer.beginTransaction() + producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get + // try and add a partition resulting in TopicAuthorizationException + try { + producer.send(new ProducerRecord(deleteTopic, 0, "1".getBytes, "1".getBytes)).get + } catch { + case e : ExecutionException => + assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException]) + } + // now rollback + producer.abortTransaction() + } + + @Test def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn(): Unit = { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), groupResource)
