Repository: kafka
Updated Branches:
  refs/heads/trunk 338857e1c -> 8b04d8ba0


KAFKA-5269; Retry on unknown topic/partition error in transactional requests

We should retry AddPartitionsToTxnRequest and TxnOffsetCommitRequest when 
receiving an UNKNOWN_TOPIC_OR_PARTITION error.

As described in the JIRA: It turns out that the `UNKNOWN_TOPIC_OR_PARTITION` is 
returned from the request handler in KafkaApis for the AddPartitionsToTxnRequest and 
the TxnOffsetCommitRequest when the broker's metadata doesn't contain one or 
more partitions in the request. This can happen for instance when the broker is 
bounced and has not received the cluster metadata yet.

We should retry in these cases, as this is the model followed by the consumer 
when committing offsets, and by the producer with a ProduceRequest.

Author: Apurva Mehta <[email protected]>

Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson 
<[email protected]>

Closes #3094 from 
apurvam/KAFKA-5269-handle-unknown-topic-partition-in-transaction-manager


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b04d8ba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b04d8ba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b04d8ba

Branch: refs/heads/trunk
Commit: 8b04d8ba071fbcafc72a17d4cbfe6f00613e59b3
Parents: 338857e
Author: Apurva Mehta <[email protected]>
Authored: Fri May 19 18:51:37 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Fri May 19 18:51:42 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   3 +-
 .../producer/internals/TransactionManager.java  |  10 +-
 .../apache/kafka/common/protocol/Errors.java    |   3 +-
 .../internals/TransactionManagerTest.java       | 154 +++++++++++++++----
 .../kafka/api/TransactionsBounceTest.scala      |   4 +-
 5 files changed, 140 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1ba13b2..ac0169a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.RecordBatch;
@@ -696,7 +697,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             throw new IllegalStateException("Cannot perform a 'send' before 
completing a call to initTransactions when transactions are enabled.");
 
         if (transactionManager.isFenced())
-            throw new ProducerFencedException("The current producer has been 
fenced off by a another producer using the same transactional id.");
+            throw Errors.INVALID_PRODUCER_EPOCH.exception();
 
         if (transactionManager.isInErrorState()) {
             String errorMessage =

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 55c1782..c6787f2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -179,7 +179,7 @@ public class TransactionManager {
     public synchronized TransactionalRequestResult beginAbortingTransaction() {
         ensureTransactional();
         if (isFenced())
-            throw new ProducerFencedException("There is a newer producer using 
the same transactional.id.");
+            throw Errors.INVALID_PRODUCER_EPOCH.exception();
         transitionTo(State.ABORTING_TRANSACTION);
         return beginCompletingTransaction(false);
     }
@@ -424,7 +424,7 @@ public class TransactionManager {
 
     private void maybeFailWithError() {
         if (isFenced())
-            throw new ProducerFencedException("There is a newer producer 
instance using the same transactional id.");
+            throw Errors.INVALID_PRODUCER_EPOCH.exception();
         if (isInErrorState()) {
             String errorMessage = "Cannot execute transactional method because 
we are in an error state.";
             if (lastError != null)
@@ -631,12 +631,12 @@ public class TransactionManager {
                 if (error == Errors.NONE || error == null) {
                     continue;
                 }
-
                 if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == 
Errors.NOT_COORDINATOR) {
                     
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, 
transactionalId);
                     reenqueue();
                     return;
-                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || 
error == Errors.CONCURRENT_TRANSACTIONS) {
+                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || 
error == Errors.CONCURRENT_TRANSACTIONS
+                        || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                     reenqueue();
                     return;
                 } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
@@ -848,6 +848,8 @@ public class TransactionManager {
                         coordinatorReloaded = true;
                         
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, 
builder.consumerGroupId());
                     }
+                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                    hadFailure = true;
                 } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
                     fenced();
                     return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index db94b2c..f94fb4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -429,7 +429,8 @@ public enum Errors {
                 return new DuplicateSequenceNumberException(message);
             }
         }),
-    INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old 
epoch",
+    INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old 
epoch. Either there is a newer producer " +
+            "with the same transactionalId, or the producer's transaction has 
been expired by the broker.",
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6a35061..fcf0488 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -190,17 +190,7 @@ public class TransactionManagerTest {
 
         assertFalse(transactionManager.hasPendingOffsetCommits());
 
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                AddOffsetsToTxnRequest addOffsetsToTxnRequest = 
(AddOffsetsToTxnRequest) body;
-                assertEquals(consumerGroupId, 
addOffsetsToTxnRequest.consumerGroupId());
-                assertEquals(transactionalId, 
addOffsetsToTxnRequest.transactionalId());
-                assertEquals(pid, addOffsetsToTxnRequest.producerId());
-                assertEquals(epoch, addOffsetsToTxnRequest.producerEpoch());
-                return true;
-            }
-        }, new AddOffsetsToTxnResponse(0, Errors.NONE));
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, 
epoch);
 
         sender.run(time.milliseconds());  // Send AddOffsetsRequest
         assertTrue(transactionManager.hasPendingOffsetCommits());  // We 
should now have created and queued the offset commit request.
@@ -210,17 +200,7 @@ public class TransactionManagerTest {
         txnOffsetCommitResponse.put(tp1, Errors.NONE);
 
         prepareFindCoordinatorResponse(Errors.NONE, false, 
FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
-
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                TxnOffsetCommitRequest txnOffsetCommitRequest = 
(TxnOffsetCommitRequest) body;
-                assertEquals(consumerGroupId, 
txnOffsetCommitRequest.consumerGroupId());
-                assertEquals(pid, txnOffsetCommitRequest.producerId());
-                assertEquals(epoch, txnOffsetCommitRequest.producerEpoch());
-                return true;
-            }
-        }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
+        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, 
txnOffsetCommitResponse);
 
         assertEquals(null, 
transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
         sender.run(time.milliseconds());  // try to send 
TxnOffsetCommitRequest, but find we don't have a group coordinator.
@@ -542,15 +522,106 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isReadyForTransaction());  // make sure 
we are ready for a transaction now.
     }
 
