Repository: kafka Updated Branches: refs/heads/trunk 338857e1c -> 8b04d8ba0
KAFKA-5269; Retry on unknown topic/partition error in transactional requests We should retry AddPartitionsToTxnRequest and TxnOffsetCommitRequest when receiving an UNKNOWN_TOPIC_OR_PARTITION error. As described in the JIRA: It turns out that the `UNKNOWN_TOPIC_OR_PARTITION` is returned from the request handler in KafkaApis for the AddPartitionsToTxn and the TxnOffsetCommitRequest when the broker's metadata doesn't contain one or more partitions in the request. This can happen for instance when the broker is bounced and has not received the cluster metadata yet. We should retry in these cases, as this is the model followed by the consumer when committing offsets, and by the producer with a ProduceRequest. Author: Apurva Mehta <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]> Closes #3094 from apurvam/KAFKA-5269-handle-unknown-topic-partition-in-transaction-manager Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b04d8ba Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b04d8ba Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b04d8ba Branch: refs/heads/trunk Commit: 8b04d8ba071fbcafc72a17d4cbfe6f00613e59b3 Parents: 338857e Author: Apurva Mehta <[email protected]> Authored: Fri May 19 18:51:37 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Fri May 19 18:51:42 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 3 +- .../producer/internals/TransactionManager.java | 10 +- .../apache/kafka/common/protocol/Errors.java | 3 +- .../internals/TransactionManagerTest.java | 154 +++++++++++++++---- .../kafka/api/TransactionsBounceTest.scala | 4 +- 5 files changed, 140 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1ba13b2..ac0169a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -51,6 +51,7 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.RecordBatch; @@ -696,7 +697,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled."); if (transactionManager.isFenced()) - throw new ProducerFencedException("The current producer has been fenced off by a another producer using the same transactional id."); + throw Errors.INVALID_PRODUCER_EPOCH.exception(); if (transactionManager.isInErrorState()) { String errorMessage = http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 55c1782..c6787f2 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -179,7 +179,7 @@ public class TransactionManager { public synchronized TransactionalRequestResult beginAbortingTransaction() { ensureTransactional(); if (isFenced()) - throw new ProducerFencedException("There is a newer producer using the same transactional.id."); + throw Errors.INVALID_PRODUCER_EPOCH.exception(); transitionTo(State.ABORTING_TRANSACTION); return beginCompletingTransaction(false); } @@ -424,7 +424,7 @@ public class TransactionManager { private void maybeFailWithError() { if (isFenced()) - throw new ProducerFencedException("There is a newer producer instance using the same transactional id."); + throw Errors.INVALID_PRODUCER_EPOCH.exception(); if (isInErrorState()) { String errorMessage = "Cannot execute transactional method because we are in an error state."; if (lastError != null) @@ -631,12 +631,12 @@ public class TransactionManager { if (error == Errors.NONE || error == null) { continue; } - if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); reenqueue(); return; - } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { + } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS + || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { reenqueue(); return; } else if (error == Errors.INVALID_PRODUCER_EPOCH) { @@ -848,6 +848,8 @@ public class TransactionManager { coordinatorReloaded = true; lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, builder.consumerGroupId()); } + } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + hadFailure = true; } else if (error == Errors.INVALID_PRODUCER_EPOCH) { fenced(); return; 
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index db94b2c..f94fb4d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -429,7 +429,8 @@ public enum Errors { return new DuplicateSequenceNumberException(message); } }), - INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch", + INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch. Either there is a newer producer " + + "with the same transactionalId, or the producer's transaction has been expired by the broker.", new ApiExceptionBuilder() { @Override public ApiException build(String message) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 6a35061..fcf0488 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -190,17 +190,7 @@ public class TransactionManagerTest { assertFalse(transactionManager.hasPendingOffsetCommits()); - client.prepareResponse(new MockClient.RequestMatcher() { - @Override - public boolean matches(AbstractRequest body) { - AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest) body; - 
assertEquals(consumerGroupId, addOffsetsToTxnRequest.consumerGroupId()); - assertEquals(transactionalId, addOffsetsToTxnRequest.transactionalId()); - assertEquals(pid, addOffsetsToTxnRequest.producerId()); - assertEquals(epoch, addOffsetsToTxnRequest.producerEpoch()); - return true; - } - }, new AddOffsetsToTxnResponse(0, Errors.NONE)); + prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch); sender.run(time.milliseconds()); // Send AddOffsetsRequest assertTrue(transactionManager.hasPendingOffsetCommits()); // We should now have created and queued the offset commit request. @@ -210,17 +200,7 @@ public class TransactionManagerTest { txnOffsetCommitResponse.put(tp1, Errors.NONE); prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId); - - client.prepareResponse(new MockClient.RequestMatcher() { - @Override - public boolean matches(AbstractRequest body) { - TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) body; - assertEquals(consumerGroupId, txnOffsetCommitRequest.consumerGroupId()); - assertEquals(pid, txnOffsetCommitRequest.producerId()); - assertEquals(epoch, txnOffsetCommitRequest.producerEpoch()); - return true; - } - }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse)); + prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse); assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP)); sender.run(time.milliseconds()); // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator. @@ -542,15 +522,106 @@ public class TransactionManagerTest { assertTrue(transactionManager.isReadyForTransaction()); // make sure we are ready for a transaction now. 
} + @Test + public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException { + client.setNode(brokerNode); + // This is called from the initTransactions method in the producer as the first order of business. + // It finds the coordinator and then gets a PID. + final long pid = 13131L; + final short epoch = 1; + transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); + + prepareInitPidResponse(Errors.NONE, false, pid, epoch); + + sender.run(time.milliseconds()); // get pid. + + assertTrue(transactionManager.hasProducerId()); + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future; + + assertFalse(responseFuture.isDone()); + prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch, pid); + + sender.run(time.milliseconds()); // Send AddPartitionsRequest + assertFalse(transactionManager.transactionContainsPartition(tp0)); // The partition should not yet be added. + + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); + prepareProduceResponse(Errors.NONE, pid, epoch); + sender.run(time.milliseconds()); // Send AddPartitionsRequest successfully. + assertTrue(transactionManager.transactionContainsPartition(tp0)); + + sender.run(time.milliseconds()); // Send ProduceRequest. 
+ assertTrue(responseFuture.isDone()); + } @Test - public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception { - verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED); + public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() throws InterruptedException { + client.setNode(brokerNode); + // This is called from the initTransactions method in the producer as the first order of business. + // It finds the coordinator and then gets a PID. + final long pid = 13131L; + final short epoch = 1; + transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); + + prepareInitPidResponse(Errors.NONE, false, pid, epoch); + + sender.run(time.milliseconds()); // get pid. + + assertTrue(transactionManager.hasProducerId()); + transactionManager.beginTransaction(); + + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + offsets.put(tp1, new OffsetAndMetadata(1)); + final String consumerGroupId = "myconsumergroup"; + + TransactionalRequestResult addOffsetsResult = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId); + prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch); + + sender.run(time.milliseconds()); // send AddOffsetsToTxnResult + + assertFalse(addOffsetsResult.isCompleted()); // The request should complete only after the TxnOffsetCommit completes. 
+ + Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>(); + txnOffsetCommitResponse.put(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION); + + prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId); + prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse); + + assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP)); + sender.run(time.milliseconds()); // try to send TxnOffsetCommitRequest, but find we don't have a group coordinator. + sender.run(time.milliseconds()); // send find coordinator for group request + assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP)); + assertTrue(transactionManager.hasPendingOffsetCommits()); + + sender.run(time.milliseconds()); // send TxnOffsetCommitRequest request. + + assertTrue(transactionManager.hasPendingOffsetCommits()); // The TxnOffsetCommit failed. + assertFalse(addOffsetsResult.isCompleted()); // We should only be done after both RPCs complete successfully. + + txnOffsetCommitResponse.put(tp1, Errors.NONE); + prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse); + sender.run(time.milliseconds()); // Send TxnOffsetCommitRequest again. 
+ + assertTrue(addOffsetsResult.isCompleted()); + assertTrue(addOffsetsResult.isSuccessful()); } @Test - public void shouldNotAddPartitionsToTransactionWhenUnknownTopicOrPartition() throws Exception { - verifyAddPartitionsFailsWithPartitionLevelError(Errors.UNKNOWN_TOPIC_OR_PARTITION); + public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception { + verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED); } private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { @@ -679,10 +750,41 @@ public class TransactionManagerTest { }, new EndTxnResponse(0, error)); } + private void prepareAddOffsetsToTxnResponse(Errors error, final String consumerGroupId, final long producerId, + final short producerEpoch) { + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest) body; + assertEquals(consumerGroupId, addOffsetsToTxnRequest.consumerGroupId()); + assertEquals(transactionalId, addOffsetsToTxnRequest.transactionalId()); + assertEquals(producerId, addOffsetsToTxnRequest.producerId()); + assertEquals(producerEpoch, addOffsetsToTxnRequest.producerEpoch()); + return true; + } + }, new AddOffsetsToTxnResponse(0, error)); + } + + private void prepareTxnOffsetCommitResponse(final String consumerGroupId, final long producerId, + final short producerEpoch, Map<TopicPartition, Errors> txnOffsetCommitResponse) { + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) body; + assertEquals(consumerGroupId, txnOffsetCommitRequest.consumerGroupId()); + assertEquals(producerId, txnOffsetCommitRequest.producerId()); + assertEquals(producerEpoch, txnOffsetCommitRequest.producerEpoch()); + return true; + } + 
}, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse)); + + } + private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP); Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp); return new ProduceResponse(partResp, throttleTimeMs); } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/8b04d8ba/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala index f1fd365..110e680 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala @@ -72,7 +72,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { .map(KafkaConfig.fromProps(_, overridingProps)) } - @Ignore // need to fix KAFKA-5268 and KAFKA-5269 before re-enabling + @Ignore // Disabling this as it is flaky on Jenkins. @Test def testBrokerFailure() { // basic idea is to seed a topic with 10000 records, and copy it transactionally while bouncing brokers @@ -99,7 +99,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead) trace(s"received ${records.size} messages. sending them transactionally to $outputTopic") producer.beginTransaction() - val shouldAbort = iteration % 10 == 0 + val shouldAbort = iteration % 2 == 0 records.zipWithIndex.foreach { case (record, i) => producer.send( TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, record.key, record.value, !shouldAbort),
