Repository: kafka Updated Branches: refs/heads/0.11.0 1c59aa735 -> 9419204c2 (forced update)
KAFKA-5364; ensurePartitionAdded does not handle pending partitions in abortable error state Author: Jason Gustafson <ja...@confluent.io> Reviewers: Apurva Mehta <apu...@confluent.io>, Guozhang Wang <wangg...@gmail.com>, Ismael Juma <ism...@juma.me.uk> Closes #3231 from hachikuji/KAFKA-5364 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9419204c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9419204c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9419204c Branch: refs/heads/0.11.0 Commit: 9419204c2b4e63e9b1a47335f16191dc6e23543f Parents: b9c876e Author: Jason Gustafson <ja...@confluent.io> Authored: Tue Jun 6 13:57:20 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Tue Jun 6 14:00:05 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 16 +- .../producer/internals/RecordAccumulator.java | 4 +- .../producer/internals/TransactionManager.java | 78 +++-- .../internals/RecordAccumulatorTest.java | 1 - .../clients/producer/internals/SenderTest.java | 4 +- .../internals/TransactionManagerTest.java | 342 ++++++++++++++++++- 6 files changed, 381 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1d16721..0f109d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -608,7 +608,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { */ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { if (transactionManager != null) - ensureProperTransactionalState(); + transactionManager.failIfUnreadyForSend(); TopicPartition tp = null; try { @@ -691,20 +691,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { } } - private void ensureProperTransactionalState() { - if (transactionManager.isTransactional() && !transactionManager.hasProducerId()) - throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " + - "when transactions are enabled."); - - if (transactionManager.hasError()) { - Exception lastError = transactionManager.lastError(); - throw new KafkaException("Cannot perform send because at least one previous transactional or " + - "idempotent request has failed with errors.", lastError); - } - if (transactionManager.isCompletingTransaction()) - throw new IllegalStateException("Cannot call send while a commit or abort is in progress."); - } - private void setReadOnly(Headers headers) { if (headers instanceof RecordHeaders) { ((RecordHeaders) headers).setReadOnly(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 2c4917d..0315b13 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -469,7 +469,7 @@ public final class RecordAccumulator { ProducerIdAndEpoch producerIdAndEpoch = null; boolean isTransactional = false; if (transactionManager != null) { - if (!transactionManager.ensurePartitionAdded(tp)) + if (!transactionManager.sendToPartitionAllowed(tp)) break; producerIdAndEpoch = transactionManager.producerIdAndEpoch(); @@ -477,7 +477,7 @@ public final class RecordAccumulator { // we cannot send the batch until we have refreshed the producer id break; - isTransactional = transactionManager.isInTransaction(); + isTransactional = transactionManager.isTransactional(); } ProducerBatch batch = deque.pollFirst(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c081b23..2842cd1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -205,7 +205,7 @@ public class TransactionManager { } public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, - String consumerGroupId) { + String consumerGroupId) { ensureTransactional(); maybeFailWithError(); if (currentState != State.IN_TRANSACTION) @@ -221,34 +221,39 @@ public class TransactionManager { } public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) { - if (!isInTransaction()) - throw new IllegalArgumentException("Cannot add partitions to a transaction in state " + currentState); + if (currentState != State.IN_TRANSACTION) + throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState); - if (partitionsInTransaction.contains(topicPartition)) + if (partitionsInTransaction.contains(topicPartition) || pendingPartitionsInTransaction.contains(topicPartition)) return; log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition); newPartitionsInTransaction.add(topicPartition); } - public RuntimeException lastError() { + RuntimeException lastError() { return lastError; } - public synchronized boolean ensurePartitionAdded(TopicPartition tp) { + public synchronized void failIfUnreadyForSend() { + if (hasError()) + throw new KafkaException("Cannot perform send because at least one previous transactional or " + + "idempotent request has failed with errors.", lastError); + + if (isTransactional()) { + if (!hasProducerId()) + throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " + + "when transactions are enabled."); + + if (currentState != State.IN_TRANSACTION) + throw new IllegalStateException("Cannot call send in state " + currentState); + } + } + + synchronized boolean sendToPartitionAllowed(TopicPartition tp) { if (hasFatalError()) return false; - if (isInTransaction() || hasAbortableError()) { - // We should enter this branch in an error state because if this partition is already in the transaction, - // there is a chance that the corresponding batch is in retry. So we must let it completely flush. - if (!(partitionsInTransaction.contains(tp) || isPartitionPending(tp))) { - transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " + - "for partition " + tp + ", which would never be added to the transaction.")); - return false; - } - return partitionsInTransaction.contains(tp); - } - return true; + return !isTransactional() || partitionsInTransaction.contains(tp); } public String transactionalId() { @@ -263,26 +268,22 @@ public class TransactionManager { return transactionalId != null; } - public synchronized boolean hasPartitionsToAdd() { + synchronized boolean hasPartitionsToAdd() { return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty(); } - public synchronized boolean isCompletingTransaction() { + synchronized boolean isCompletingTransaction() { return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION; } - public synchronized boolean hasError() { + synchronized boolean hasError() { return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR; } - public synchronized boolean isAborting() { + synchronized boolean isAborting() { return currentState == State.ABORTING_TRANSACTION; } - synchronized boolean isInTransaction() { - return currentState == State.IN_TRANSACTION || isCompletingTransaction(); - } - synchronized void transitionToAbortableError(RuntimeException exception) { transitionTo(State.ABORTABLE_ERROR, exception); } @@ -291,6 +292,16 @@ public class TransactionManager { transitionTo(State.FATAL_ERROR, exception); } + // visible for testing + synchronized boolean isPartitionAdded(TopicPartition partition) { + return partitionsInTransaction.contains(partition); + } + + // visible for testing + synchronized boolean isPartitionPendingAdd(TopicPartition partition) { + return newPartitionsInTransaction.contains(partition) || pendingPartitionsInTransaction.contains(partition); + } + /** * Get the current producer id and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to * verify that the result is valid. @@ -437,21 +448,23 @@ public class TransactionManager { // visible for testing synchronized boolean transactionContainsPartition(TopicPartition topicPartition) { - return isInTransaction() && partitionsInTransaction.contains(topicPartition); + return partitionsInTransaction.contains(topicPartition); } // visible for testing synchronized boolean hasPendingOffsetCommits() { - return isInTransaction() && !pendingTxnOffsetCommits.isEmpty(); + return !pendingTxnOffsetCommits.isEmpty(); } // visible for testing - synchronized boolean isReady() { - return isTransactional() && currentState == State.READY; + synchronized boolean hasOngoingTransaction() { + // transactions are considered ongoing once started until completion or a fatal error + return currentState == State.IN_TRANSACTION || isCompletingTransaction() || hasAbortableError(); } - private synchronized boolean isPartitionPending(TopicPartition tp) { - return isInTransaction() && (pendingPartitionsInTransaction.contains(tp) || newPartitionsInTransaction.contains(tp)); + // visible for testing + synchronized boolean isReady() { + return isTransactional() && currentState == State.READY; } private void transitionTo(State target) { @@ -472,8 +485,7 @@ public class TransactionManager { } if (lastError != null) - log.error("{}Transition from state {} to error state {}", logPrefix, currentState, - target, lastError); + log.debug("{}Transition from state {} to error state {}", logPrefix, currentState, target, lastError); else log.debug("{}Transition from state {} to {}", logPrefix, currentState, target); http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 2875ba2..7b9f26b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -394,7 +394,6 @@ public class RecordAccumulatorTest { accum.abortIncompleteBatches(); assertEquals(numExceptionReceivedInCallback.get(), 100); assertFalse(accum.hasUnsent()); - } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 927a937..26093d7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -624,7 +624,7 @@ public class SenderTest { assertTrue("Client ready status should be true", client.isReady(node, 0L)); responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L)); - client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isInTransaction()), + client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isTransactional()), new ProduceResponse(responseMap)); sender.run(time.milliseconds()); // receive @@ -640,7 +640,7 @@ public class SenderTest { assertTrue("Client ready status should be true", client.isReady(node, 0L)); responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L)); - client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isInTransaction()), + client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isTransactional()), new ProduceResponse(responseMap)); sender.run(time.milliseconds()); // receive http://git-wip-us.apache.org/repos/asf/kafka/blob/9419204c/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 6e633ec..f661baf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -123,6 +123,329 @@ public class TransactionManagerTest { } @Test(expected = IllegalStateException.class) + public void testFailIfUnreadyForSendNoProducerId() { + transactionManager.failIfUnreadyForSend(); + } + + @Test + public void testFailIfUnreadyForSendIdempotentProducer() { + TransactionManager idempotentTransactionManager = new TransactionManager(); + idempotentTransactionManager.failIfUnreadyForSend(); + } + + @Test(expected = KafkaException.class) + public void testFailIfUnreadyForSendIdempotentProducerFatalError() { + TransactionManager idempotentTransactionManager = new TransactionManager(); + idempotentTransactionManager.transitionToFatalError(new KafkaException()); + idempotentTransactionManager.failIfUnreadyForSend(); + } + + @Test(expected = IllegalStateException.class) + public void testFailIfUnreadyForSendNoOngoingTransaction() { + long pid = 13131L; + short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.failIfUnreadyForSend(); + } + + @Test(expected = KafkaException.class) + public void testFailIfUnreadyForSendAfterAbortableError() { + long pid = 13131L; + short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + transactionManager.transitionToAbortableError(new KafkaException()); + transactionManager.failIfUnreadyForSend(); + } + + @Test(expected = KafkaException.class) + public void testFailIfUnreadyForSendAfterFatalError() { + long pid = 13131L; + short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.transitionToFatalError(new KafkaException()); + transactionManager.failIfUnreadyForSend(); + } + + @Test + public void testHasOngoingTransactionSuccessfulAbort() { + long pid = 13131L; + short epoch = 1; + TopicPartition partition = new TopicPartition("foo", 0); + + assertFalse(transactionManager.hasOngoingTransaction()); + doInitTransactions(pid, epoch); + assertFalse(transactionManager.hasOngoingTransaction()); + + transactionManager.beginTransaction(); + assertTrue(transactionManager.hasOngoingTransaction()); + + transactionManager.maybeAddPartitionToTransaction(partition); + assertTrue(transactionManager.hasOngoingTransaction()); + + prepareAddPartitionsToTxn(partition, Errors.NONE); + sender.run(time.milliseconds()); + + transactionManager.beginAbortingTransaction(); + assertTrue(transactionManager.hasOngoingTransaction()); + + prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch); + sender.run(time.milliseconds()); + assertFalse(transactionManager.hasOngoingTransaction()); + } + + @Test + public void testHasOngoingTransactionSuccessfulCommit() { + long pid = 13131L; + short epoch = 1; + TopicPartition partition = new TopicPartition("foo", 0); + + assertFalse(transactionManager.hasOngoingTransaction()); + doInitTransactions(pid, epoch); + assertFalse(transactionManager.hasOngoingTransaction()); + + transactionManager.beginTransaction(); + assertTrue(transactionManager.hasOngoingTransaction()); + + transactionManager.maybeAddPartitionToTransaction(partition); + assertTrue(transactionManager.hasOngoingTransaction()); + + prepareAddPartitionsToTxn(partition, Errors.NONE); + sender.run(time.milliseconds()); + + transactionManager.beginCommittingTransaction(); + assertTrue(transactionManager.hasOngoingTransaction()); + + prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch); + sender.run(time.milliseconds()); + assertFalse(transactionManager.hasOngoingTransaction()); + } + + @Test + public void testHasOngoingTransactionAbortableError() { + long pid = 13131L; + short epoch = 1; + TopicPartition partition = new TopicPartition("foo", 0); + + assertFalse(transactionManager.hasOngoingTransaction()); + doInitTransactions(pid, epoch); + assertFalse(transactionManager.hasOngoingTransaction()); + + transactionManager.beginTransaction(); + assertTrue(transactionManager.hasOngoingTransaction()); + + transactionManager.maybeAddPartitionToTransaction(partition); + assertTrue(transactionManager.hasOngoingTransaction()); + + prepareAddPartitionsToTxn(partition, Errors.NONE); + sender.run(time.milliseconds()); + + transactionManager.transitionToAbortableError(new KafkaException()); + assertTrue(transactionManager.hasOngoingTransaction()); + + transactionManager.beginAbortingTransaction(); + assertTrue(transactionManager.hasOngoingTransaction()); + + prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch); + sender.run(time.milliseconds()); + assertFalse(transactionManager.hasOngoingTransaction()); + } + + @Test + public void testHasOngoingTransactionFatalError() { + long pid = 13131L; + short epoch = 1; + TopicPartition partition = new TopicPartition("foo", 0); + + assertFalse(transactionManager.hasOngoingTransaction()); + doInitTransactions(pid, epoch); + assertFalse(transactionManager.hasOngoingTransaction()); + + transactionManager.beginTransaction(); + assertTrue(transactionManager.hasOngoingTransaction()); + + transactionManager.maybeAddPartitionToTransaction(partition); + assertTrue(transactionManager.hasOngoingTransaction()); + + prepareAddPartitionsToTxn(partition, Errors.NONE); + sender.run(time.milliseconds()); + + transactionManager.transitionToFatalError(new KafkaException()); + assertFalse(transactionManager.hasOngoingTransaction()); + } + + @Test + public void testMaybeAddPartitionToTransaction() { + long pid = 13131L; + short epoch = 1; + TopicPartition partition = new TopicPartition("foo", 0); + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + + transactionManager.maybeAddPartitionToTransaction(partition); + assertTrue(transactionManager.hasPartitionsToAdd()); + assertFalse(transactionManager.isPartitionAdded(partition)); + assertTrue(transactionManager.isPartitionPendingAdd(partition)); + + prepareAddPartitionsToTxn(partition, Errors.NONE); + sender.run(time.milliseconds()); + + assertFalse(transactionManager.hasPartitionsToAdd()); + assertTrue(transactionManager.isPartitionAdded(partition)); + assertFalse(transactionManager.isPartitionPendingAdd(partition)); + + // adding the partition again should not have any effect + transactionManager.maybeAddPartitionToTransaction(partition); + assertFalse(transactionManager.hasPartitionsToAdd()); + assertTrue(transactionManager.isPartitionAdded(partition)); + assertFalse(transactionManager.isPartitionPendingAdd(partition)); + } + + @Test(expected = IllegalStateException.class) + public void testMaybeAddPartitionToTransactionBeforeInitTransactions() { + transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); + } + + @Test(expected = IllegalStateException.class) + public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() { + long pid = 13131L; + short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); + } + + @Test(expected = IllegalStateException.class) + public void testMaybeAddPartitionToTransactionAfterAbortableError() { + long pid = 13131L; + short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + transactionManager.transitionToAbortableError(new KafkaException()); + transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); + } + + @Test(expected = IllegalStateException.class) + public void testMaybeAddPartitionToTransactionAfterFatalError() { + long pid = 13131L; + short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.transitionToFatalError(new KafkaException()); + transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0)); + } + + @Test + public void testSendToPartitionAllowedWithPendingPartitionAfterAbortableError() { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + transactionManager.transitionToAbortableError(new KafkaException()); + + assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertTrue(transactionManager.hasAbortableError()); + } + + @Test + public void testSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + // Send the AddPartitionsToTxn request and leave it in-flight + sender.run(time.milliseconds()); + transactionManager.transitionToAbortableError(new KafkaException()); + + assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertTrue(transactionManager.hasAbortableError()); + } + + @Test + public void testSendToPartitionAllowedWithPendingPartitionAfterFatalError() { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + transactionManager.transitionToFatalError(new KafkaException()); + + assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertTrue(transactionManager.hasFatalError()); + } + + @Test + public void testSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + // Send the AddPartitionsToTxn request and leave it in-flight + sender.run(time.milliseconds()); + transactionManager.transitionToFatalError(new KafkaException()); + + assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertTrue(transactionManager.hasFatalError()); + } + + @Test + public void testSendToPartitionAllowedWithAddedPartitionAfterAbortableError() { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + + transactionManager.maybeAddPartitionToTransaction(tp0); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); + sender.run(time.milliseconds()); + assertFalse(transactionManager.hasPartitionsToAdd()); + transactionManager.transitionToAbortableError(new KafkaException()); + + assertTrue(transactionManager.sendToPartitionAllowed(tp0)); + assertTrue(transactionManager.hasAbortableError()); + } + + @Test + public void testSendToPartitionAllowedWithAddedPartitionAfterFatalError() { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); + sender.run(time.milliseconds()); + assertFalse(transactionManager.hasPartitionsToAdd()); + transactionManager.transitionToFatalError(new KafkaException()); + + assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertTrue(transactionManager.hasFatalError()); + } + + @Test + public void testSendToPartitionAllowedWithUnaddedPartition() { + final long pid = 13131L; + final short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + } + + @Test(expected = IllegalStateException.class) public void testInvalidSequenceIncrement() { TransactionManager transactionManager = new TransactionManager(); transactionManager.incrementSequenceNumber(tp0, 3333); @@ -148,8 +471,6 @@ public class TransactionManagerTest { @Test public void testBasicTransaction() throws InterruptedException { - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID. final long pid = 13131L; final short epoch = 1; @@ -166,11 +487,11 @@ public class TransactionManagerTest { prepareProduceResponse(Errors.NONE, pid, epoch); assertFalse(transactionManager.transactionContainsPartition(tp0)); - assertFalse(transactionManager.ensurePartitionAdded(tp0)); + assertFalse(transactionManager.sendToPartitionAllowed(tp0)); sender.run(time.milliseconds()); // send addPartitions. // Check that only addPartitions was sent. assertTrue(transactionManager.transactionContainsPartition(tp0)); - assertTrue(transactionManager.ensurePartitionAdded(tp0)); + assertTrue(transactionManager.sendToPartitionAllowed(tp0)); assertFalse(responseFuture.isDone()); sender.run(time.milliseconds()); // send produce request. @@ -210,7 +531,7 @@ public class TransactionManagerTest { prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch); sender.run(time.milliseconds()); // commit. - assertFalse(transactionManager.isInTransaction()); + assertFalse(transactionManager.hasOngoingTransaction()); assertFalse(transactionManager.isCompletingTransaction()); assertFalse(transactionManager.transactionContainsPartition(tp0)); } @@ -507,12 +828,12 @@ public class TransactionManagerTest { prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch); assertFalse(commitResult.isCompleted()); - assertTrue(transactionManager.isInTransaction()); + assertTrue(transactionManager.hasOngoingTransaction()); assertTrue(transactionManager.isCompletingTransaction()); sender.run(time.milliseconds()); assertTrue(commitResult.isCompleted()); - assertFalse(transactionManager.isInTransaction()); + assertFalse(transactionManager.hasOngoingTransaction()); } @Test @@ -910,8 +1231,8 @@ public class TransactionManagerTest { accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT); - assertFalse(transactionManager.ensurePartitionAdded(tp0)); - assertFalse(transactionManager.ensurePartitionAdded(tp1)); + assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertFalse(transactionManager.sendToPartitionAllowed(tp1)); Node node1 = new Node(0, "localhost", 1111); Node node2 = new Node(1, "localhost", 1112); @@ -951,7 +1272,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // Send AddPartitions, should be in abortable state. assertTrue(transactionManager.hasAbortableError()); - assertTrue(transactionManager.ensurePartitionAdded(tp1)); + assertTrue(transactionManager.sendToPartitionAllowed(tp1)); // Try to drain a message destined for tp1, it should get drained. Node node1 = new Node(1, "localhost", 1112); @@ -993,7 +1314,6 @@ public class TransactionManagerTest { // We shouldn't drain batches which haven't been added to the transaction yet. assertTrue(drainedBatches.containsKey(node1.id())); assertTrue(drainedBatches.get(node1.id()).isEmpty()); - assertTrue(transactionManager.hasFatalError()); } private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {