Repository: kafka
Updated Branches:
  refs/heads/0.11.0 b8b493091 -> 2af69cc6e


KAFKA-5422; Handle multiple transitions to ABORTABLE_ERROR correctly

Author: Apurva Mehta <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #3285 from apurvam/KAFKA-5422-allow-multiple-transitions-to-abortable-error

(cherry picked from commit 8e7839f610a9278f800ba2f7378fb10abded8f28)
Signed-off-by: Jason Gustafson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2af69cc6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2af69cc6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2af69cc6

Branch: refs/heads/0.11.0
Commit: 2af69cc6e522dea8d9e879b30a7ba68505e0510c
Parents: b8b4930
Author: Apurva Mehta <[email protected]>
Authored: Fri Jun 9 15:54:23 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Fri Jun 9 15:54:32 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java  |  3 +-
 .../internals/TransactionManagerTest.java       | 65 ++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2af69cc6/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index a761b31..821c56b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -106,7 +106,8 @@ public class TransactionManager {
                 case ABORTING_TRANSACTION:
                     return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
                 case ABORTABLE_ERROR:
-                    return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
+                    return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION
+                            || source == ABORTABLE_ERROR;
                 case FATAL_ERROR:
                 default:
                     // We can transition to FATAL_ERROR unconditionally.
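The effect of the one-line change above is easiest to see in isolation. Below is a minimal, self-contained sketch (not the actual TransactionManager API; the class name, the simplified State enum, and isTransitionValid are illustrative stand-ins) of the transition check with the new ABORTABLE_ERROR self-transition in place:

// Illustrative sketch only: a simplified version of the transition table above,
// with the ABORTABLE_ERROR -> ABORTABLE_ERROR case added by this patch.
public class TransitionSketch {

    enum State { IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    // True if moving from 'source' to 'target' is an allowed transition.
    static boolean isTransitionValid(State source, State target) {
        switch (target) {
            case ABORTING_TRANSACTION:
                return source == State.IN_TRANSACTION || source == State.ABORTABLE_ERROR;
            case ABORTABLE_ERROR:
                // The patch adds the last clause: a second abortable error (for example,
                // another expired batch) is accepted instead of being rejected as invalid.
                return source == State.IN_TRANSACTION
                        || source == State.COMMITTING_TRANSACTION
                        || source == State.ABORTING_TRANSACTION
                        || source == State.ABORTABLE_ERROR;
            case FATAL_ERROR:
            default:
                // We can transition to FATAL_ERROR unconditionally.
                return true;
        }
    }

    public static void main(String[] args) {
        // Two batches fail back to back: the first failure moves the state machine to
        // ABORTABLE_ERROR; the second transition must also be accepted.
        System.out.println(isTransitionValid(State.IN_TRANSACTION, State.ABORTABLE_ERROR));   // true
        System.out.println(isTransitionValid(State.ABORTABLE_ERROR, State.ABORTABLE_ERROR));  // true with this patch
    }
}

With the old table, the second call in main() would have been reported as an invalid transition; after the patch it is accepted, so each failed batch can independently flag the abortable error.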

http://git-wip-us.apache.org/repos/asf/kafka/blob/2af69cc6/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 22afcea..8d5dbe9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1433,6 +1433,71 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException, ExecutionException {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        transactionManager.maybeAddPartitionToTransaction(tp1);
+
+        Future<RecordMetadata> firstBatchResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> secondBatchResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(firstBatchResponse.isDone());
+        assertFalse(secondBatchResponse.isDone());
+
+        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
+        partitionErrors.put(tp0, Errors.NONE);
+        partitionErrors.put(tp1, Errors.NONE);
+        prepareAddPartitionsToTxn(partitionErrors);
+
+        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
+        sender.run(time.milliseconds());  // send addPartitions.
+        // Check that only addPartitions was sent.
+        assertTrue(transactionManager.transactionContainsPartition(tp0));
+        assertTrue(transactionManager.transactionContainsPartition(tp1));
+        assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
+        assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
+        assertFalse(firstBatchResponse.isDone());
+        assertFalse(secondBatchResponse.isDone());
+
+        // Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
+        time.sleep(10000);
+        // Disconnect the target node for the pending produce request. This will ensure that sender will try to
+        // expire the batch.
+        Node clusterNode = this.cluster.nodes().get(0);
+        client.disconnect(clusterNode.idString());
+        client.blackout(clusterNode, 100);
+
+        sender.run(time.milliseconds());  // We should try to flush the produce, but expire it instead without sending anything.
+        assertTrue(firstBatchResponse.isDone());
+        assertTrue(secondBatchResponse.isDone());
+
+        try {
+            // make sure the produce was expired.
+            firstBatchResponse.get();
+            fail("Expected to get a TimeoutException since the queued 
ProducerBatch should have been expired");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+
+        try {
+            // make sure the produce was expired.
+            secondBatchResponse.get();
+            fail("Expected to get a TimeoutException since the queued 
ProducerBatch should have been expired");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
     public void testDropCommitOnBatchExpiry() throws InterruptedException, ExecutionException {
         final long pid = 13131L;
         final short epoch = 1;
