Repository: kafka
Updated Branches:
  refs/heads/0.11.0 1c59aa735 -> 9419204c2 (forced update)


KAFKA-5364; ensurePartitionAdded does not handle pending partitions in abortable error state

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <apu...@confluent.io>, Guozhang Wang <wangg...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #3231 from hachikuji/KAFKA-5364


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9419204c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9419204c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9419204c

Branch: refs/heads/0.11.0
Commit: 9419204c2b4e63e9b1a47335f16191dc6e23543f
Parents: b9c876e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Jun 6 13:57:20 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Tue Jun 6 14:00:05 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  16 +-
 .../producer/internals/RecordAccumulator.java   |   4 +-
 .../producer/internals/TransactionManager.java  |  78 +++--
 .../internals/RecordAccumulatorTest.java        |   1 -
 .../clients/producer/internals/SenderTest.java  |   4 +-
 .../internals/TransactionManagerTest.java       | 342 ++++++++++++++++++-
 6 files changed, 381 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1d16721..0f109d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -608,7 +608,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, 
Callback callback) {
         if (transactionManager != null)
-            ensureProperTransactionalState();
+            transactionManager.failIfUnreadyForSend();
 
         TopicPartition tp = null;
         try {
@@ -691,20 +691,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private void ensureProperTransactionalState() {
-        if (transactionManager.isTransactional() && 
!transactionManager.hasProducerId())
-            throw new IllegalStateException("Cannot perform a 'send' before 
completing a call to initTransactions " +
-                    "when transactions are enabled.");
-
-        if (transactionManager.hasError()) {
-            Exception lastError = transactionManager.lastError();
-            throw new KafkaException("Cannot perform send because at least one 
previous transactional or " +
-                    "idempotent request has failed with errors.", lastError);
-        }
-        if (transactionManager.isCompletingTransaction())
-            throw new IllegalStateException("Cannot call send while a commit 
or abort is in progress.");
-    }
-
     private void setReadOnly(Headers headers) {
         if (headers instanceof RecordHeaders) {
             ((RecordHeaders) headers).setReadOnly();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 2c4917d..0315b13 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -469,7 +469,7 @@ public final class RecordAccumulator {
                                         ProducerIdAndEpoch producerIdAndEpoch 
= null;
                                         boolean isTransactional = false;
                                         if (transactionManager != null) {
-                                            if 
(!transactionManager.ensurePartitionAdded(tp))
+                                            if 
(!transactionManager.sendToPartitionAllowed(tp))
                                                 break;
 
                                             producerIdAndEpoch = 
transactionManager.producerIdAndEpoch();
@@ -477,7 +477,7 @@ public final class RecordAccumulator {
                                                 // we cannot send the batch 
until we have refreshed the producer id
                                                 break;
 
-                                            isTransactional = 
transactionManager.isInTransaction();
+                                            isTransactional = 
transactionManager.isTransactional();
                                         }
 
                                         ProducerBatch batch = 
deque.pollFirst();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c081b23..2842cd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -205,7 +205,7 @@ public class TransactionManager {
     }
 
     public synchronized TransactionalRequestResult 
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                                           
String consumerGroupId) {
+                                                                            
String consumerGroupId) {
         ensureTransactional();
         maybeFailWithError();
         if (currentState != State.IN_TRANSACTION)
@@ -221,34 +221,39 @@ public class TransactionManager {
     }
 
     public synchronized void maybeAddPartitionToTransaction(TopicPartition 
topicPartition) {
-        if (!isInTransaction())
-            throw new IllegalArgumentException("Cannot add partitions to a 
transaction in state " + currentState);
+        if (currentState != State.IN_TRANSACTION)
+            throw new IllegalStateException("Cannot add partitions to a 
transaction in state " + currentState);
 
-        if (partitionsInTransaction.contains(topicPartition))
+        if (partitionsInTransaction.contains(topicPartition) || 
pendingPartitionsInTransaction.contains(topicPartition))
             return;
 
         log.debug("{}Begin adding new partition {} to transaction", logPrefix, 
topicPartition);
         newPartitionsInTransaction.add(topicPartition);
     }
 
-    public RuntimeException lastError() {
+    RuntimeException lastError() {
         return lastError;
     }
 
-    public synchronized boolean ensurePartitionAdded(TopicPartition tp) {
+    public synchronized void failIfUnreadyForSend() {
+        if (hasError())
+            throw new KafkaException("Cannot perform send because at least one 
previous transactional or " +
+                    "idempotent request has failed with errors.", lastError);
+
+        if (isTransactional()) {
+            if (!hasProducerId())
+                throw new IllegalStateException("Cannot perform a 'send' 
before completing a call to initTransactions " +
+                        "when transactions are enabled.");
+
+            if (currentState != State.IN_TRANSACTION)
+                throw new IllegalStateException("Cannot call send in state " + 
currentState);
+        }
+    }
+
+    synchronized boolean sendToPartitionAllowed(TopicPartition tp) {
         if (hasFatalError())
             return false;
-        if (isInTransaction() || hasAbortableError()) {
-            // We should enter this branch in an error state because if this 
partition is already in the transaction,
-            // there is a chance that the corresponding batch is in retry. So 
we must let it completely flush.
-            if (!(partitionsInTransaction.contains(tp) || 
isPartitionPending(tp))) {
-                transitionToFatalError(new IllegalStateException("Attempted to 
dequeue a record batch to send " +
-                        "for partition " + tp + ", which would never be added 
to the transaction."));
-                return false;
-            }
-            return partitionsInTransaction.contains(tp);
-        }
-        return true;
+        return !isTransactional() || partitionsInTransaction.contains(tp);
     }
 
     public String transactionalId() {
@@ -263,26 +268,22 @@ public class TransactionManager {
         return transactionalId != null;
     }
 
-    public synchronized boolean hasPartitionsToAdd() {
+    synchronized boolean hasPartitionsToAdd() {
         return !newPartitionsInTransaction.isEmpty() || 
!pendingPartitionsInTransaction.isEmpty();
     }
 
-    public synchronized boolean isCompletingTransaction() {
+    synchronized boolean isCompletingTransaction() {
         return currentState == State.COMMITTING_TRANSACTION || currentState == 
State.ABORTING_TRANSACTION;
     }
 
-    public synchronized boolean hasError() {
+    synchronized boolean hasError() {
         return currentState == State.ABORTABLE_ERROR || currentState == 
State.FATAL_ERROR;
     }
 
-    public synchronized boolean isAborting() {
+    synchronized boolean isAborting() {
         return currentState == State.ABORTING_TRANSACTION;
     }
 
-    synchronized boolean isInTransaction() {
-        return currentState == State.IN_TRANSACTION || 
isCompletingTransaction();
-    }
-
     synchronized void transitionToAbortableError(RuntimeException exception) {
         transitionTo(State.ABORTABLE_ERROR, exception);
     }
@@ -291,6 +292,16 @@ public class TransactionManager {
         transitionTo(State.FATAL_ERROR, exception);
     }
 
+    // visible for testing
+    synchronized boolean isPartitionAdded(TopicPartition partition) {
+        return partitionsInTransaction.contains(partition);
+    }
+
+    // visible for testing
+    synchronized boolean isPartitionPendingAdd(TopicPartition partition) {
+        return newPartitionsInTransaction.contains(partition) || 
pendingPartitionsInTransaction.contains(partition);
+    }
+
     /**
      * Get the current producer id and epoch without blocking. Callers must 
use {@link ProducerIdAndEpoch#isValid()} to
      * verify that the result is valid.
@@ -437,21 +448,23 @@ public class TransactionManager {
 
     // visible for testing
     synchronized boolean transactionContainsPartition(TopicPartition 
topicPartition) {
-        return isInTransaction() && 
partitionsInTransaction.contains(topicPartition);
+        return partitionsInTransaction.contains(topicPartition);
     }
 
     // visible for testing
     synchronized boolean hasPendingOffsetCommits() {
-        return isInTransaction() && !pendingTxnOffsetCommits.isEmpty();
+        return !pendingTxnOffsetCommits.isEmpty();
     }
 
     // visible for testing
-    synchronized boolean isReady() {
-        return isTransactional() && currentState == State.READY;
+    synchronized boolean hasOngoingTransaction() {
+        // transactions are considered ongoing once started until completion 
or a fatal error
+        return currentState == State.IN_TRANSACTION || 
isCompletingTransaction() || hasAbortableError();
     }
 
-    private synchronized boolean isPartitionPending(TopicPartition tp) {
-        return isInTransaction() && 
(pendingPartitionsInTransaction.contains(tp) || 
newPartitionsInTransaction.contains(tp));
+    // visible for testing
+    synchronized boolean isReady() {
+        return isTransactional() && currentState == State.READY;
     }
 
     private void transitionTo(State target) {
@@ -472,8 +485,7 @@ public class TransactionManager {
         }
 
         if (lastError != null)
-            log.error("{}Transition from state {} to error state {}", 
logPrefix, currentState,
-                    target, lastError);
+            log.debug("{}Transition from state {} to error state {}", 
logPrefix, currentState, target, lastError);
         else
             log.debug("{}Transition from state {} to {}", logPrefix, 
currentState, target);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 2875ba2..7b9f26b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -394,7 +394,6 @@ public class RecordAccumulatorTest {
         accum.abortIncompleteBatches();
         assertEquals(numExceptionReceivedInCallback.get(), 100);
         assertFalse(accum.hasUnsent());
-
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 927a937..26093d7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -624,7 +624,7 @@ public class SenderTest {
             assertTrue("Client ready status should be true", 
client.isReady(node, 0L));
 
             responseMap.put(tp, new 
ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
-            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, 
txnManager.isInTransaction()),
+            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, 
txnManager.isTransactional()),
                     new ProduceResponse(responseMap));
 
             sender.run(time.milliseconds()); // receive
@@ -640,7 +640,7 @@ public class SenderTest {
             assertTrue("Client ready status should be true", 
client.isReady(node, 0L));
 
             responseMap.put(tp, new 
ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
-            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, 
txnManager.isInTransaction()),
+            client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, 
txnManager.isTransactional()),
                     new ProduceResponse(responseMap));
 
             sender.run(time.milliseconds()); // receive

http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6e633ec..f661baf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -123,6 +123,329 @@ public class TransactionManagerTest {
     }
 
     @Test(expected = IllegalStateException.class)
+    public void testFailIfUnreadyForSendNoProducerId() {
+        transactionManager.failIfUnreadyForSend();
+    }
+
+    @Test
+    public void testFailIfUnreadyForSendIdempotentProducer() {
+        TransactionManager idempotentTransactionManager = new 
TransactionManager();
+        idempotentTransactionManager.failIfUnreadyForSend();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testFailIfUnreadyForSendIdempotentProducerFatalError() {
+        TransactionManager idempotentTransactionManager = new 
TransactionManager();
+        idempotentTransactionManager.transitionToFatalError(new 
KafkaException());
+        idempotentTransactionManager.failIfUnreadyForSend();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testFailIfUnreadyForSendNoOngoingTransaction() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.failIfUnreadyForSend();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testFailIfUnreadyForSendAfterAbortableError() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        transactionManager.transitionToAbortableError(new KafkaException());
+        transactionManager.failIfUnreadyForSend();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testFailIfUnreadyForSendAfterFatalError() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.transitionToFatalError(new KafkaException());
+        transactionManager.failIfUnreadyForSend();
+    }
+
+    @Test
+    public void testHasOngoingTransactionSuccessfulAbort() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+
+        assertFalse(transactionManager.hasOngoingTransaction());
+        doInitTransactions(pid, epoch);
+        assertFalse(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        transactionManager.beginAbortingTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, 
epoch);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
+    @Test
+    public void testHasOngoingTransactionSuccessfulCommit() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+
+        assertFalse(transactionManager.hasOngoingTransaction());
+        doInitTransactions(pid, epoch);
+        assertFalse(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        transactionManager.beginCommittingTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, 
epoch);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
+    @Test
+    public void testHasOngoingTransactionAbortableError() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+
+        assertFalse(transactionManager.hasOngoingTransaction());
+        doInitTransactions(pid, epoch);
+        assertFalse(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        transactionManager.transitionToAbortableError(new KafkaException());
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginAbortingTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, 
epoch);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
+    @Test
+    public void testHasOngoingTransactionFatalError() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+
+        assertFalse(transactionManager.hasOngoingTransaction());
+        doInitTransactions(pid, epoch);
+        assertFalse(transactionManager.hasOngoingTransaction());
+
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        transactionManager.transitionToFatalError(new KafkaException());
+        assertFalse(transactionManager.hasOngoingTransaction());
+    }
+
+    @Test
+    public void testMaybeAddPartitionToTransaction() {
+        long pid = 13131L;
+        short epoch = 1;
+        TopicPartition partition = new TopicPartition("foo", 0);
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertTrue(transactionManager.hasPartitionsToAdd());
+        assertFalse(transactionManager.isPartitionAdded(partition));
+        assertTrue(transactionManager.isPartitionPendingAdd(partition));
+
+        prepareAddPartitionsToTxn(partition, Errors.NONE);
+        sender.run(time.milliseconds());
+
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertTrue(transactionManager.isPartitionAdded(partition));
+        assertFalse(transactionManager.isPartitionPendingAdd(partition));
+
+        // adding the partition again should not have any effect
+        transactionManager.maybeAddPartitionToTransaction(partition);
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertTrue(transactionManager.isPartitionAdded(partition));
+        assertFalse(transactionManager.isPartitionPendingAdd(partition));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
+        transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMaybeAddPartitionToTransactionAfterAbortableError() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        transactionManager.transitionToAbortableError(new KafkaException());
+        transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMaybeAddPartitionToTransactionAfterFatalError() {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.transitionToFatalError(new KafkaException());
+        transactionManager.maybeAddPartitionToTransaction(new 
TopicPartition("foo", 0));
+    }
+
+    @Test
+    public void 
testSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        transactionManager.transitionToAbortableError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
+    public void 
testSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        // Send the AddPartitionsToTxn request and leave it in-flight
+        sender.run(time.milliseconds());
+        transactionManager.transitionToAbortableError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
+    public void 
testSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        transactionManager.transitionToFatalError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasFatalError());
+    }
+
+    @Test
+    public void 
testSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        // Send the AddPartitionsToTxn request and leave it in-flight
+        sender.run(time.milliseconds());
+        transactionManager.transitionToFatalError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasFatalError());
+    }
+
+    @Test
+    public void 
testSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        transactionManager.transitionToAbortableError(new KafkaException());
+
+        assertTrue(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
+    public void testSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+        sender.run(time.milliseconds());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        transactionManager.transitionToFatalError(new KafkaException());
+
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.hasFatalError());
+    }
+
+    @Test
+    public void testSendToPartitionAllowedWithUnaddedPartition() {
+        final long pid = 13131L;
+        final short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+    }
+
+    @Test(expected = IllegalStateException.class)
     public void testInvalidSequenceIncrement() {
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.incrementSequenceNumber(tp0, 3333);
@@ -148,8 +471,6 @@ public class TransactionManagerTest {
 
     @Test
     public void testBasicTransaction() throws InterruptedException {
-        // This is called from the initTransactions method in the producer as 
the first order of business.
-        // It finds the coordinator and then gets a PID.
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -166,11 +487,11 @@ public class TransactionManagerTest {
 
         prepareProduceResponse(Errors.NONE, pid, epoch);
         assertFalse(transactionManager.transactionContainsPartition(tp0));
-        assertFalse(transactionManager.ensurePartitionAdded(tp0));
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
         sender.run(time.milliseconds());  // send addPartitions.
         // Check that only addPartitions was sent.
         assertTrue(transactionManager.transactionContainsPartition(tp0));
-        assertTrue(transactionManager.ensurePartitionAdded(tp0));
+        assertTrue(transactionManager.sendToPartitionAllowed(tp0));
         assertFalse(responseFuture.isDone());
 
         sender.run(time.milliseconds());  // send produce request.
@@ -210,7 +531,7 @@ public class TransactionManagerTest {
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, 
epoch);
         sender.run(time.milliseconds());  // commit.
 
-        assertFalse(transactionManager.isInTransaction());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompletingTransaction());
         assertFalse(transactionManager.transactionContainsPartition(tp0));
     }
@@ -507,12 +828,12 @@ public class TransactionManagerTest {
 
         prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, 
epoch);
         assertFalse(commitResult.isCompleted());
-        assertTrue(transactionManager.isInTransaction());
+        assertTrue(transactionManager.hasOngoingTransaction());
         assertTrue(transactionManager.isCompletingTransaction());
 
         sender.run(time.milliseconds());
         assertTrue(commitResult.isCompleted());
-        assertFalse(transactionManager.isInTransaction());
+        assertFalse(transactionManager.hasOngoingTransaction());
     }
 
     @Test
@@ -910,8 +1231,8 @@ public class TransactionManagerTest {
         accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), Record.EMPTY_HEADERS, null, 
MAX_BLOCK_TIMEOUT);
 
-        assertFalse(transactionManager.ensurePartitionAdded(tp0));
-        assertFalse(transactionManager.ensurePartitionAdded(tp1));
+        assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+        assertFalse(transactionManager.sendToPartitionAllowed(tp1));
 
         Node node1 = new Node(0, "localhost", 1111);
         Node node2 = new Node(1, "localhost", 1112);
@@ -951,7 +1272,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());  // Send AddPartitions, should be in 
abortable state.
 
         assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.ensurePartitionAdded(tp1));
+        assertTrue(transactionManager.sendToPartitionAllowed(tp1));
 
         // Try to drain a message destined for tp1, it should get drained.
         Node node1 = new Node(1, "localhost", 1112);
@@ -993,7 +1314,6 @@ public class TransactionManagerTest {
         // We shouldn't drain batches which haven't been added to the 
transaction yet.
         assertTrue(drainedBatches.containsKey(node1.id()));
         assertTrue(drainedBatches.get(node1.id()).isEmpty());
-        assertTrue(transactionManager.hasFatalError());
     }
 
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors 
error) throws InterruptedException {

Reply via email to