Repository: kafka Updated Branches: refs/heads/0.11.0 07cfcc53b -> da6da5356
KAFKA-5364; Don't fail producer if drained partition is not yet in transaction Due to the async nature of the producer, it is possible to attempt to drain a message whose partition hasn't been added to the transaction yet. Before this patch, we considered this a fatal error. However, it is only in error if the partition isn't in the queue to be sent to the coordinator. This patch updates the logic so that we only fail the producer if the partition would never be added to the transaction. If the partition of the batch is yet to be added, we will simply wait for the partition to be added to the transaction before sending the batch to the broker. Author: Apurva Mehta <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]> Closes #3202 from apurvam/KAFKA-5364-ensure-partitions-added-to-txn-before-send (cherry picked from commit 673ab671e6d72f48fcc98de0b73564983c34e752) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da6da535 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da6da535 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da6da535 Branch: refs/heads/0.11.0 Commit: da6da53563066519c587c751c1b16b65760fe096 Parents: 07cfcc5 Author: Apurva Mehta <[email protected]> Authored: Fri Jun 2 00:53:21 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Fri Jun 2 00:53:39 2017 -0700 ---------------------------------------------------------------------- .../producer/internals/TransactionManager.java | 34 +++++- .../internals/TransactionManagerTest.java | 111 ++++++++++++++++++- 2 files changed, 139 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/da6da535/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 9d9deac..0a69e02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -234,10 +234,17 @@ public class TransactionManager { } public synchronized boolean ensurePartitionAdded(TopicPartition tp) { - if (isInTransaction() && !partitionsInTransaction.contains(tp)) { - transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " + - "for partition " + tp + ", which hasn't been added to the transaction yet")); + if (hasFatalError()) return false; + if (isInTransaction() || hasAbortableError()) { + // We should enter this branch in an error state because if this partition is already in the transaction, + // there is a chance that the corresponding batch is in retry. So we must let it completely flush. + if (!(partitionsInTransaction.contains(tp) || isPartitionPending(tp))) { + transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " + + "for partition " + tp + ", which would never be added to the transaction.")); + return false; + } + return partitionsInTransaction.contains(tp); } return true; } @@ -416,6 +423,16 @@ public class TransactionManager { return inFlightRequestCorrelationId != NO_INFLIGHT_REQUEST_CORRELATION_ID; } + // visible for testing. + boolean hasFatalError() { + return currentState == State.FATAL_ERROR; + } + + // visible for testing. 
+ boolean hasAbortableError() { + return currentState == State.ABORTABLE_ERROR; + } + // visible for testing synchronized boolean transactionContainsPartition(TopicPartition topicPartition) { return isInTransaction() && partitionsInTransaction.contains(topicPartition); @@ -431,6 +448,10 @@ public class TransactionManager { return isTransactional() && currentState == State.READY; } + private synchronized boolean isPartitionPending(TopicPartition tp) { + return isInTransaction() && (pendingPartitionsInTransaction.contains(tp) || newPartitionsInTransaction.contains(tp)); + } + private void transitionTo(State target) { transitionTo(target, null); } @@ -448,7 +469,12 @@ public class TransactionManager { lastError = null; } - log.debug("{}Transition from state {} to {}", logPrefix, currentState, target); + if (lastError != null) + log.error("{}Transition from state {} to error state {}", logPrefix, currentState, + target, lastError); + else + log.debug("Transition from state {} to {}", logPrefix, currentState, target); + currentState = target; } http://git-wip-us.apache.org/repos/asf/kafka/blob/da6da535/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 3e3f785..96eae8f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import 
org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; @@ -60,11 +61,15 @@ import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -87,8 +92,9 @@ public class TransactionManagerTest { private final String transactionalId = "foobar"; private final int transactionTimeoutMs = 1121; - private TopicPartition tp0 = new TopicPartition("test", 0); - private TopicPartition tp1 = new TopicPartition("test", 1); + private final String topic = "test"; + private TopicPartition tp0 = new TopicPartition(topic, 0); + private TopicPartition tp1 = new TopicPartition(topic, 1); private MockTime time = new MockTime(); private MockClient client = new MockClient(time); @@ -160,9 +166,11 @@ public class TransactionManagerTest { prepareProduceResponse(Errors.NONE, pid, epoch); assertFalse(transactionManager.transactionContainsPartition(tp0)); + assertFalse(transactionManager.ensurePartitionAdded(tp0)); sender.run(time.milliseconds()); // send addPartitions. // Check that only addPartitions was sent. assertTrue(transactionManager.transactionContainsPartition(tp0)); + assertTrue(transactionManager.ensurePartitionAdded(tp0)); assertFalse(responseFuture.isDone()); sender.run(time.milliseconds()); // send produce request. 
@@ -884,6 +892,105 @@ public class TransactionManagerTest { assertTrue(abortResult.isSuccessful()); } + @Test + public void testNoDrainWhenPartitionsPending() throws InterruptedException { + final long pid = 13131L; + final short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT); + transactionManager.maybeAddPartitionToTransaction(tp1); + accumulator.append(tp1, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT); + + assertFalse(transactionManager.ensurePartitionAdded(tp0)); + assertFalse(transactionManager.ensurePartitionAdded(tp1)); + + Node node1 = new Node(0, "localhost", 1111); + Node node2 = new Node(1, "localhost", 1112); + PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null); + PartitionInfo part2 = new PartitionInfo(topic, 1, node2, null, null); + + Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2), + Collections.<String>emptySet(), Collections.<String>emptySet()); + Set<Node> nodes = new HashSet<>(); + nodes.add(node1); + nodes.add(node2); + Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE, + time.milliseconds()); + + // We shouldn't drain batches which haven't been added to the transaction yet. 
+ assertTrue(drainedBatches.containsKey(node1.id())); + assertTrue(drainedBatches.get(node1.id()).isEmpty()); + assertTrue(drainedBatches.containsKey(node2.id())); + assertTrue(drainedBatches.get(node2.id()).isEmpty()); + assertFalse(transactionManager.hasError()); + } + + @Test + public void testAllowDrainInAbortableErrorState() throws InterruptedException { + final long pid = 13131L; + final short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp1); + prepareAddPartitionsToTxn(tp1, Errors.NONE); + sender.run(time.milliseconds()); // Send AddPartitions, tp1 should be in the transaction now. + + assertTrue(transactionManager.transactionContainsPartition(tp1)); + + transactionManager.maybeAddPartitionToTransaction(tp0); + prepareAddPartitionsToTxn(tp0, Errors.TOPIC_AUTHORIZATION_FAILED); + sender.run(time.milliseconds()); // Send AddPartitions, should be in abortable state. + + assertTrue(transactionManager.hasAbortableError()); + assertTrue(transactionManager.ensurePartitionAdded(tp1)); + + // Try to drain a message destined for tp1, it should get drained. + Node node1 = new Node(1, "localhost", 1112); + PartitionInfo part1 = new PartitionInfo(topic, 1, node1, null, null); + Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1), + Collections.<String>emptySet(), Collections.<String>emptySet()); + accumulator.append(tp1, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT); + Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, Collections.singleton(node1), + Integer.MAX_VALUE, + time.milliseconds()); + + // We should drain the appended record since we are in abortable state and the partition has already been + // added to the transaction. 
+ assertTrue(drainedBatches.containsKey(node1.id())); + assertEquals(1, drainedBatches.get(node1.id()).size()); + assertTrue(transactionManager.hasAbortableError()); + } + + @Test + public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException { + final long pid = 13131L; + final short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + // Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain. + accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT); + Node node1 = new Node(0, "localhost", 1111); + PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null); + + Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1), + Collections.<String>emptySet(), Collections.<String>emptySet()); + Set<Node> nodes = new HashSet<>(); + nodes.add(node1); + Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE, + time.milliseconds()); + + // We shouldn't drain batches which haven't been added to the transaction yet. + assertTrue(drainedBatches.containsKey(node1.id())); + assertTrue(drainedBatches.get(node1.id()).isEmpty()); + assertTrue(transactionManager.hasFatalError()); + } + private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { final long pid = 1L; final short epoch = 1;
