KAFKA-5259; TransactionalId auth implies ProducerId auth Author: Jason Gustafson <[email protected]>
Reviewers: Apurva Mehta <[email protected]>, Jun Rao <[email protected]> Closes #3075 from hachikuji/KAFKA-5259-FIXED (cherry picked from commit 38f6cae9e879baa35c5dbc5829bf09ecd59930c2) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9a21bf20 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9a21bf20 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9a21bf20 Branch: refs/heads/0.11.0 Commit: 9a21bf20b623b790c9910813e373d2d61fe84c2c Parents: 7cb57dc Author: Jason Gustafson <[email protected]> Authored: Wed May 24 15:26:46 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Wed May 24 15:28:55 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/clients/ClientResponse.java | 7 +- .../kafka/clients/admin/AclOperation.java | 7 +- .../clients/consumer/internals/Fetcher.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 35 +- .../clients/producer/internals/Sender.java | 169 +- .../producer/internals/TransactionManager.java | 268 ++-- .../ProducerIdAuthorizationException.java | 23 - .../TransactionalIdAuthorizationException.java | 2 +- .../apache/kafka/common/protocol/Errors.java | 13 +- .../apache/kafka/common/protocol/Protocol.java | 3 + .../common/requests/AddOffsetsToTxnRequest.java | 11 + .../requests/AddOffsetsToTxnResponse.java | 5 +- .../requests/AddPartitionsToTxnRequest.java | 11 + .../requests/AddPartitionsToTxnResponse.java | 3 +- .../kafka/common/requests/EndTxnRequest.java | 11 + .../kafka/common/requests/EndTxnResponse.java | 3 +- .../common/requests/InitProducerIdResponse.java | 12 +- .../kafka/common/requests/ProduceRequest.java | 5 +- .../kafka/common/requests/ProduceResponse.java | 3 + .../common/requests/TxnOffsetCommitRequest.java | 31 +- .../requests/TxnOffsetCommitResponse.java | 1 + .../requests/WriteTxnMarkersResponse.java | 1 + .../kafka/clients/admin/AclOperationTest.java | 3 +- .../clients/producer/internals/SenderTest.java | 183 +-- .../internals/TransactionManagerTest.java | 445 ++++-- .../common/requests/RequestResponseTest.java | 10 +- .../src/main/scala/kafka/admin/AclCommand.scala | 54 +- .../kafka/coordinator/group/GroupMetadata.scala | 43 +- .../group/GroupMetadataManager.scala | 5 +- .../coordinator/group/MemberMetadata.scala | 14 +- .../scala/kafka/security/auth/Operation.scala | 6 +- .../scala/kafka/security/auth/Resource.scala | 3 +- .../kafka/security/auth/ResourceType.scala | 16 +- .../src/main/scala/kafka/server/KafkaApis.scala | 316 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 292 +++- .../kafka/api/TransactionsBounceTest.scala | 31 +- .../kafka/api/TransactionsTest.scala | 7 +- .../scala/unit/kafka/admin/AclCommandTest.scala | 36 +- .../group/GroupCoordinatorResponseTest.scala | 1492 ------------------ .../group/GroupCoordinatorTest.scala | 1492 ++++++++++++++++++ .../coordinator/group/GroupMetadataTest.scala | 2 +- .../unit/kafka/server/RequestQuotaTest.scala | 9 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 10 +- 43 files changed, 2771 insertions(+), 2324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java index 715eae7..0ff30e9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.RequestHeader; @@ -31,7 +32,7 @@ public class ClientResponse { private final long receivedTimeMs; private final long latencyMs; private final boolean disconnected; - private final RuntimeException versionMismatch; + private final UnsupportedVersionException versionMismatch; private final AbstractResponse responseBody; /** @@ -51,7 +52,7 @@ public class ClientResponse { long createdTimeMs, long receivedTimeMs, boolean disconnected, - RuntimeException versionMismatch, + UnsupportedVersionException versionMismatch, AbstractResponse responseBody) { this.requestHeader = requestHeader; this.callback = callback; @@ -71,7 +72,7 @@ public class ClientResponse { return disconnected; } - public RuntimeException versionMismatch() { + public UnsupportedVersionException versionMismatch() { return versionMismatch; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java index 062e5e3..0c3ff50 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java @@ -83,7 +83,12 @@ public enum AclOperation { /** * ALTER_CONFIGS operation. */ - ALTER_CONFIGS((byte) 11); + ALTER_CONFIGS((byte) 11), + + /** + * IDEMPOTENT_WRITE operation. + */ + IDEMPOTENT_WRITE((byte) 12); private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 509993f..6917a1d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -416,7 +416,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } // we might lose the assignment while fetching the offset, so check it is still active if (subscriptions.isAssigned(partition)) { - log.debug("Resetting offset for partition {} to {} offset.", partition, offsetData.offset); + log.debug("Resetting offset for partition {} to offset {}.", partition, offsetData.offset); this.subscriptions.seek(partition, offsetData.offset); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index ac0169a..c11ecc7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -51,7 +51,6 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.RecordBatch; @@ -607,7 +606,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * Implementation of asynchronously send a record to a topic. */ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { - ensureProperTransactionalState(); + if (transactionManager != null) + ensureProperTransactionalState(); + TopicPartition tp = null; try { // first make sure the metadata for the topic is available @@ -642,9 +643,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> { long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); // producer callback will make sure to call both 'callback' and interceptor callback - Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, transactionManager); + Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp); - if (transactionManager != null) + if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp); RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, @@ -690,27 +691,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> { } private void ensureProperTransactionalState() { - if (transactionManager == null) - return; - if (transactionManager.isTransactional() && !transactionManager.hasProducerId()) - throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled."); - - if (transactionManager.isFenced()) - throw Errors.INVALID_PRODUCER_EPOCH.exception(); + throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " + + "when transactions are enabled."); if (transactionManager.isInErrorState()) { - String errorMessage = - "Cannot perform send because at least one previous transactional or idempotent request has failed with errors."; Exception lastError = transactionManager.lastError(); - if (lastError != null) - throw new KafkaException(errorMessage, lastError); - else - throw new KafkaException(errorMessage); + throw new KafkaException("Cannot perform send because at least one previous transactional or " + + "idempotent request has failed with errors.", lastError); } if (transactionManager.isCompletingTransaction()) throw new IllegalStateException("Cannot call send while a commit or abort is in progress."); - } private void setReadOnly(Headers headers) { @@ -1013,14 +1004,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> { private final Callback userCallback; private final ProducerInterceptors<K, V> interceptors; private final TopicPartition tp; - private final TransactionManager transactionManager; - public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, - TopicPartition tp, TransactionManager transactionManager) { + private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) { this.userCallback = userCallback; this.interceptors = interceptors; this.tp = tp; - this.transactionManager = transactionManager; } public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -1034,9 +1022,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { } if (this.userCallback != null) this.userCallback.onCompletion(metadata, exception); - - if (exception != null && transactionManager != null) - transactionManager.setError(exception); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 4c3b99d..116a1c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -24,15 +24,18 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClientUtils; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -189,28 +192,34 @@ public class Sender implements Runnable { * @param now The current POSIX time in milliseconds */ void run(long now) { - long pollTimeout = retryBackoffMs; - if (!maybeSendTransactionalRequest(now)) { - pollTimeout = sendProducerData(now); + if (transactionManager != null) { + if (!transactionManager.isTransactional()) { + // this is an idempotent producer, so make sure we have a producer id + maybeWaitForProducerId(); + } else if (transactionManager.hasInflightRequest() || maybeSendTransactionalRequest(now)) { + // as long as there are outstanding transactional requests, we simply wait for them to return + client.poll(retryBackoffMs, now); + return; + } + + // do not continue sending if the transaction manager is in a failed state or if there + // is no producer id (for the idempotent case). + if (transactionManager.isInErrorState() || !transactionManager.hasProducerId()) { + RuntimeException lastError = transactionManager.lastError(); + if (lastError != null) + maybeAbortBatches(lastError); + client.poll(retryBackoffMs, now); + return; + } } + long pollTimeout = sendProducerData(now); log.trace("waiting {}ms in poll", pollTimeout); - this.client.poll(pollTimeout, now); + client.poll(pollTimeout, now); } - private long sendProducerData(long now) { Cluster cluster = metadata.fetch(); - maybeWaitForProducerId(); - - if (transactionManager != null && transactionManager.isInErrorState()) { - final KafkaException exception = transactionManager.lastError() instanceof KafkaException - ? (KafkaException) transactionManager.lastError() - : new KafkaException(transactionManager.lastError()); - log.error("aborting producer batches because the transaction manager is in an error state.", exception); - this.accumulator.abortBatches(exception); - return Long.MAX_VALUE; - } // get the list of partitions with data ready to send RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); @@ -286,22 +295,13 @@ public class Sender implements Runnable { } private boolean maybeSendTransactionalRequest(long now) { - if (transactionManager == null || !transactionManager.isTransactional()) - return false; - - if (transactionManager.hasInflightRequest()) { - log.trace("TransactionalId: {} -- There is already an inflight transactional request. Going to wait for the response.", + TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler(); + if (nextRequestHandler == null) { + log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionManager.transactionalId()); - return true; - } - - if (!transactionManager.hasPendingTransactionalRequests()) { - log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionManager.transactionalId()); return false; } - TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler(); - if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) { if (!accumulator.flushInProgress()) accumulator.beginFlush(); @@ -311,15 +311,11 @@ public class Sender implements Runnable { return false; } - if (transactionManager.maybeTerminateRequestWithError(nextRequestHandler)) { - log.trace("TransactionalId: {} -- Not sending a transactional request because we are in an error state", - transactionManager.transactionalId()); - return false; - } - - Node targetNode = null; + log.debug("TransactionalId: {} -- Sending transactional request {}", transactionManager.transactionalId(), + nextRequestHandler.requestBuilder()); - while (targetNode == null) { + while (true) { + Node targetNode = null; try { if (nextRequestHandler.needsCoordinator()) { targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType()); @@ -340,8 +336,8 @@ public class Sender implements Runnable { transactionManager.transactionalId(), retryBackoffMs, nextRequestHandler.requestBuilder()); time.sleep(retryBackoffMs); } - ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), nextRequestHandler.requestBuilder(), - now, true, nextRequestHandler); + ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), + nextRequestHandler.requestBuilder(), now, true, nextRequestHandler); transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId()); log.trace("TransactionalId: {} -- Sending transactional request {} to node {}", transactionManager.transactionalId(), nextRequestHandler.requestBuilder(), clientRequest.destination()); @@ -349,9 +345,9 @@ public class Sender implements Runnable { return true; } } catch (IOException e) { - targetNode = null; - log.warn("TransactionalId: " + transactionManager.transactionalId() + " -- Got an exception when trying " + - "to find a node to send transactional request " + nextRequestHandler.requestBuilder() + ". Going to back off and retry", e); + log.debug("TransactionalId: {} -- Disconnect from {} while trying to send transactional " + + "request {}. Going to back off and retry", transactionManager.transactionalId(), + targetNode, nextRequestHandler.requestBuilder()); } log.trace("TransactionalId: {}. About to wait for {}ms before trying to send another transactional request.", transactionManager.transactionalId(), retryBackoffMs); @@ -364,6 +360,13 @@ public class Sender implements Runnable { return true; } + private void maybeAbortBatches(RuntimeException exception) { + if (accumulator.hasUnflushedBatches()) { + log.error("Aborting producer batches due to fatal error", exception); + accumulator.abortBatches(exception); + } + } + /** * Start closing the sender (won't actually complete until all data is sent out) */ @@ -383,7 +386,7 @@ public class Sender implements Runnable { initiateClose(); } - private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException { + private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException { String nodeId = node.idString(); InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null); ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null); @@ -399,43 +402,37 @@ public class Sender implements Runnable { } private void maybeWaitForProducerId() { - // If this is a transactional producer, the producer id will be received when recovering transactions in the - // initTransactions() method of the producer. - if (transactionManager == null || transactionManager.isTransactional()) - return; - while (!transactionManager.hasProducerId() && !transactionManager.isInErrorState()) { try { Node node = awaitLeastLoadedNodeReady(requestTimeout); if (node != null) { - ClientResponse response = sendAndAwaitInitPidRequest(node); - - if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) { - InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody(); - Exception exception = initProducerIdResponse.error().exception(); - if (exception != null && !(exception instanceof RetriableException)) { - transactionManager.setError(exception); - return; - } + ClientResponse response = sendAndAwaitInitProducerIdRequest(node); + InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody(); + Errors error = initProducerIdResponse.error(); + if (error == Errors.NONE) { ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch( initProducerIdResponse.producerId(), initProducerIdResponse.epoch()); transactionManager.setProducerIdAndEpoch(producerIdAndEpoch); + } else if (error.exception() instanceof RetriableException) { + log.debug("Retriable error from InitProducerId response", error.message()); } else { - log.error("Received an unexpected response type for an InitProducerIdRequest from {}. " + - "We will back off and try again.", node); + transactionManager.transitionToFatalError(error.exception()); + break; } } else { log.debug("Could not find an available broker to send InitProducerIdRequest to. " + "We will back off and try again."); } - } catch (Exception e) { - log.warn("Received an exception while trying to get a producer id. Will back off and retry.", e); + } catch (UnsupportedVersionException e) { + transactionManager.transitionToFatalError(e); + break; + } catch (IOException e) { + log.debug("Broker {} disconnected while awaiting InitProducerId response", e); } log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs); time.sleep(retryBackoffMs); metadata.requestUpdate(); } - } /** @@ -507,9 +504,9 @@ public class Sender implements Runnable { error); if (transactionManager == null) { reenqueueBatch(batch, now); - } else if (transactionManager.producerIdAndEpoch().producerId == batch.producerId() && - transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) { - // If idempotence is enabled only retry the request if the current producer id is the same as the producer id of the batch. + } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) { + // If idempotence is enabled only retry the request if the current producer id is the same as + // the producer id of the batch. log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition, transactionManager.sequenceNumber(batch.topicPartition)); reenqueueBatch(batch, now); @@ -523,12 +520,10 @@ public class Sender implements Runnable { final RuntimeException exception; if (error == Errors.TOPIC_AUTHORIZATION_FAILED) exception = new TopicAuthorizationException(batch.topicPartition.topic()); + else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) + exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends"); else exception = error.exception(); - if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.producerIdAndEpoch().producerId) - log.error("The broker received an out of order sequence number for correlation id {}, topic-partition " + - "{} at offset {}. This indicates data loss on the broker, and should be investigated.", - correlationId, batch.topicPartition, response.baseOffset); // tell the user the result of their request failBatch(batch, response, exception); this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); @@ -543,12 +538,6 @@ public class Sender implements Runnable { } else { completeBatch(batch, response); - if (transactionManager != null && transactionManager.producerIdAndEpoch().producerId == batch.producerId() - && transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) { - transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); - log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition, - transactionManager.sequenceNumber(batch.topicPartition)); - } } // Unmute the completed partition. @@ -562,18 +551,38 @@ public class Sender implements Runnable { } private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) { + if (transactionManager != null && transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) { + transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); + log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition, + transactionManager.sequenceNumber(batch.topicPartition)); + } + batch.done(response.baseOffset, response.logAppendTime, null); this.accumulator.deallocate(batch); } private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) { - if (transactionManager != null && !transactionManager.isTransactional() - && batch.producerId() == transactionManager.producerIdAndEpoch().producerId) { - // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees - // about the previously committed message. Note that this will discard the producer id and sequence - // numbers for all existing partitions. - transactionManager.resetProducerId(); + if (transactionManager != null) { + if (exception instanceof OutOfOrderSequenceException + && !transactionManager.isTransactional() + && transactionManager.hasProducerId(batch.producerId())) { + log.error("The broker received an out of order sequence number for topic-partition " + + "{} at offset {}. This indicates data loss on the broker, and should be investigated.", + batch.topicPartition, response.baseOffset); + + // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees + // about the previously committed message. Note that this will discard the producer id and sequence + // numbers for all existing partitions. + transactionManager.resetProducerId(); + } else if (exception instanceof ClusterAuthorizationException + || exception instanceof TransactionalIdAuthorizationException + || exception instanceof ProducerFencedException) { + transactionManager.transitionToFatalError(exception); + } else if (transactionManager.isTransactional()) { + transactionManager.transitionToAbortableError(exception); + } } + batch.done(response.baseOffset, response.logAppendTime, exception); this.accumulator.deallocate(batch); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index d84a88e..d674697 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -77,7 +77,7 @@ public class TransactionManager { private Node consumerGroupCoordinator; private volatile State currentState = State.UNINITIALIZED; - private volatile Exception lastError = null; + private volatile RuntimeException lastError = null; private volatile ProducerIdAndEpoch producerIdAndEpoch; private enum State { @@ -87,32 +87,34 @@ public class TransactionManager { IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION, - FENCED, - ERROR; + ABORTABLE_ERROR, + FATAL_ERROR; private boolean isTransitionValid(State source, State target) { switch (target) { case INITIALIZING: - return source == UNINITIALIZED || source == ERROR; + return source == UNINITIALIZED; case READY: return source == INITIALIZING || source == COMMITTING_TRANSACTION - || source == ABORTING_TRANSACTION || source == ERROR; + || source == ABORTING_TRANSACTION || source == ABORTABLE_ERROR; case IN_TRANSACTION: return source == READY; case COMMITTING_TRANSACTION: return source == IN_TRANSACTION; case ABORTING_TRANSACTION: - return source == IN_TRANSACTION || source == ERROR; + return source == IN_TRANSACTION || source == ABORTABLE_ERROR; + case ABORTABLE_ERROR: + return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; + case FATAL_ERROR: default: - // We can transition to FENCED or ERROR unconditionally. - // FENCED is never a valid starting state for any transition. So the only option is to close the + // We can transition to FATAL_ERROR unconditionally. + // FATAL_ERROR is never a valid starting state for any transition. So the only option is to close the // producer or do purely non transactional requests. return true; } } } - // We use the priority to determine the order in which requests need to be sent out. For instance, if we have // a pending FindCoordinator request, that must always go first. Next, If we need a producer id, that must go second. // The endTxn request must always go last. @@ -149,7 +151,7 @@ public class TransactionManager { } TransactionManager() { - this("", 0); + this(null, 0); } public synchronized TransactionalRequestResult initializeTransactions() { @@ -178,8 +180,8 @@ public class TransactionManager { public synchronized TransactionalRequestResult beginAbortingTransaction() { ensureTransactional(); - if (isFenced()) - throw Errors.INVALID_PRODUCER_EPOCH.exception(); + if (currentState != State.ABORTABLE_ERROR) + maybeFailWithError(); transitionTo(State.ABORTING_TRANSACTION); return beginCompletingTransaction(false); } @@ -213,12 +215,16 @@ public class TransactionManager { } public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) { - if (!isInTransaction() || partitionsInTransaction.contains(topicPartition)) + if (!isInTransaction()) + throw new IllegalArgumentException("Cannot add partitions to a transaction in state " + currentState); + + if (partitionsInTransaction.contains(topicPartition)) return; + newPartitionsToBeAddedToTransaction.add(topicPartition); } - public Exception lastError() { + public RuntimeException lastError() { return lastError; } @@ -231,11 +237,7 @@ public class TransactionManager { } public boolean isTransactional() { - return transactionalId != null && !transactionalId.isEmpty(); - } - - public boolean isFenced() { - return currentState == State.FENCED; + return transactionalId != null; } public boolean isCompletingTransaction() { @@ -247,31 +249,15 @@ public class TransactionManager { } public boolean isInErrorState() { - return currentState == State.ERROR || currentState == State.FENCED; - } - - public synchronized void setError(Exception exception) { - if (exception instanceof ProducerFencedException) - transitionTo(State.FENCED, exception); - else - transitionTo(State.ERROR, exception); - } - - boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) { - if (isInErrorState() && requestHandler.isEndTxn()) { - // We shouldn't terminate abort requests from error states. - EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler; - if (endTxnHandler.builder.result() == TransactionResult.ABORT) - return false; - String errorMessage = "Cannot commit transaction because at least one previous transactional request " + - "was not completed successfully."; - if (lastError != null) - requestHandler.fatal(new KafkaException(errorMessage, lastError)); - else - requestHandler.fatal(new KafkaException(errorMessage)); - return true; - } - return false; + return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR; + } + + public synchronized void transitionToAbortableError(RuntimeException exception) { + transitionTo(State.ABORTABLE_ERROR, exception); + } + + public synchronized void transitionToFatalError(RuntimeException exception) { + transitionTo(State.FATAL_ERROR, exception); } /** @@ -284,6 +270,15 @@ public class TransactionManager { return producerIdAndEpoch; } + boolean hasProducerId(long producerId) { + return producerIdAndEpoch.producerId == producerId; + } + + boolean hasProducerIdAndEpoch(long producerId, short producerEpoch) { + ProducerIdAndEpoch idAndEpoch = this.producerIdAndEpoch; + return idAndEpoch.producerId == producerId && idAndEpoch.epoch == producerEpoch; + } + /** * Set the producer id and epoch atomically. */ @@ -337,26 +332,26 @@ public class TransactionManager { sequenceNumbers.put(topicPartition, currentSequenceNumber); } - boolean hasPendingTransactionalRequests() { - return !(pendingRequests.isEmpty() && newPartitionsToBeAddedToTransaction.isEmpty()); - } - - TxnRequestHandler nextRequestHandler() { - if (!hasPendingTransactionalRequests()) - return null; - + synchronized TxnRequestHandler nextRequestHandler() { if (!newPartitionsToBeAddedToTransaction.isEmpty()) pendingRequests.add(addPartitionsToTransactionHandler()); - return pendingRequests.poll(); + TxnRequestHandler nextRequestHandler = pendingRequests.poll(); + if (nextRequestHandler != null && maybeTerminateRequestWithError(nextRequestHandler)) { + log.trace("TransactionalId: {} -- Not sending transactional request {} because we are in an error state", + transactionalId, nextRequestHandler.requestBuilder()); + return null; + } + + return nextRequestHandler; } - void retry(TxnRequestHandler request) { + synchronized void retry(TxnRequestHandler request) { request.setRetry(); pendingRequests.add(request); } - void reenqueue(TxnRequestHandler request) { + synchronized void reenqueue(TxnRequestHandler request) { pendingRequests.add(request); } @@ -406,15 +401,21 @@ public class TransactionManager { transitionTo(target, null); } - private synchronized void transitionTo(State target, Exception error) { - if (currentState.isTransitionValid(currentState, target)) { - currentState = target; - if (target == State.ERROR && error != null) - lastError = error; - } else { + private synchronized void transitionTo(State target, RuntimeException error) { + if (!currentState.isTransitionValid(currentState, target)) throw new KafkaException("Invalid transition attempted from state " + currentState.name() + " to state " + target.name()); + + if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) { + if (error == null) + throw new IllegalArgumentException("Cannot transition to " + target + " with an null exception"); + lastError = error; + } else { + lastError = null; } + + log.debug("TransactionalId {} -- Transition from state {} to {}", transactionalId, currentState, target); + currentState = target; } private void ensureTransactional() { @@ -423,15 +424,23 @@ public class TransactionManager { } private void maybeFailWithError() { - if (isFenced()) - throw Errors.INVALID_PRODUCER_EPOCH.exception(); + if (isInErrorState()) + throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError); + } + + private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) { if (isInErrorState()) { - String errorMessage = "Cannot execute transactional method because we are in an error state."; - if (lastError != null) - throw new KafkaException(errorMessage, lastError); - else - throw new KafkaException(errorMessage); + if (requestHandler instanceof EndTxnHandler) { + // we allow abort requests to break out of the error state. The state and the last error + // will be cleared when the request returns + EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler; + if (endTxnHandler.builder.result() == TransactionResult.ABORT) + return false; + } + requestHandler.fail(lastError); + return true; } + return false; } private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) { @@ -443,12 +452,11 @@ public class TransactionManager { transactionCoordinator = null; break; default: - throw new IllegalStateException("Got an invalid coordinator type: " + type); + throw new IllegalStateException("Invalid coordinator type: " + type); } FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey); - FindCoordinatorHandler request = new FindCoordinatorHandler(builder); - pendingRequests.add(request); + pendingRequests.add(new FindCoordinatorHandler(builder)); } private void completeTransaction() { @@ -473,9 +481,8 @@ public class TransactionManager { CommittedOffset committedOffset = new CommittedOffset(offsetAndMetadata.offset(), offsetAndMetadata.metadata()); pendingTxnOffsetCommits.put(entry.getKey(), committedOffset); } - TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId, - producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, - pendingTxnOffsetCommits); + TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(transactionalId, consumerGroupId, + producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, pendingTxnOffsetCommits); return new TxnOffsetCommitHandler(result, builder); } @@ -491,19 +498,20 @@ public class TransactionManager { this(new TransactionalRequestResult()); } - void fatal(RuntimeException e) { + void fatalError(RuntimeException e) { + result.setError(e); + transitionToFatalError(e); + result.done(); + } + + void abortableError(RuntimeException e) { result.setError(e); - transitionTo(State.ERROR, e); + transitionToAbortableError(e); result.done(); } - void fenced() { - log.error("Producer has become invalid, which typically means another producer with the same " + - "transactional.id has been started: producerId: {}. epoch: {}.", - producerIdAndEpoch.producerId, producerIdAndEpoch.epoch); - result.setError(Errors.INVALID_PRODUCER_EPOCH.exception()); - lastError = Errors.INVALID_PRODUCER_EPOCH.exception(); - transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception()); + void fail(RuntimeException e) { + result.setError(e); result.done(); } @@ -516,19 +524,19 @@ public class TransactionManager { @SuppressWarnings("unchecked") public void onComplete(ClientResponse response) { if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) { - fatal(new RuntimeException("Detected more than one in-flight transactional request.")); + fatalError(new RuntimeException("Detected more than one in-flight transactional request.")); } else { clearInFlightRequestCorrelationId(); if (response.wasDisconnected()) { log.trace("disconnected from " + response.destination() + ". Will retry."); reenqueue(); } else if (response.versionMismatch() != null) { - fatal(response.versionMismatch()); + fatalError(response.versionMismatch()); } else if (response.hasResponse()) { log.trace("Got transactional response for request:" + requestBuilder()); handleResponse(response.responseBody()); } else { - fatal(new KafkaException("Could not execute transactional request for unknown reasons")); + fatalError(new KafkaException("Could not execute transactional request for unknown reasons")); } } } @@ -585,6 +593,10 @@ public class TransactionManager { public void handleResponse(AbstractResponse response) { InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response; Errors error = initProducerIdResponse.error(); + + log.debug("TransactionalId {} -- Received InitProducerId response with error {}", + transactionalId, error); + if (error == Errors.NONE) { ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch()); setProducerIdAndEpoch(producerIdAndEpoch); @@ -597,9 +609,9 @@ public class TransactionManager { } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { reenqueue(); } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { - fatal(error.exception()); + fatalError(error.exception()); } else { - fatal(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message())); + fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message())); } } } @@ -626,6 +638,11 @@ public class TransactionManager { AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response; Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors(); boolean hasPartitionErrors = false; + Set<String> unauthorizedTopics = new HashSet<>(); + + log.debug("TransactionalId {} -- Received AddPartitionsToTxn response with errors {}", + transactionalId, errors); + for (TopicPartition topicPartition : pendingPartitionsToBeAddedToTransaction) { final Errors error = errors.get(topicPartition); if (error == Errors.NONE || error == null) { @@ -640,23 +657,28 @@ public class TransactionManager { reenqueue(); return; } else if (error == Errors.INVALID_PRODUCER_EPOCH) { - fenced(); + fatalError(error.exception()); return; } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { - fatal(error.exception()); + fatalError(error.exception()); return; } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING || error == Errors.INVALID_TXN_STATE) { - fatal(new KafkaException(error.exception())); + fatalError(new KafkaException(error.exception())); return; + } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { + unauthorizedTopics.add(topicPartition.topic()); } else { - log.error("Could not add partitions to transaction due to partition error. partition={}, error={}", topicPartition, error); + log.error("TransactionalId: {} -- Could not add partition {} due to unexpected error {}", + transactionalId, topicPartition, error); hasPartitionErrors = true; } } - if (hasPartitionErrors) { - fatal(new KafkaException("Could not add partitions to transaction due to partition level errors")); + if (!unauthorizedTopics.isEmpty()) { + abortableError(new TopicAuthorizationException(unauthorizedTopics)); + } else if (hasPartitionErrors) { + abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors")); } else { partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction); pendingPartitionsToBeAddedToTransaction.clear(); @@ -695,7 +717,12 @@ public class TransactionManager { @Override public void handleResponse(AbstractResponse response) { FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response; - if (findCoordinatorResponse.error() == Errors.NONE) { + Errors error = findCoordinatorResponse.error(); + + log.debug("TransactionalId {} -- Received FindCoordinator response with error {}", + transactionalId, error); + + if (error == Errors.NONE) { Node node = findCoordinatorResponse.node(); switch (builder.coordinatorType()) { case GROUP: @@ -705,12 +732,14 @@ public class TransactionManager { transactionCoordinator = node; } result.done(); - } else if (findCoordinatorResponse.error() == Errors.COORDINATOR_NOT_AVAILABLE) { + } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) { reenqueue(); + } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { + fatalError(error.exception()); } else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) { - fatal(new GroupAuthorizationException("Not authorized to commit offsets " + builder.coordinatorKey())); + abortableError(new GroupAuthorizationException(builder.coordinatorKey())); } else { - fatal(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" + + fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" + "unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(), findCoordinatorResponse.error().message()))); } @@ -743,6 +772,10 @@ public class TransactionManager { public void handleResponse(AbstractResponse response) { EndTxnResponse endTxnResponse = (EndTxnResponse) response; Errors error = endTxnResponse.error(); + + log.debug("TransactionalId {} -- Received EndTxn response with error {}", + transactionalId, error); + if (error == Errors.NONE) { completeTransaction(); result.done(); @@ -752,11 +785,13 @@ public class TransactionManager { } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { reenqueue(); } else if (error == Errors.INVALID_PRODUCER_EPOCH) { - fenced(); + fatalError(error.exception()); } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { - fatal(error.exception()); + fatalError(error.exception()); + } else if (error == Errors.INVALID_TXN_STATE) { + fatalError(error.exception()); } else { - fatal(new KafkaException("Unhandled error in EndTxnResponse: " + error.message())); + fatalError(new KafkaException("Unhandled error in EndTxnResponse: " + error.message())); } } } @@ -785,6 +820,10 @@ public class TransactionManager { public void handleResponse(AbstractResponse response) { AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response; Errors error = addOffsetsToTxnResponse.error(); + + log.debug("TransactionalId {} -- Received AddOffsetsToTxn response with error {}", + transactionalId, error); + if (error == Errors.NONE) { // note the result is not completed until the TxnOffsetCommit returns pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId())); @@ -794,11 +833,13 @@ public class TransactionManager { } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { reenqueue(); } else if (error == Errors.INVALID_PRODUCER_EPOCH) { - fenced(); + fatalError(error.exception()); } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { - fatal(error.exception()); + fatalError(error.exception()); + } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + abortableError(new GroupAuthorizationException(builder.consumerGroupId())); } else { - fatal(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message())); + fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message())); } } } @@ -837,7 +878,12 @@ public class TransactionManager { TxnOffsetCommitResponse txnOffsetCommitResponse = (TxnOffsetCommitResponse) response; boolean coordinatorReloaded = false; boolean hadFailure = false; - for (Map.Entry<TopicPartition, Errors> entry : txnOffsetCommitResponse.errors().entrySet()) { + Map<TopicPartition, Errors> errors = txnOffsetCommitResponse.errors(); + + log.debug("TransactionalId {} -- Received TxnOffsetCommit response with errors {}", + transactionalId, errors); + + for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) { TopicPartition topicPartition = entry.getKey(); Errors error = entry.getValue(); if (error == Errors.NONE) { @@ -850,11 +896,17 @@ public class TransactionManager { } } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { hadFailure = true; + } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + abortableError(new GroupAuthorizationException(builder.consumerGroupId())); + return; + } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { + fatalError(error.exception()); + return; } else if (error == Errors.INVALID_PRODUCER_EPOCH) { - fenced(); + fatalError(error.exception()); return; } else { - fatal(new KafkaException("Unexpected error in TxnOffsetCommitResponse: " + error.message())); + fatalError(new KafkaException("Unexpected error in TxnOffsetCommitResponse: " + error.message())); return; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java deleted file mode 100644 index 2da9158..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.errors; - -public class ProducerIdAuthorizationException extends ApiException { - public ProducerIdAuthorizationException(final String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java index 9bf1fbb..3f85513 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.errors; -public class TransactionalIdAuthorizationException extends ApiException { +public class TransactionalIdAuthorizationException extends AuthorizationException { public TransactionalIdAuthorizationException(final String message) { super(message); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index f94fb4d..9444eb5 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -57,7 +57,6 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.errors.ProducerFencedException; -import org.apache.kafka.common.errors.ProducerIdAuthorizationException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; @@ -483,21 +482,13 @@ public enum Errors { return new TransactionalIdAuthorizationException(message); } }), - PRODUCER_ID_AUTHORIZATION_FAILED(54, "Producer is not authorized to use producer Ids, " + - "which is required to write idempotent data.", - new ApiExceptionBuilder() { - @Override - public ApiException build(String message) { - return new ProducerIdAuthorizationException(message); - } - }), - SECURITY_DISABLED(55, "Security features are disabled.", new ApiExceptionBuilder() { + SECURITY_DISABLED(54, "Security features are disabled.", new ApiExceptionBuilder() { @Override public ApiException build(String message) { return new SecurityDisabledException(message); } }), - BROKER_AUTHORIZATION_FAILED(56, "Broker authorization failed", new ApiExceptionBuilder() { + BROKER_AUTHORIZATION_FAILED(55, "Broker authorization failed", new ApiExceptionBuilder() { @Override public ApiException build(String message) { return new BrokerAuthorizationException(message); http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index d5ce469..91391e9 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -1516,6 +1516,9 @@ public class Protocol { ); public static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema( + new Field("transactional_id", + STRING, + "The transactional id corresponding to the transaction."), new Field("consumer_group_id", STRING, "Id of the associated consumer group to commit offsets for."), http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java index 4bf8b3e..3339470 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java @@ -50,6 +50,17 @@ public class AddOffsetsToTxnRequest extends AbstractRequest { public AddOffsetsToTxnRequest build(short version) { return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId); } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("(transactionalId=").append(transactionalId). + append(", producerId=").append(producerId). + append(", producerEpoch=").append(producerEpoch). + append(", consumerGroupId=").append(consumerGroupId). + append(")"); + return bld.toString(); + } } private final String transactionalId; http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java index 8b3a589..754f242 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java @@ -30,10 +30,11 @@ public class AddOffsetsToTxnResponse extends AbstractResponse { // NotCoordinator // CoordinatorNotAvailable // CoordinatorLoadInProgress - // InvalidPidMapping + // InvalidProducerIdMapping + // InvalidProducerEpoch // InvalidTxnState // GroupAuthorizationFailed - // InvalidProducerEpoch + // TransactionalIdAuthorizationFailed private final Errors error; private final int throttleTimeMs; http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java index 148ebec..e24fa5a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java @@ -54,6 +54,17 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { public AddPartitionsToTxnRequest build(short version) { return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions); } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("(transactionalId=").append(transactionalId). + append(", producerId=").append(producerId). + append(", producerEpoch=").append(producerEpoch). + append(", partitions=").append(partitions). + append(")"); + return bld.toString(); + } } private final String transactionalId; http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java index 697142b..39172ee 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java @@ -43,11 +43,12 @@ public class AddPartitionsToTxnResponse extends AbstractResponse { // CoordinatorNotAvailable // CoordinatorLoadInProgress // InvalidTxnState - // InvalidPidMapping + // InvalidProducerIdMapping // TopicAuthorizationFailed // InvalidProducerEpoch // UnknownTopicOrPartition // TopicAuthorizationFailed + // TransactionalIdAuthorizationFailed private final Map<TopicPartition, Errors> errors; public AddPartitionsToTxnResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java index 77ec137..b9f052c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java @@ -50,6 +50,17 @@ public class EndTxnRequest extends AbstractRequest { public EndTxnRequest build(short version) { return new EndTxnRequest(version, transactionalId, producerId, producerEpoch, result); } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("(transactionalId=").append(transactionalId). + append(", producerId=").append(producerId). + append(", producerEpoch=").append(producerEpoch). + append(", result=").append(result). + append(")"); + return bld.toString(); + } } private final String transactionalId; http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java index 99e4e8c..17cf68d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java @@ -31,8 +31,9 @@ public class EndTxnResponse extends AbstractResponse { // CoordinatorNotAvailable // CoordinatorLoadInProgress // InvalidTxnState - // InvalidPidMapping + // InvalidProducerIdMapping // InvalidProducerEpoch + // TransactionalIdAuthorizationFailed private final Errors error; private final int throttleTimeMs; http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java index 7c8a6e5..96e1cdf 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java @@ -24,11 +24,13 @@ import org.apache.kafka.common.record.RecordBatch; import java.nio.ByteBuffer; public class InitProducerIdResponse extends AbstractResponse { - /** - * Possible Error codes: - * OK - * - */ + // Possible error codes: + // NotCoordinator + // CoordinatorNotAvailable + // CoordinatorLoadInProgress + // TransactionalIdAuthorizationFailed + // ClusterAuthorizationFailed + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String EPOCH_KEY_NAME = "producer_epoch"; http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 3377f91..3d696c1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -228,13 +228,14 @@ public class ProduceRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) { /* In case the producer doesn't actually want any response */ if (acks == 0) return null; + Errors error = Errors.forException(e); Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>(); - ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.forException(e)); + ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(error); for (TopicPartition tp : partitions()) responseMap.put(tp, partitionResponse); http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 42ae434..55332f6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -61,6 +61,9 @@ public class ProduceResponse extends AbstractResponse { * INVALID_REQUIRED_ACKS (21) * TOPIC_AUTHORIZATION_FAILED (29) * UNSUPPORTED_FOR_MESSAGE_FORMAT (43) + * INVALID_PRODUCER_EPOCH (47) + * CLUSTER_AUTHORIZATION_FAILED (31) + * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53) */ private static final String BASE_OFFSET_KEY_NAME = "base_offset"; http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index f5334f2..68fa3d2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; public class TxnOffsetCommitRequest extends AbstractRequest { + private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; @@ -38,14 +39,16 @@ public class TxnOffsetCommitRequest extends AbstractRequest { private static final String METADATA_KEY_NAME = "metadata"; public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> { + private final String transactionalId; private final String consumerGroupId; private final long producerId; private final short producerEpoch; private final Map<TopicPartition, CommittedOffset> offsets; - public Builder(String consumerGroupId, long producerId, short producerEpoch, + public Builder(String transactionalId, String consumerGroupId, long producerId, short producerEpoch, Map<TopicPartition, CommittedOffset> offsets) { super(ApiKeys.TXN_OFFSET_COMMIT); + this.transactionalId = transactionalId; this.consumerGroupId = consumerGroupId; this.producerId = producerId; this.producerEpoch = producerEpoch; @@ -58,18 +61,32 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @Override public TxnOffsetCommitRequest build(short version) { - return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, offsets); + return new TxnOffsetCommitRequest(version, transactionalId, consumerGroupId, producerId, producerEpoch, offsets); + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("(transactionalId=").append(transactionalId). + append(", producerId=").append(producerId). + append(", producerEpoch=").append(producerEpoch). + append(", consumerGroupId=").append(consumerGroupId). + append(", offsets=").append(offsets). + append(")"); + return bld.toString(); } } + private final String transactionalId; private final String consumerGroupId; private final long producerId; private final short producerEpoch; private final Map<TopicPartition, CommittedOffset> offsets; - public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch, - Map<TopicPartition, CommittedOffset> offsets) { + public TxnOffsetCommitRequest(short version, String transactionalId, String consumerGroupId, long producerId, + short producerEpoch, Map<TopicPartition, CommittedOffset> offsets) { super(version); + this.transactionalId = transactionalId; this.consumerGroupId = consumerGroupId; this.producerId = producerId; this.producerEpoch = producerEpoch; @@ -78,6 +95,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest { public TxnOffsetCommitRequest(Struct struct, short version) { super(version); + this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME); this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); @@ -98,6 +116,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest { this.offsets = offsets; } + public String transactionalId() { + return transactionalId; + } + public String consumerGroupId() { return consumerGroupId; } @@ -117,6 +139,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version())); + struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId); struct.set(PRODUCER_ID_KEY_NAME, producerId); struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch); http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java index 37b9a50..a62568f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java @@ -42,6 +42,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse { // OffsetMetadataTooLarge // GroupAuthorizationFailed // InvalidCommitOffsetSize + // TransactionalIdAuthorizationFailed private final Map<TopicPartition, Errors> errors; private final int throttleTimeMs; http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java index 06f6662..ddddc42 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java @@ -48,6 +48,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse { // InvalidRequiredAcks // TransactionCoordinatorFenced // RequestTimeout + // ClusterAuthorizationFailed private final Map<Long, Map<TopicPartition, Errors>> errors; http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java index 06ace63..0e3441f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java @@ -47,7 +47,8 @@ public class AclOperationTest { new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false), new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false), new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false), - new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false) + new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false), + new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false) }; @Test
