Repository: kafka Updated Branches: refs/heads/0.11.0 d00e1bdeb -> 651d9a538
KAFKA-5251; Producer should cancel unsent AddPartitions and Produce requests on abort Author: Jason Gustafson <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Guozhang Wang <[email protected]> Closes #3161 from hachikuji/KAFKA-5251 (cherry picked from commit d41cf1b77819ede5716b31683d0137eb60cb7bfb) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/651d9a53 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/651d9a53 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/651d9a53 Branch: refs/heads/0.11.0 Commit: 651d9a5385c103aafd9f5d44ae016af7879f694f Parents: d00e1bd Author: Jason Gustafson <[email protected]> Authored: Tue May 30 20:32:51 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Tue May 30 20:54:13 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../producer/internals/RecordAccumulator.java | 19 +- .../clients/producer/internals/Sender.java | 66 +++--- .../producer/internals/TransactionManager.java | 62 +++--- .../common/requests/AddOffsetsToTxnRequest.java | 3 +- .../requests/AddPartitionsToTxnRequest.java | 7 +- .../kafka/common/requests/EndTxnRequest.java | 3 +- .../kafka/common/requests/FetchRequest.java | 7 +- .../common/requests/TxnOffsetCommitRequest.java | 3 +- .../clients/producer/internals/SenderTest.java | 2 +- .../internals/TransactionManagerTest.java | 209 +++++++++++++------ .../kafka/api/TransactionsTest.scala | 1 + 12 files changed, 259 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 4fcbcc8..dc6b911 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -697,7 +697,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " + "when transactions are enabled."); - if (transactionManager.isInErrorState()) { + if (transactionManager.hasError()) { Exception lastError = transactionManager.lastError(); throw new KafkaException("Cannot perform send because at least one previous transactional or " + "idempotent request has failed with errors.", lastError); http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index d3d1b82..330c244 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -621,7 +621,6 @@ public final class RecordAccumulator { void abortBatches(final RuntimeException reason) { for (ProducerBatch batch : incomplete.all()) { Deque<ProducerBatch> dq = getDeque(batch.topicPartition); - // Close the batch before aborting synchronized (dq) { batch.abort(); dq.remove(batch); @@ -631,6 +630,24 @@ public final class RecordAccumulator { } } + void abortUnclosedBatches(RuntimeException reason) { + for (ProducerBatch batch : incomplete.all()) { + Deque<ProducerBatch> dq = getDeque(batch.topicPartition); + boolean aborted = false; + synchronized (dq) { + if (!batch.isClosed()) { + aborted = true; + batch.abort(); + dq.remove(batch); + } + } + if (aborted) { + batch.done(-1L, RecordBatch.NO_TIMESTAMP, reason); + deallocate(batch); + } + } + } + public void mutePartition(TopicPartition tp) { muted.add(tp); } http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 3fa5903..f498f7d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClientUtils; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -46,6 +47,7 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.ProduceRequest; @@ -204,7 +206,7 @@ public class Sender implements Runnable { // do not continue sending if the transaction manager is in a failed state or if there // is no producer id (for the idempotent case). - if (transactionManager.isInErrorState() || !transactionManager.hasProducerId()) { + if (transactionManager.hasError() || !transactionManager.hasProducerId()) { RuntimeException lastError = transactionManager.lastError(); if (lastError != null) maybeAbortBatches(lastError); @@ -295,24 +297,38 @@ public class Sender implements Runnable { } private boolean maybeSendTransactionalRequest(long now) { - TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler(); - if (nextRequestHandler == null) { - log.trace("TransactionalId: {} -- There are no pending transactional requests to send", - transactionManager.transactionalId()); - return false; - } - - if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) { + String transactionalId = transactionManager.transactionalId(); + if (transactionManager.isCompletingTransaction() && + !transactionManager.hasPartitionsToAdd() && + accumulator.hasUnflushedBatches()) { + + // If the transaction is being aborted, then we can clear any unsent produce requests + if (transactionManager.isAborting()) + accumulator.abortUnclosedBatches(new KafkaException("Failing batch since transaction was aborted")); + + // There may still be requests left which are being retried. Since we do not know whether they had + // been successfully appended to the broker log, we must resend them until their final status is clear. + // If they had been appended and we did not receive the error, then our sequence number would no longer + // be correct which would lead to an OutOfSequenceException. if (!accumulator.flushInProgress()) accumulator.beginFlush(); - transactionManager.reenqueue(nextRequestHandler); - log.trace("TransactionalId: {} -- Going to wait for pending ProducerBatches to flush before sending an " + - "end transaction request", transactionManager.transactionalId()); + + // Do not send the EndTxn until all pending batches have been completed + if (accumulator.hasUnflushedBatches()) { + log.trace("TransactionalId: {} -- Waiting for pending batches to be flushed before completing transaction", + transactionalId); + return false; + } + } + + TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler(); + if (nextRequestHandler == null) { + log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionalId); return false; } - log.debug("TransactionalId: {} -- Sending transactional request {}", transactionManager.transactionalId(), - nextRequestHandler.requestBuilder()); + AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder(); + log.trace("TransactionalId: {} -- Preparing to send request {}", transactionalId, requestBuilder); while (true) { Node targetNode = null; @@ -332,25 +348,25 @@ public class Sender implements Runnable { } if (targetNode != null) { if (nextRequestHandler.isRetry()) { - log.trace("TransactionalId: {} -- Waiting {}ms before resending a transactional request {}", - transactionManager.transactionalId(), retryBackoffMs, nextRequestHandler.requestBuilder()); + log.trace("TransactionalId: {} -- Waiting {}ms before resending request {}", + transactionalId, + retryBackoffMs, requestBuilder); time.sleep(retryBackoffMs); } ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), - nextRequestHandler.requestBuilder(), now, true, nextRequestHandler); + requestBuilder, now, true, nextRequestHandler); transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId()); - log.trace("TransactionalId: {} -- Sending transactional request {} to node {}", transactionManager.transactionalId(), - nextRequestHandler.requestBuilder(), clientRequest.destination()); + log.debug("TransactionalId: {} -- Sending transactional request {} to node {}", + transactionalId, requestBuilder, clientRequest.destination()); client.send(clientRequest, now); return true; } } catch (IOException e) { - log.debug("TransactionalId: {} -- Disconnect from {} while trying to send transactional " + - "request {}. Going to back off and retry", transactionManager.transactionalId(), - targetNode, nextRequestHandler.requestBuilder()); + log.debug("TransactionalId: {} -- Disconnect from {} while trying to send request {}. Going " + + "to back off and retry", transactionalId, targetNode, requestBuilder); } - log.trace("TransactionalId: {}. About to wait for {}ms before trying to send another transactional request.", - transactionManager.transactionalId(), retryBackoffMs); + log.trace("TransactionalId: {} -- About to wait for {}ms before trying to send another request.", + transactionalId, retryBackoffMs); time.sleep(retryBackoffMs); metadata.requestUpdate(); } @@ -402,7 +418,7 @@ public class Sender implements Runnable { } private void maybeWaitForProducerId() { - while (!transactionManager.hasProducerId() && !transactionManager.isInErrorState()) { + while (!transactionManager.hasProducerId() && !transactionManager.hasError()) { try { Node node = awaitLeastLoadedNodeReady(requestTimeout); if (node != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index ec7ced2..30fff86 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -67,8 +67,8 @@ public class TransactionManager { private final Map<TopicPartition, Integer> sequenceNumbers; private final PriorityQueue<TxnRequestHandler> pendingRequests; - private final Set<TopicPartition> newPartitionsToBeAddedToTransaction; - private final Set<TopicPartition> pendingPartitionsToBeAddedToTransaction; + private final Set<TopicPartition> newPartitionsInTransaction; + private final Set<TopicPartition> pendingPartitionsInTransaction; private final Set<TopicPartition> partitionsInTransaction; private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits; @@ -139,8 +139,8 @@ public class TransactionManager { this.transactionTimeoutMs = transactionTimeoutMs; this.transactionCoordinator = null; this.consumerGroupCoordinator = null; - this.newPartitionsToBeAddedToTransaction = new HashSet<>(); - this.pendingPartitionsToBeAddedToTransaction = new HashSet<>(); + this.newPartitionsInTransaction = new HashSet<>(); + this.pendingPartitionsInTransaction = new HashSet<>(); this.partitionsInTransaction = new HashSet<>(); this.pendingTxnOffsetCommits = new HashMap<>(); this.pendingRequests = new PriorityQueue<>(10, new Comparator<TxnRequestHandler>() { @@ -184,11 +184,14 @@ public class TransactionManager { if (currentState != State.ABORTABLE_ERROR) maybeFailWithError(); transitionTo(State.ABORTING_TRANSACTION); + + // We're aborting the transaction, so there should be no need to add new partitions + newPartitionsInTransaction.clear(); return beginCompletingTransaction(false); } private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) { - if (!newPartitionsToBeAddedToTransaction.isEmpty()) { + if (!newPartitionsInTransaction.isEmpty()) { pendingRequests.add(addPartitionsToTransactionHandler()); } @@ -222,7 +225,7 @@ public class TransactionManager { if (partitionsInTransaction.contains(topicPartition)) return; - newPartitionsToBeAddedToTransaction.add(topicPartition); + newPartitionsInTransaction.add(topicPartition); } public RuntimeException lastError() { @@ -241,14 +244,22 @@ public class TransactionManager { return transactionalId != null; } + public synchronized boolean hasPartitionsToAdd() { + return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty(); + } + public synchronized boolean isCompletingTransaction() { return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION; } - public synchronized boolean isInErrorState() { + public synchronized boolean hasError() { return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR; } + public synchronized boolean isAborting() { + return currentState == State.ABORTING_TRANSACTION; + } + synchronized boolean isInTransaction() { return currentState == State.IN_TRANSACTION || isCompletingTransaction(); } @@ -334,7 +345,7 @@ public class TransactionManager { } synchronized TxnRequestHandler nextRequestHandler() { - if (!newPartitionsToBeAddedToTransaction.isEmpty()) + if (!newPartitionsInTransaction.isEmpty()) pendingRequests.add(addPartitionsToTransactionHandler()); TxnRequestHandler nextRequestHandler = pendingRequests.poll(); @@ -345,15 +356,16 @@ public class TransactionManager { } if (nextRequestHandler != null && nextRequestHandler.isEndTxn() && !transactionStarted) { - ((EndTxnHandler) nextRequestHandler).result.done(); + nextRequestHandler.result.done(); if (currentState != State.FATAL_ERROR) { + log.debug("TransactionId: {} -- Not sending EndTxn for completed transaction since no partitions " + + "or offsets were successfully added", transactionalId); completeTransaction(); } return pendingRequests.poll(); } return nextRequestHandler; - } synchronized void retry(TxnRequestHandler request) { @@ -403,7 +415,7 @@ public class TransactionManager { } // visible for testing - synchronized boolean isReadyForTransaction() { + synchronized boolean isReady() { return isTransactional() && currentState == State.READY; } @@ -434,12 +446,12 @@ public class TransactionManager { } private void maybeFailWithError() { - if (isInErrorState()) + if (hasError()) throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError); } private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) { - if (isInErrorState()) { + if (hasError()) { if (requestHandler instanceof EndTxnHandler) { // we allow abort requests to break out of the error state. The state and the last error // will be cleared when the request returns @@ -477,10 +489,10 @@ public class TransactionManager { } private synchronized TxnRequestHandler addPartitionsToTransactionHandler() { - pendingPartitionsToBeAddedToTransaction.addAll(newPartitionsToBeAddedToTransaction); - newPartitionsToBeAddedToTransaction.clear(); + pendingPartitionsInTransaction.addAll(newPartitionsInTransaction); + newPartitionsInTransaction.clear(); AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId, - producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction)); + producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsInTransaction)); return new AddPartitionsToTxnHandler(builder); } @@ -497,7 +509,7 @@ public class TransactionManager { return new TxnOffsetCommitHandler(result, builder); } - abstract class TxnRequestHandler implements RequestCompletionHandler { + abstract class TxnRequestHandler implements RequestCompletionHandler { protected final TransactionalRequestResult result; private boolean isRetry = false; @@ -658,12 +670,13 @@ public class TransactionManager { log.debug("TransactionalId {} -- Received AddPartitionsToTxn response with errors {}", transactionalId, errors); - for (TopicPartition topicPartition : pendingPartitionsToBeAddedToTransaction) { - final Errors error = errors.get(topicPartition); - if (error == Errors.NONE || error == null) { + for (Map.Entry<TopicPartition, Errors> topicPartitionErrorEntry : errors.entrySet()) { + TopicPartition topicPartition = topicPartitionErrorEntry.getKey(); + Errors error = topicPartitionErrorEntry.getValue(); + + if (error == Errors.NONE) { continue; - } - if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { + } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); reenqueue(); return; @@ -695,8 +708,9 @@ public class TransactionManager { } else if (hasPartitionErrors) { abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors")); } else { - partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction); - pendingPartitionsToBeAddedToTransaction.clear(); + Set<TopicPartition> addedPartitions = errors.keySet(); + partitionsInTransaction.addAll(addedPartitions); + pendingPartitionsInTransaction.removeAll(addedPartitions); transactionStarted = true; result.done(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java index 3339470..36b290f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java @@ -54,7 +54,8 @@ public class AddOffsetsToTxnRequest extends AbstractRequest { @Override public String toString() { StringBuilder bld = new StringBuilder(); - bld.append("(transactionalId=").append(transactionalId). + bld.append("(type=AddOffsetsToTxnRequest"). + append(", transactionalId=").append(transactionalId). append(", producerId=").append(producerId). append(", producerEpoch=").append(producerEpoch). append(", consumerGroupId=").append(consumerGroupId). http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java index e24fa5a..6fe034c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java @@ -55,10 +55,15 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions); } + public List<TopicPartition> partitions() { + return partitions; + } + @Override public String toString() { StringBuilder bld = new StringBuilder(); - bld.append("(transactionalId=").append(transactionalId). + bld.append("(type=AddPartitionsToTxnRequest"). + append(", transactionalId=").append(transactionalId). append(", producerId=").append(producerId). append(", producerEpoch=").append(producerEpoch). append(", partitions=").append(partitions). http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java index b9f052c..01d73b2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java @@ -54,7 +54,8 @@ public class EndTxnRequest extends AbstractRequest { @Override public String toString() { StringBuilder bld = new StringBuilder(); - bld.append("(transactionalId=").append(transactionalId). + bld.append("(type=EndTxnRequest"). + append(", transactionalId=").append(transactionalId). append(", producerId=").append(producerId). append(", producerEpoch=").append(producerEpoch). append(", result=").append(result). http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index fc7d53c..39c027b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -151,7 +151,7 @@ public class FetchRequest extends AbstractRequest { @Override public String toString() { StringBuilder bld = new StringBuilder(); - bld.append("(type:FetchRequest"). + bld.append("(type=FetchRequest"). append(", replicaId=").append(replicaId). append(", maxWait=").append(maxWait). append(", minBytes=").append(minBytes). @@ -163,11 +163,6 @@ public class FetchRequest extends AbstractRequest { } private FetchRequest(short version, int replicaId, int maxWait, int minBytes, int maxBytes, - LinkedHashMap<TopicPartition, PartitionData> fetchData) { - this(version, replicaId, maxWait, minBytes, maxBytes, fetchData, IsolationLevel.READ_UNCOMMITTED); - } - - private FetchRequest(short version, int replicaId, int maxWait, int minBytes, int maxBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData, IsolationLevel isolationLevel) { super(version); this.replicaId = replicaId; http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index 68fa3d2..2ea8ecf 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -67,7 +67,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @Override public String toString() { StringBuilder bld = new StringBuilder(); - bld.append("(transactionalId=").append(transactionalId). + bld.append("(type=TxnOffsetCommitRequest"). + append(", transactionalId=").append(transactionalId). append(", producerId=").append(producerId). append(", producerEpoch=").append(producerEpoch). append(", consumerGroupId=").append(consumerGroupId). http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 77b1da8..faa6ea5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -403,7 +403,7 @@ public class SenderTest { client.setNode(new Node(1, "localhost", 33343)); prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED); assertFalse(transactionManager.hasProducerId()); - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertTrue(transactionManager.lastError() instanceof ClusterAuthorizationException); // cluster authorization is a fatal error for the producer http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index a1bd970..ed7ec84 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.internals.ClusterResourceListeners; @@ -59,7 +60,6 @@ import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -69,6 +69,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -79,7 +80,7 @@ import static org.junit.Assert.fail; public class TransactionManagerTest { private static final int MAX_REQUEST_SIZE = 1024 * 1024; private static final short ACKS_ALL = -1; - private static final int MAX_RETRIES = 0; + private static final int MAX_RETRIES = Integer.MAX_VALUE; private static final String CLIENT_ID = "clientId"; private static final int MAX_BLOCK_TIMEOUT = 1000; private static final int REQUEST_TIMEOUT = 1000; @@ -109,19 +110,8 @@ public class TransactionManagerTest { this.transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs); Metrics metrics = new Metrics(metricConfig, time); this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager); - this.sender = new Sender(this.client, - this.metadata, - this.accumulator, - true, - MAX_REQUEST_SIZE, - ACKS_ALL, - MAX_RETRIES, - metrics, - this.time, - REQUEST_TIMEOUT, - 50, - transactionManager, - apiVersions); + this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, + MAX_RETRIES, metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); client.setNode(brokerNode); } @@ -163,7 +153,7 @@ public class TransactionManagerTest { transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; assertFalse(responseFuture.isDone()); prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); @@ -271,7 +261,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // find coordinator sender.run(time.milliseconds()); - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException); sender.run(time.milliseconds()); // one more run to fail the InitProducerId future @@ -294,7 +284,7 @@ public class TransactionManagerTest { prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, pid, RecordBatch.NO_PRODUCER_EPOCH); sender.run(time.milliseconds()); - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertTrue(initPidResult.isCompleted()); assertFalse(initPidResult.isSuccessful()); assertTrue(initPidResult.error() instanceof TransactionalIdAuthorizationException); @@ -321,7 +311,7 @@ public class TransactionManagerTest { prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, CoordinatorType.GROUP, consumerGroupId); sender.run(time.milliseconds()); // FindCoordinator Failed sender.run(time.milliseconds()); // TxnOffsetCommit Aborted - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException); assertTrue(sendOffsetsResult.isCompleted()); assertFalse(sendOffsetsResult.isSuccessful()); @@ -356,7 +346,7 @@ public class TransactionManagerTest { prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.GROUP_AUTHORIZATION_FAILED)); sender.run(time.milliseconds()); // TxnOffsetCommit Handled - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException); assertTrue(sendOffsetsResult.isCompleted()); assertFalse(sendOffsetsResult.isSuccessful()); @@ -384,7 +374,7 @@ public class TransactionManagerTest { prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch); sender.run(time.milliseconds()); // AddOffsetsToTxn Handled - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException); assertTrue(sendOffsetsResult.isCompleted()); assertFalse(sendOffsetsResult.isSuccessful()); @@ -416,7 +406,7 @@ public class TransactionManagerTest { prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)); sender.run(time.milliseconds()); // TxnOffsetCommit Handled - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException); assertTrue(sendOffsetsResult.isCompleted()); assertFalse(sendOffsetsResult.isSuccessful()); @@ -439,7 +429,7 @@ public class TransactionManagerTest { prepareAddPartitionsToTxn(tp, Errors.TOPIC_AUTHORIZATION_FAILED); sender.run(time.milliseconds()); - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException); TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError(); @@ -462,7 +452,7 @@ public class TransactionManagerTest { prepareAddPartitionsToTxn(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED); sender.run(time.milliseconds()); - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException); assertFatalError(TransactionalIdAuthorizationException.class); @@ -584,7 +574,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // send produce. assertTrue(responseFuture.isDone()); - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); responseFuture.get(); } @@ -608,16 +598,22 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // Send AddPartitionsRequest assertFalse(commitResult.isCompleted()); - sender.run(time.milliseconds()); // Send Produce Request, returns OutOfOrderSequenceException. + sender.run(time.milliseconds()); // try to commit. assertTrue(commitResult.isCompleted()); // commit should be cancelled with exception without being sent. try { commitResult.await(); fail(); // the get() must throw an exception. - } catch (RuntimeException e) { - assertTrue(e instanceof KafkaException); + } catch (KafkaException e) { + } + + try { + responseFuture.get(); + fail("Expected produce future to raise an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof OutOfOrderSequenceException); } // Commit is not allowed, so let's abort and try again. @@ -627,7 +623,7 @@ public class TransactionManagerTest { assertTrue(abortResult.isCompleted()); assertTrue(abortResult.isSuccessful()); - assertTrue(transactionManager.isReadyForTransaction()); // make sure we are ready for a transaction now. + assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now. } @Test @@ -643,20 +639,125 @@ public class TransactionManagerTest { Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; - TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); assertFalse(responseFuture.isDone()); prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch); prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch); sender.run(time.milliseconds()); // Send AddPartitionsRequest - assertFalse(abortResult.isCompleted()); - sender.run(time.milliseconds()); // Send Produce Request, returns OutOfOrderSequenceException. + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); + sender.run(time.milliseconds()); // try to abort + assertTrue(abortResult.isCompleted()); + assertTrue(abortResult.isSuccessful()); + assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now. + } + + @Test + public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + assertFalse(responseFuture.isDone()); + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); + // note since no partitions were added to the transaction, no EndTxn will be sent + sender.run(time.milliseconds()); // try to abort assertTrue(abortResult.isCompleted()); assertTrue(abortResult.isSuccessful()); - assertTrue(transactionManager.isReadyForTransaction()); // make sure we are ready for a transaction now. + assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now. + + try { + responseFuture.get(); + fail("Expected produce future to raise an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof KafkaException); + } + } + + @Test + public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException { + final long producerId = 13131L; + final short producerEpoch = 1; + + doInitTransactions(producerId, producerEpoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + sender.run(time.milliseconds()); // Send AddPartitions and let it fail + assertFalse(responseFuture.isDone()); + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); + + // we should resend the AddPartitions + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId); + prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch); + + sender.run(time.milliseconds()); // Resend AddPartitions + sender.run(time.milliseconds()); // Send EndTxn + + assertTrue(abortResult.isCompleted()); + assertTrue(abortResult.isSuccessful()); + assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now. + + try { + responseFuture.get(); + fail("Expected produce future to raise an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof KafkaException); + } + } + + @Test + public void testAbortResendsProduceRequestIfRetried() throws Exception { + final long producerId = 13131L; + final short producerEpoch = 1; + + doInitTransactions(producerId, producerEpoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId); + prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, producerEpoch); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + sender.run(time.milliseconds()); // Send AddPartitions + sender.run(time.milliseconds()); // Send ProduceRequest and let it fail + + assertFalse(responseFuture.isDone()); + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); + + // we should resend the ProduceRequest before aborting + prepareProduceResponse(Errors.NONE, producerId, producerEpoch); + prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch); + + sender.run(time.milliseconds()); // Resend ProduceRequest + sender.run(time.milliseconds()); // Send EndTxn + + assertTrue(abortResult.isCompleted()); + assertTrue(abortResult.isSuccessful()); + assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now. + + RecordMetadata recordMetadata = responseFuture.get(); + assertEquals(tp0.topic(), recordMetadata.topic()); } @Test @@ -739,27 +840,18 @@ public class TransactionManagerTest { @Test public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws Exception { - client.setNode(brokerNode); - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID. final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - - prepareInitPidResponse(Errors.NONE, false, pid, epoch); - sender.run(time.milliseconds()); // get pid. + doInitTransactions(pid, epoch); transactionManager.beginTransaction(); transactionManager.maybeAddPartitionToTransaction(tp0); - TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); - prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, epoch, pid); sender.run(time.milliseconds()); // Send AddPartitionsRequest + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); assertFalse(abortResult.isCompleted()); sender.run(time.milliseconds()); @@ -769,19 +861,10 @@ public class TransactionManagerTest { @Test public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() throws Exception { - client.setNode(brokerNode); - // This is called from the initTransactions method in the producer as the first order of business. - // It finds the coordinator and then gets a PID. final long pid = 13131L; final short epoch = 1; - transactionManager.initializeTransactions(); - prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - sender.run(time.milliseconds()); // find coordinator - sender.run(time.milliseconds()); - - prepareInitPidResponse(Errors.NONE, false, pid, epoch); - sender.run(time.milliseconds()); // get pid. + doInitTransactions(pid, epoch); transactionManager.beginTransaction(); Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); @@ -815,7 +898,7 @@ public class TransactionManagerTest { assertFalse(responseFuture.isDone()); prepareAddPartitionsToTxn(tp0, error); sender.run(time.milliseconds()); // attempt send addPartitions. - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); assertFalse(transactionManager.transactionContainsPartition(tp0)); } @@ -883,7 +966,7 @@ public class TransactionManagerTest { AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest) body; assertEquals(pid, addPartitionsToTxnRequest.producerId()); assertEquals(epoch, addPartitionsToTxnRequest.producerEpoch()); - assertEquals(Arrays.asList(topicPartition), addPartitionsToTxnRequest.partitions()); + assertEquals(singletonList(topicPartition), addPartitionsToTxnRequest.partitions()); assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId()); return true; } @@ -958,23 +1041,23 @@ public class TransactionManagerTest { fail("Should have raised " + cause.getSimpleName()); } catch (KafkaException e) { assertTrue(cause.isAssignableFrom(e.getCause().getClass())); - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); } - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); transactionManager.beginAbortingTransaction(); - assertFalse(transactionManager.isInErrorState()); + assertFalse(transactionManager.hasError()); } private void assertFatalError(Class<? extends RuntimeException> cause) { - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); try { transactionManager.beginAbortingTransaction(); fail("Should have raised " + cause.getSimpleName()); } catch (KafkaException e) { assertTrue(cause.isAssignableFrom(e.getCause().getClass())); - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); } // Transaction abort cannot clear fatal error state @@ -983,7 +1066,7 @@ public class TransactionManagerTest { fail("Should have raised " + cause.getSimpleName()); } catch (KafkaException e) { assertTrue(cause.isAssignableFrom(e.getCause().getClass())); - assertTrue(transactionManager.isInErrorState()); + assertTrue(transactionManager.hasError()); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/651d9a53/core/src/test/scala/integration/kafka/api/TransactionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index fd9d884..205dc6e 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -70,6 +70,7 @@ class TransactionsTest extends KafkaServerTestHarness { producer.beginTransaction() producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false)) producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false)) + producer.flush() producer.abortTransaction() producer.beginTransaction()