+    @Test
+    public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() 
throws InterruptedException {
+        client.setNode(brokerNode);
+        // This is called from the initTransactions method in the producer as 
the first order of business.
+        // It finds the coordinator and then gets a PID.
+        final long pid = 13131L;
+        final short epoch = 1;
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+        assertEquals(brokerNode, 
transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+
+        sender.run(time.milliseconds());  // get pid.
+
+        assertTrue(transactionManager.hasProducerId());
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, new 
MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
tp0, epoch, pid);
+
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        assertFalse(transactionManager.transactionContainsPartition(tp0));  // 
The partition should not yet be added.
+
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest 
successfully.
+        assertTrue(transactionManager.transactionContainsPartition(tp0));
+
+        sender.run(time.milliseconds());  // Send ProduceRequest.
+        assertTrue(responseFuture.isDone());
+    }
 
     @Test
-    public void 
shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws 
Exception {
-        
verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
+    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() 
throws InterruptedException {
+        client.setNode(brokerNode);
+        // This is called from the initTransactions method in the producer as 
the first order of business.
+        // It finds the coordinator and then gets a PID.
+        final long pid = 13131L;
+        final short epoch = 1;
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+        assertEquals(brokerNode, 
transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+
+        sender.run(time.milliseconds());  // get pid.
+
+        assertTrue(transactionManager.hasProducerId());
+        transactionManager.beginTransaction();
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp1, new OffsetAndMetadata(1));
+        final String consumerGroupId = "myconsumergroup";
+
+        TransactionalRequestResult addOffsetsResult = 
transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, 
epoch);
+
+        sender.run(time.milliseconds());  // send AddOffsetsToTxnResult
+
+        assertFalse(addOffsetsResult.isCompleted());  // The request should 
complete only after the TxnOffsetCommit completes.
+
+        Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
+        txnOffsetCommitResponse.put(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
+        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, 
txnOffsetCommitResponse);
+
+        assertEquals(null, 
transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
+        sender.run(time.milliseconds());  // try to send 
TxnOffsetCommitRequest, but find we don't have a group coordinator.
+        sender.run(time.milliseconds());  // send find coordinator for group 
request
+        
assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
+        assertTrue(transactionManager.hasPendingOffsetCommits());
+
+        sender.run(time.milliseconds());  // send TxnOffsetCommitRequest 
request.
+
+        assertTrue(transactionManager.hasPendingOffsetCommits());  // The 
TxnOffsetCommit failed.
+        assertFalse(addOffsetsResult.isCompleted());  // We should only be 
done after both RPCs complete successfully.
+
+        txnOffsetCommitResponse.put(tp1, Errors.NONE);
+        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, 
txnOffsetCommitResponse);
+        sender.run(time.milliseconds());  // Send TxnOffsetCommitRequest again.
+
+        assertTrue(addOffsetsResult.isCompleted());
+        assertTrue(addOffsetsResult.isSuccessful());
     }
 
     @Test
-    public void 
shouldNotAddPartitionsToTransactionWhenUnknownTopicOrPartition() throws 
Exception {
-        
verifyAddPartitionsFailsWithPartitionLevelError(Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    public void 
shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws 
Exception {
+        
verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
     }
 
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors 
error) throws InterruptedException {
@@ -679,10 +750,41 @@ public class TransactionManagerTest {
         }, new EndTxnResponse(0, error));
     }
 
+    private void prepareAddOffsetsToTxnResponse(Errors error, final String 
consumerGroupId, final long producerId,
+                                                final short producerEpoch) {
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                AddOffsetsToTxnRequest addOffsetsToTxnRequest = 
(AddOffsetsToTxnRequest) body;
+                assertEquals(consumerGroupId, 
addOffsetsToTxnRequest.consumerGroupId());
+                assertEquals(transactionalId, 
addOffsetsToTxnRequest.transactionalId());
+                assertEquals(producerId, addOffsetsToTxnRequest.producerId());
+                assertEquals(producerEpoch, 
addOffsetsToTxnRequest.producerEpoch());
+                return true;
+            }
+        }, new AddOffsetsToTxnResponse(0, error));
+    }
+
+    private void prepareTxnOffsetCommitResponse(final String consumerGroupId, 
final long producerId,
+                                                final short producerEpoch, 
Map<TopicPartition, Errors> txnOffsetCommitResponse) {
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                TxnOffsetCommitRequest txnOffsetCommitRequest = 
(TxnOffsetCommitRequest) body;
+                assertEquals(consumerGroupId, 
txnOffsetCommitRequest.consumerGroupId());
+                assertEquals(producerId, txnOffsetCommitRequest.producerId());
+                assertEquals(producerEpoch, 
txnOffsetCommitRequest.producerEpoch());
+                return true;
+            }
+        }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
+
+    }
+
     private ProduceResponse produceResponse(TopicPartition tp, long offset, 
Errors error, int throttleTimeMs) {
         ProduceResponse.PartitionResponse resp = new 
ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = 
Collections.singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index f1fd365..110e680 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -72,7 +72,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
-  @Ignore  // need to fix KAFKA-5268 and KAFKA-5269 before re-enabling
+  @Ignore  // Disabling this as it is flaky on Jenkins.
   @Test
   def testBrokerFailure() {
     // basic idea is to seed a topic with 10000 records, and copy it 
transactionally while bouncing brokers
@@ -99,7 +99,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
         val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead)
         trace(s"received ${records.size} messages. sending them 
transactionally to $outputTopic")
         producer.beginTransaction()
-        val shouldAbort = iteration % 10 == 0
+        val shouldAbort = iteration % 2 == 0
         records.zipWithIndex.foreach { case (record, i) =>
           producer.send(
             TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, 
record.key, record.value, !shouldAbort),

Reply via email to