Repository: kafka
Updated Branches:
  refs/heads/trunk dc520d275 -> a8794d8a5


KAFKA-5260; Producer should not send AbortTxn unless transaction has actually 
begun

Keep track of when a transaction has begun by setting a flag, 
`transactionStarted` when a successful `AddPartitionsToTxnResponse` or 
`AddOffsetsToTxnResponse` has been received. If an `AbortTxnRequest` is about to 
be sent and `transactionStarted` is false, don't send the request and 
transition the state to `READY`

Author: Damian Guy <[email protected]>

Reviewers: Apurva Mehta <[email protected]>, Guozhang Wang 
<[email protected]>, Jason Gustafson <[email protected]>

Closes #3126 from dguy/kafka-5260


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a8794d8a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a8794d8a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a8794d8a

Branch: refs/heads/trunk
Commit: a8794d8a5d18bb4eaafceac1ef675243af945862
Parents: dc520d2
Author: Damian Guy <[email protected]>
Authored: Mon May 29 22:52:59 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Mon May 29 22:52:59 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java  | 13 ++++
 .../internals/TransactionManagerTest.java       | 64 ++++++++++++++++++++
 .../src/main/scala/kafka/server/KafkaApis.scala |  1 -
 .../kafka/api/AuthorizerIntegrationTest.scala   | 25 +++++++-
 4 files changed, 99 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a8794d8a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index e5c6ec2..ec7ced2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -79,6 +79,7 @@ public class TransactionManager {
     private volatile State currentState = State.UNINITIALIZED;
     private volatile RuntimeException lastError = null;
     private volatile ProducerIdAndEpoch producerIdAndEpoch;
+    private volatile boolean transactionStarted = false;
 
     private enum State {
         UNINITIALIZED,
@@ -343,7 +344,16 @@ public class TransactionManager {
             return null;
         }
 
+        if (nextRequestHandler != null && nextRequestHandler.isEndTxn() && 
!transactionStarted) {
+            ((EndTxnHandler) nextRequestHandler).result.done();
+            if (currentState != State.FATAL_ERROR) {
+                completeTransaction();
+            }
+            return pendingRequests.poll();
+        }
+
         return nextRequestHandler;
+
     }
 
     synchronized void retry(TxnRequestHandler request) {
@@ -462,6 +472,7 @@ public class TransactionManager {
     private synchronized void completeTransaction() {
         transitionTo(State.READY);
         lastError = null;
+        transactionStarted = false;
         partitionsInTransaction.clear();
     }
 
@@ -686,6 +697,7 @@ public class TransactionManager {
             } else {
                 
partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction);
                 pendingPartitionsToBeAddedToTransaction.clear();
+                transactionStarted = true;
                 result.done();
             }
         }
@@ -831,6 +843,7 @@ public class TransactionManager {
             if (error == Errors.NONE) {
                 // note the result is not completed until the TxnOffsetCommit 
returns
                 pendingRequests.add(txnOffsetCommitHandler(result, offsets, 
builder.consumerGroupId()));
+                transactionStarted = true;
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == 
Errors.NOT_COORDINATOR) {
                 
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, 
transactionalId);
                 reenqueue();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a8794d8a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index e9363d0..a1bd970 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -737,6 +737,70 @@ public class TransactionManagerTest {
         
verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
     }
 
+    @Test
+    public void 
shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws 
Exception {
+        client.setNode(brokerNode);
+        // This is called from the initTransactions method in the producer as 
the first order of business.
+        // It finds the coordinator and then gets a PID.
+        final long pid = 13131L;
+        final short epoch = 1;
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        sender.run(time.milliseconds());  // get pid.
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        TransactionalRequestResult abortResult = 
transactionManager.beginAbortingTransaction();
+
+        prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 
tp0, epoch, pid);
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        assertFalse(abortResult.isCompleted());
+
+        sender.run(time.milliseconds());
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() 
throws Exception {
+        client.setNode(brokerNode);
+        // This is called from the initTransactions method in the producer as 
the first order of business.
+        // It finds the coordinator and then gets a PID.
+        final long pid = 13131L;
+        final short epoch = 1;
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        sender.run(time.milliseconds());  // get pid.
+
+        transactionManager.beginTransaction();
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp1, new OffsetAndMetadata(1));
+        final String consumerGroupId = "myconsumergroup";
+
+        transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
+
+        TransactionalRequestResult abortResult = 
transactionManager.beginAbortingTransaction();
+
+        prepareAddOffsetsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 
consumerGroupId, pid, epoch);
+        sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
+        assertFalse(abortResult.isCompleted());
+
+        sender.run(time.milliseconds());
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+    }
+
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors 
error) throws InterruptedException {
         final long pid = 1L;
         final short epoch = 1;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a8794d8a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index fb69c50..dd6f18d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1550,7 +1550,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.transactionalId
     val partitionsToAdd = addPartitionsToTxnRequest.partitions
-
     if (!authorize(request.session, Write, new Resource(TransactionalId, 
transactionalId)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a8794d8a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c464834..017c57f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -14,7 +14,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import java.util
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
@@ -24,7 +24,7 @@ import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
+import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
@@ -1037,7 +1037,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     producer.initTransactions()
     producer.beginTransaction()
     producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, 
"1".getBytes)).get
-    producer.flush()
     removeAllAcls()
     try {
       producer.commitTransaction()
@@ -1048,6 +1047,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): 
Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic))
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, 
"1".getBytes)).get
+    // try and add a partition resulting in TopicAuthorizationException
+    try {
+      producer.send(new ProducerRecord(deleteTopic, 0, "1".getBytes, 
"1".getBytes)).get
+    } catch {
+      case e : ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
+    }
+    // now rollback
+    producer.abortTransaction()
+  }
+
+  @Test
   def 
shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn():
 Unit = {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), transactionalIdResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Write)), groupResource)

Reply via email to