Repository: kafka Updated Branches: refs/heads/trunk a281fe17f -> d66e7af65
KAFKA-5129; Add ACL checks for Transactional APIs Add ACL checks for Transactional APIs Author: Damian Guy <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Ismael Juma <[email protected]>, Jason Gustafson <[email protected]> Closes #2979 from dguy/kafka-5129 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d66e7af6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d66e7af6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d66e7af6 Branch: refs/heads/trunk Commit: d66e7af6526f208900a5d6cb588cf47058800804 Parents: a281fe1 Author: Damian Guy <[email protected]> Authored: Tue May 16 09:57:15 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Tue May 16 09:57:15 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 25 +- .../producer/internals/ProducerBatch.java | 4 + .../producer/internals/RecordAccumulator.java | 8 +- .../clients/producer/internals/Sender.java | 16 +- .../producer/internals/TransactionManager.java | 72 +++-- .../ProducerIdAuthorizationException.java | 23 ++ .../TransactionalIdAuthorizationException.java | 23 ++ .../apache/kafka/common/protocol/Errors.java | 21 +- .../apache/kafka/common/protocol/Protocol.java | 10 +- .../common/record/MemoryRecordsBuilder.java | 17 +- .../requests/AddPartitionsToTxnRequest.java | 7 +- .../requests/AddPartitionsToTxnResponse.java | 56 +++- .../requests/FindCoordinatorResponse.java | 10 + .../kafka/common/requests/ProduceRequest.java | 12 + .../internals/TransactionManagerTest.java | 51 +++- .../common/record/MemoryRecordsBuilderTest.java | 66 +++++ .../common/requests/ProduceRequestTest.java | 95 ++++++ .../common/requests/RequestResponseTest.java | 2 +- .../scala/kafka/security/auth/Resource.scala | 2 + .../kafka/security/auth/ResourceType.scala | 11 +- .../src/main/scala/kafka/server/KafkaApis.scala | 290 +++++++++++-------- .../kafka/api/AuthorizerIntegrationTest.scala | 113 +++++++- 22 files changed, 752 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 05edf65..aeef92f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -694,19 +694,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> { if (transactionManager.isFenced()) throw new ProducerFencedException("The current producer has been fenced off by a another producer using the same transactional id."); - if (transactionManager.isInTransaction()) { - if (transactionManager.isInErrorState()) { - String errorMessage = - "Cannot perform a transactional send because at least one previous transactional request has failed with errors."; - Exception lastError = transactionManager.lastError(); - if (lastError != null) - throw new KafkaException(errorMessage, lastError); - else - throw new KafkaException(errorMessage); - } - if (transactionManager.isCompletingTransaction()) - throw new IllegalStateException("Cannot call send while a commit or abort is in progress."); + if (transactionManager.isInErrorState()) { + String errorMessage = + "Cannot perform send because at least one previous transactional or idempotent request has failed with errors."; + Exception lastError = transactionManager.lastError(); + if (lastError != null) + throw new KafkaException(errorMessage, lastError); + else + throw new KafkaException(errorMessage); } + if (transactionManager.isCompletingTransaction()) + throw new IllegalStateException("Cannot call send while a commit or abort is in progress."); + } private void setReadOnly(Headers headers) { @@ -1032,7 +1031,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.userCallback.onCompletion(metadata, exception); if (exception != null && transactionManager != null) - transactionManager.maybeSetError(exception); + transactionManager.setError(exception); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 3c5965a..1c078c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -247,6 +247,10 @@ public final class ProducerBatch { recordsBuilder.close(); } + public void abort() { + recordsBuilder.abort(); + } + public boolean isClosed() { return recordsBuilder.isClosed(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index d53c19d..5b8fb96 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -580,14 +580,18 @@ public final class RecordAccumulator { * Go through incomplete batches and abort them. */ private void abortBatches() { + abortBatches(new IllegalStateException("Producer is closed forcefully.")); + } + + void abortBatches(final RuntimeException reason) { for (ProducerBatch batch : incomplete.all()) { Deque<ProducerBatch> dq = getDeque(batch.topicPartition); // Close the batch before aborting synchronized (dq) { - batch.close(); + batch.abort(); dq.remove(batch); } - batch.done(-1L, RecordBatch.NO_TIMESTAMP, new IllegalStateException("Producer is closed forcefully.")); + batch.done(-1L, RecordBatch.NO_TIMESTAMP, reason); deallocate(batch); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index da09a1a..209a979 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -199,6 +199,13 @@ public class Sender implements Runnable { Cluster cluster = metadata.fetch(); maybeWaitForProducerId(); + if (transactionManager != null && transactionManager.isInErrorState()) { + final KafkaException exception = transactionManager.lastError() instanceof KafkaException + ? (KafkaException) transactionManager.lastError() + : new KafkaException(transactionManager.lastError()); + this.accumulator.abortBatches(exception); + return Long.MAX_VALUE; + } // get the list of partitions with data ready to send RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); @@ -376,13 +383,19 @@ public class Sender implements Runnable { if (transactionManager == null || transactionManager.isTransactional()) return; - while (!transactionManager.hasProducerId()) { + while (!transactionManager.hasProducerId() && !transactionManager.isInErrorState()) { try { Node node = awaitLeastLoadedNodeReady(requestTimeout); if (node != null) { ClientResponse response = sendAndAwaitInitPidRequest(node); + if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) { InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody(); + Exception exception = initProducerIdResponse.error().exception(); + if (exception != null && !(exception instanceof RetriableException)) { + transactionManager.setError(exception); + return; + } ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch( initProducerIdResponse.producerId(), initProducerIdResponse.epoch()); transactionManager.setProducerIdAndEpoch(producerIdAndEpoch); @@ -401,6 +414,7 @@ public class Sender implements Runnable { time.sleep(retryBackoffMs); metadata.requestUpdate(); } + } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index f3ed252..551a75a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -249,15 +249,11 @@ public class TransactionManager { return currentState == State.ERROR; } - public synchronized boolean maybeSetError(Exception exception) { - if (isTransactional() && isInTransaction()) { - if (exception instanceof ProducerFencedException) - transitionTo(State.FENCED, exception); - else - transitionTo(State.ERROR, exception); - return true; - } - return false; + public synchronized void setError(Exception exception) { + if (exception instanceof ProducerFencedException) + transitionTo(State.FENCED, exception); + else + transitionTo(State.ERROR, exception); } /** @@ -579,6 +575,8 @@ public class TransactionManager { reenqueue(); } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { reenqueue(); + } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { + fatal(error.exception()); } else { fatal(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message())); } @@ -605,27 +603,43 @@ public class TransactionManager { @Override public void handleResponse(AbstractResponse response) { AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response; - Errors error = addPartitionsToTxnResponse.error(); - if (error == Errors.NONE) { + Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors(); + boolean hasPartitionErrors = false; + for (TopicPartition topicPartition : pendingPartitionsToBeAddedToTransaction) { + final Errors error = errors.get(topicPartition); + if (error == Errors.NONE || error == null) { + continue; + } + + if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { + lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + reenqueue(); + return; + } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { + reenqueue(); + return; + } else if (error == Errors.INVALID_PRODUCER_EPOCH) { + fenced(); + return; + } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { + fatal(error.exception()); + return; + } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING + || error == Errors.INVALID_TXN_STATE) { + fatal(new KafkaException(error.exception())); + return; + } else { + log.error("Could not add partitions to transaction due to partition error. partition={}, error={}", topicPartition, error); + hasPartitionErrors = true; + } + } + + if (hasPartitionErrors) { + fatal(new KafkaException("Could not add partitions to transaction due to partition level errors")); + } else { partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction); pendingPartitionsToBeAddedToTransaction.clear(); result.done(); - } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { - lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); - reenqueue(); - } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { - reenqueue(); - } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING) { - fatal(new KafkaException(error.exception())); - } else if (error == Errors.INVALID_TXN_STATE) { - fatal(new KafkaException(error.exception())); - } else if (error == Errors.INVALID_PRODUCER_EPOCH) { - fenced(); - } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { - fatal(error.exception()); - } else { - fatal(new KafkaException("Could not add partitions to transaction due to unknown error: " + - error.message())); } } } @@ -718,6 +732,8 @@ public class TransactionManager { reenqueue(); } else if (error == Errors.INVALID_PRODUCER_EPOCH) { fenced(); + } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { + fatal(error.exception()); } else { fatal(new KafkaException("Unhandled error in EndTxnResponse: " + error.message())); } @@ -758,6 +774,8 @@ public class TransactionManager { reenqueue(); } else if (error == Errors.INVALID_PRODUCER_EPOCH) { fenced(); + } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { + fatal(error.exception()); } else { fatal(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message())); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java new file mode 100644 index 0000000..2da9158 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class ProducerIdAuthorizationException extends ApiException { + public ProducerIdAuthorizationException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java new file mode 100644 index 0000000..9bf1fbb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class TransactionalIdAuthorizationException extends ApiException { + public TransactionalIdAuthorizationException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index a0922cf..7780fbe 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -56,6 +56,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.ProducerIdAuthorizationException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; @@ -64,6 +65,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownServerException; @@ -470,7 +472,24 @@ public enum Errors { public ApiException build(String message) { return new TransactionCoordinatorFencedException(message); } - }); + }), + TRANSACTIONAL_ID_AUTHORIZATION_FAILED(53, "Transactional Id authorization failed", + new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new TransactionalIdAuthorizationException(message); + } + }), + PRODUCER_ID_AUTHORIZATION_FAILED(54, "Producer is not authorized to use producer Ids, " + + "which is required to write idempotent data.", + new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new ProducerIdAuthorizationException(message); + } + }); + + private interface ApiExceptionBuilder { ApiException build(String message); http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 5e05738..08aef4b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -1399,9 +1399,13 @@ public class Protocol { ); public static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema( newThrottleTimeField(), - new Field("error_code", - INT16, - "An integer error code.") + new Field("errors", + new ArrayOf(new Schema(new Field("topic", STRING), + new Field("partition_errors", + new ArrayOf(new Schema(new Field("partition", + INT32), + new Field("error_code", + INT16))))))) ); public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = new Schema[] {ADD_PARTITIONS_TO_TXN_REQUEST_V0}; http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index e52df76..6f90fac 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -84,6 +84,7 @@ public class MemoryRecordsBuilder { private Long baseTimestamp = null; private MemoryRecords builtRecords; + private boolean aborted = false; /** * Construct a new builder. @@ -175,6 +176,9 @@ public class MemoryRecordsBuilder { * @return The built log buffer */ public MemoryRecords build() { + if (aborted) { + throw new KafkaException("Attempting to build an aborted record batch"); + } close(); return builtRecords; } @@ -246,7 +250,16 @@ public class MemoryRecordsBuilder { } } + public void abort() { + closeForRecordAppends(); + buffer().position(initPos); + aborted = true; + } + public void close() { + if (aborted) + throw new IllegalStateException("Cannot close MemoryRecordsBuilder as it has already been aborted"); + if (builtRecords != null) return; @@ -605,13 +618,13 @@ public class MemoryRecordsBuilder { private void ensureOpenForRecordAppend() { if (appendStreamIsClosed) throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends"); - if (isClosed()) - throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed"); } private void ensureOpenForRecordBatchWrite() { if (isClosed()) throw new IllegalStateException("Tried to write record batch header, but MemoryRecordsBuilder is closed"); + if (aborted) + throw new IllegalStateException("Tried to write record batch header, but MemoryRecordsBuilder is aborted"); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java index 69ae25c..148ebec 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -126,7 +127,11 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { @Override public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new AddPartitionsToTxnResponse(throttleTimeMs, Errors.forException(e)); + final HashMap<TopicPartition, Errors> errors = new HashMap<>(); + for (TopicPartition partition : partitions) { + errors.put(partition, Errors.forException(e)); + } + return new AddPartitionsToTxnResponse(throttleTimeMs, errors); } public static AddPartitionsToTxnRequest parse(ByteBuffer buffer, short version) { http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java index 893fcda..697142b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java @@ -16,15 +16,27 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class AddPartitionsToTxnResponse extends AbstractResponse { private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final String ERRORS_KEY_NAME = "errors"; + private static final String TOPIC_NAME = "topic"; + private static final String PARTITION = "partition"; + private static final String PARTITION_ERRORS = "partition_errors"; + + private final int throttleTimeMs; // Possible error codes: // NotCoordinator @@ -34,33 +46,59 @@ public class AddPartitionsToTxnResponse extends AbstractResponse { // InvalidPidMapping // TopicAuthorizationFailed // InvalidProducerEpoch + // UnknownTopicOrPartition + // TopicAuthorizationFailed + private final Map<TopicPartition, Errors> errors; - private final Errors error; - private final int throttleTimeMs; - - public AddPartitionsToTxnResponse(int throttleTimeMs, Errors error) { + public AddPartitionsToTxnResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) { this.throttleTimeMs = throttleTimeMs; - this.error = error; + this.errors = errors; } public AddPartitionsToTxnResponse(Struct struct) { this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); - this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + errors = new HashMap<>(); + for (Object topic : struct.getArray(ERRORS_KEY_NAME)) { + Struct topicStruct = (Struct) topic; + final String topicName = topicStruct.getString(TOPIC_NAME); + for (Object partition : topicStruct.getArray(PARTITION_ERRORS)) { + Struct partitionStruct = (Struct) partition; + TopicPartition topicPartition = new TopicPartition(topicName, partitionStruct.getInt(PARTITION)); + errors.put(topicPartition, Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME))); + } + } } public int throttleTimeMs() { return throttleTimeMs; } - public Errors error() { - return error; + public Map<TopicPartition, Errors> errors() { + return errors; } @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version)); struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - struct.set(ERROR_CODE_KEY_NAME, error.code()); + + Map<String, Map<Integer, Errors>> errorsByTopic = CollectionUtils.groupDataByTopic(errors); + List<Struct> topics = new ArrayList<>(errorsByTopic.size()); + for (Map.Entry<String, Map<Integer, Errors>> entry : errorsByTopic.entrySet()) { + Struct topicErrorCodes = struct.instance(ERRORS_KEY_NAME); + topicErrorCodes.set(TOPIC_NAME, entry.getKey()); + List<Struct> partitionArray = new ArrayList<>(); + for (Map.Entry<Integer, Errors> partitionErrors : entry.getValue().entrySet()) { + final Struct partitionData = topicErrorCodes.instance(PARTITION_ERRORS) + .set(PARTITION, partitionErrors.getKey()) + .set(ERROR_CODE_KEY_NAME, partitionErrors.getValue().code()); + partitionArray.add(partitionData); + + } + topicErrorCodes.set(PARTITION_ERRORS, partitionArray.toArray()); + topics.add(topicErrorCodes); + } + struct.set(ERRORS_KEY_NAME, topics.toArray()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java index b558b62..e7df8e8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java @@ -106,4 +106,14 @@ public class FindCoordinatorResponse extends AbstractResponse { public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) { return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer)); } + + @Override + public String toString() { + return "FindCoordinatorResponse{" + + "throttleTimeMs=" + throttleTimeMs + + ", errorMessage='" + errorMessage + '\'' + + ", error=" + error + + ", node=" + node + + '}'; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index b63f6c2..3377f91 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -109,6 +109,8 @@ public class ProduceRequest extends AbstractRequest { // put in the purgatory (due to client throttling, it can take a while before the response is sent). // Care should be taken in methods that use this field. private volatile Map<TopicPartition, MemoryRecords> partitionRecords; + private boolean transactional = false; + private boolean idempotent = false; private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) { super(version); @@ -165,6 +167,8 @@ public class ProduceRequest extends AbstractRequest { if (iterator.hasNext()) throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + "contain exactly one record batch"); + idempotent = entry.hasProducerId(); + transactional = entry.isTransactional(); } // Note that we do not do similar validation for older versions to ensure compatibility with @@ -264,6 +268,14 @@ public class ProduceRequest extends AbstractRequest { return transactionalId; } + public boolean isTransactional() { + return transactional; + } + + public boolean isIdempotent() { + return idempotent; + } + /** * Returns the partition records or throws IllegalStateException if clearPartitionRecords() has been invoked. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index c0acfec..4db0452 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -499,6 +499,53 @@ public class TransactionManagerTest { assertTrue(transactionManager.isReadyForTransaction()); // make sure we are ready for a transaction now. } + @Test + public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception { + verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED); + } + + @Test + public void shouldNotAddPartitionsToTransactionWhenUnknownTopicOrPartition() throws Exception { + verifyAddPartitionsFailsWithPartitionLevelError(Errors.UNKNOWN_TOPIC_OR_PARTITION); + } + + private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { + client.setNode(brokerNode); + transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + + final long pid = 1L; + final short epoch = 1; + prepareInitPidResponse(Errors.NONE, false, pid, epoch); + + sender.run(time.milliseconds()); // get pid. + + assertTrue(transactionManager.hasProducerId()); + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + assertFalse(responseFuture.isDone()); + prepareAddPartitionsToTxnPartitionErrorResponse(tp0, error); + sender.run(time.milliseconds()); // attempt send addPartitions. + assertTrue(transactionManager.isInErrorState()); + assertFalse(transactionManager.transactionContainsPartition(tp0)); + } + + private void prepareAddPartitionsToTxnPartitionErrorResponse(final TopicPartition tp0, final Errors error) { + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + assertTrue(body instanceof AddPartitionsToTxnRequest); + return true; + } + }, new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, error))); + } + private static class MockCallback implements Callback { private final TransactionManager transactionManager; public MockCallback(TransactionManager transactionManager) { @@ -507,7 +554,7 @@ public class TransactionManagerTest { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null && transactionManager != null) { - transactionManager.maybeSetError(exception); + transactionManager.setError(exception); } } } @@ -570,7 +617,7 @@ public class TransactionManagerTest { assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId()); return true; } - }, new AddPartitionsToTxnResponse(0, error)); + }, new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, error))); } private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) { http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index a300a65..c08a2f0 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -31,6 +32,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @RunWith(value = Parameterized.class) public class MemoryRecordsBuilderTest { @@ -471,6 +473,70 @@ public class MemoryRecordsBuilderTest { assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key()); } + @Test + public void shouldThrowKafkaExceptionOnBuildWhenAborted() throws Exception { + ByteBuffer buffer = ByteBuffer.allocate(128); + buffer.position(bufferOffset); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, + TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + builder.abort(); + try { + builder.build(); + fail("Should have thrown KafkaException"); + } catch (KafkaException e) { + // ok + } + } + + @Test + public void shouldResetBufferToInitialPositionOnAbort() throws Exception { + ByteBuffer buffer = ByteBuffer.allocate(128); + buffer.position(bufferOffset); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, + TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + builder.append(0L, "a".getBytes(), "1".getBytes()); + builder.abort(); + assertEquals(bufferOffset, builder.buffer().position()); + } + + @Test + public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() throws Exception { + ByteBuffer buffer = ByteBuffer.allocate(128); + buffer.position(bufferOffset); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, + TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + builder.abort(); + try { + builder.close(); + fail("Should have thrown IllegalStateException"); + } catch (IllegalStateException e) { + // ok + } + } + + @Test + public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() throws Exception { + ByteBuffer buffer = ByteBuffer.allocate(128); + buffer.position(bufferOffset); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, + TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + builder.abort(); + try { + builder.append(0L, "a".getBytes(), "1".getBytes()); + fail("Should have thrown IllegalStateException"); + } catch (IllegalStateException e) { + // ok + } + } + @Parameterized.Parameters public static Collection<Object[]> data() { List<Object[]> values = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java new file mode 100644 index 0000000..0e8f382 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ProduceRequestTest { + + private final SimpleRecord simpleRecord = new SimpleRecord(System.currentTimeMillis(), + "key".getBytes(), + "value".getBytes()); + + @Test + public void shouldBeFlaggedAsTransactionalWhenTransactionalRecords() throws Exception { + final MemoryRecords memoryRecords = MemoryRecords.withTransactionalRecords(0, + CompressionType.NONE, + 1L, + (short) 1, + 1, + 1, + simpleRecord); + final ProduceRequest request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, + (short) -1, + 10, + Collections.singletonMap( + new TopicPartition("topic", 1), memoryRecords)).build(); + assertTrue(request.isTransactional()); + } + + @Test + public void shouldNotBeFlaggedAsTransactionalWhenNoRecords() throws Exception { + final ProduceRequest request = createNonIdempotentNonTransactionalRecords(); + assertFalse(request.isTransactional()); + } + + @Test + public void shouldNotBeFlaggedAsIdempotentWhenRecordsNotIdempotent() throws Exception { + final ProduceRequest request = createNonIdempotentNonTransactionalRecords(); + assertFalse(request.isTransactional()); + } + + @Test + public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() throws Exception { + final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1, + CompressionType.NONE, + 1L, + (short) 1, + 1, + 1, + simpleRecord); + + final ProduceRequest request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, + (short) -1, + 10, + Collections.singletonMap( + new TopicPartition("topic", 1), memoryRecords)).build(); + assertTrue(request.isIdempotent()); + + } + + private ProduceRequest createNonIdempotentNonTransactionalRecords() { + final MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, + simpleRecord); + return new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, + (short) -1, + 10, + Collections.singletonMap( + new TopicPartition("topic", 1), memoryRecords)).build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 1cfd6a3..4946246 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -914,7 +914,7 @@ public class RequestResponseTest { } private AddPartitionsToTxnResponse createAddPartitionsToTxnResponse() { - return new AddPartitionsToTxnResponse(0, Errors.NONE); + return new AddPartitionsToTxnResponse(0, Collections.singletonMap(new TopicPartition("t", 0), Errors.NONE)); } private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() { http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/core/src/main/scala/kafka/security/auth/Resource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala index 17d09ce..a0ed9f9 100644 --- a/core/src/main/scala/kafka/security/auth/Resource.scala +++ b/core/src/main/scala/kafka/security/auth/Resource.scala @@ -20,6 +20,8 @@ object Resource { val Separator = ":" val ClusterResourceName = "kafka-cluster" val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName) + val ProducerIdResourceName = "producer-id" + val ProducerIdResource = new Resource(Cluster, Resource.ProducerIdResourceName) val WildCardResource = "*" def fromString(str: String): Resource = { http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/core/src/main/scala/kafka/security/auth/ResourceType.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala index 9630c82..e58d8ec 100644 --- a/core/src/main/scala/kafka/security/auth/ResourceType.scala +++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala @@ -41,6 +41,15 @@ case object Group extends ResourceType { val error = Errors.GROUP_AUTHORIZATION_FAILED } +case object ProducerTransactionalId extends ResourceType { + val name = "ProducerTransactionalId" + val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED +} + +case object ProducerIdResource extends ResourceType { + val name = "ProducerIdResource" + val error = Errors.PRODUCER_ID_AUTHORIZATION_FAILED +} object ResourceType { @@ -49,5 +58,5 @@ object ResourceType { rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(","))) } - def values: Seq[ResourceType] = List(Cluster, Topic, Group) + def values: Seq[ResourceType] = List(Cluster, Topic, Group, ProducerTransactionalId, ProducerIdResource) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c746365..5e9cd9f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -35,9 +35,9 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat import kafka.log.{Log, LogManager, TimestampOffset} import kafka.network.{RequestChannel, RequestOrResponseSend} import kafka.network.RequestChannel.{Response, Session} -import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Topic, Write} +import kafka.security.auth._ import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils} -import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{isInternal, GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} import org.apache.kafka.common.metrics.Metrics @@ -364,88 +364,95 @@ class KafkaApis(val requestChannel: RequestChannel, val produceRequest = request.body[ProduceRequest] val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size - val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = - produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) => - authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic) - } + if (produceRequest.isTransactional && !authorize(request.session, Write, new Resource(ProducerTransactionalId, produceRequest.transactionalId()))) + sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception())) + else if (produceRequest.isIdempotent && !authorize(request.session, Write, Resource.ProducerIdResource)) + sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception())) + else { + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = + produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) => + authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic) + } - val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { - case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic)) - } + val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { + case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic)) + } - // the callback for sending a produce response - def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { + // the callback for sending a produce response + def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { - val mergedResponseStatus = responseStatus ++ - unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++ - nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + val mergedResponseStatus = responseStatus ++ + unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++ + nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)) - var errorInResponse = false + var errorInResponse = false - mergedResponseStatus.foreach { case (topicPartition, status) => - if (status.error != Errors.NONE) { - errorInResponse = true - debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - request.header.correlationId, - request.header.clientId, - topicPartition, - status.error.exceptionName)) + mergedResponseStatus.foreach { case (topicPartition, status) => + if (status.error != Errors.NONE) { + errorInResponse = true + debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( + request.header.correlationId, + request.header.clientId, + topicPartition, + status.error.exceptionName)) + } } - } - def produceResponseCallback(bandwidthThrottleTimeMs: Int) { - if (produceRequest.acks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any error in handling - // the request, since no response is expected by the producer, the server will close socket server so that - // the producer client will know that some error has happened and will refresh its metadata - if (errorInResponse) { - val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) => - topicPartition -> status.error.exceptionName - }.mkString(", ") - info( - s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " + - s"from client id ${request.header.clientId} with ack=0\n" + - s"Topic and partition to exceptions: $exceptionsSummary" - ) - requestChannel.closeConnection(request.processor, request) + def produceResponseCallback(bandwidthThrottleTimeMs: Int) { + if (produceRequest.acks == 0) { + // no operation needed if producer request.required.acks = 0; however, if there is any error in handling + // the request, since no response is expected by the producer, the server will close socket server so that + // the producer client will know that some error has happened and will refresh its metadata + if (errorInResponse) { + val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) => + topicPartition -> status.error.exceptionName + }.mkString(", ") + info( + s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " + + s"from client id ${request.header.clientId} with ack=0\n" + + s"Topic and partition to exceptions: $exceptionsSummary" + ) + requestChannel.closeConnection(request.processor, request) + } else { + requestChannel.noOperation(request.processor, request) + } } else { - requestChannel.noOperation(request.processor, request) - } - } else { - def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = { - new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs) + def createResponseCallback(requestThrottleTimeMs: Int): AbstractResponse = { + new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleTimeMs) + } + + sendResponseMaybeThrottle(request, createResponseCallback) } - sendResponseMaybeThrottle(request, createResponseCallback) } - } - // When this callback is triggered, the remote API call has completed - request.apiRemoteCompleteTimeNanos = time.nanoseconds - - quotas.produce.recordAndMaybeThrottle( - request.session.sanitizedUser, - request.header.clientId, - numBytesAppended, - produceResponseCallback) - } + // When this callback is triggered, the remote API call has completed + request.apiRemoteCompleteTimeNanos = time.nanoseconds - if (authorizedRequestInfo.isEmpty) - sendResponseCallback(Map.empty) - else { - val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId + quotas.produce.recordAndMaybeThrottle( + request.session.sanitizedUser, + request.header.clientId, + numBytesAppended, + produceResponseCallback) + } - // call the replica manager to append messages to the replicas - replicaManager.appendRecords( - produceRequest.timeout.toLong, - produceRequest.acks, - internalTopicsAllowed, - isFromClient = true, - authorizedRequestInfo, - sendResponseCallback) + if (authorizedRequestInfo.isEmpty) + sendResponseCallback(Map.empty) + else { + val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId + + // call the replica manager to append messages to the replicas + replicaManager.appendRecords( + produceRequest.timeout.toLong, + produceRequest.acks, + internalTopicsAllowed, + isFromClient = true, + authorizedRequestInfo, + sendResponseCallback) - // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; - // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log - produceRequest.clearPartitionRecords() + // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; + // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log + produceRequest.clearPartitionRecords() + } } } @@ -1391,35 +1398,45 @@ class KafkaApis(val requestChannel: RequestChannel, val initProducerIdRequest = request.body[InitProducerIdRequest] val transactionalId = initProducerIdRequest.transactionalId - // Send response callback - def sendResponseCallback(result: InitProducerIdResult): Unit = { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch) - trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.") - responseBody + + if (!authorize(request.session, Write, Resource.ProducerIdResource)) { + sendResponseMaybeThrottle(request, (throttleTime: Int) => new InitProducerIdResponse(throttleTime, Errors.PRODUCER_ID_AUTHORIZATION_FAILED)) + } else if (transactionalId == null || authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) { + // Send response callback + def sendResponseCallback(result: InitProducerIdResult): Unit = { + def createResponse(throttleTimeMs: Int): AbstractResponse = { + val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch) + trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.") + responseBody + } + sendResponseMaybeThrottle(request, createResponse) } - sendResponseMaybeThrottle(request, createResponse) - } - txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback) + txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback) + }else + sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new InitProducerIdResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) } def handleEndTxnRequest(request: RequestChannel.Request): Unit = { val endTxnRequest = request.body[EndTxnRequest] - - def sendResponseCallback(error: Errors) { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody = new EndTxnResponse(throttleTimeMs, error) - trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.") - responseBody + val transactionalId = endTxnRequest.transactionalId + + if(authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) { + def sendResponseCallback(error: Errors) { + def createResponse(throttleTimeMs: Int): AbstractResponse = { + val responseBody = new EndTxnResponse(throttleTimeMs, error) + trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.") + responseBody + } + sendResponseMaybeThrottle(request, createResponse) } - sendResponseMaybeThrottle(request, createResponse) - } - txnCoordinator.handleEndTransaction(endTxnRequest.transactionalId(), - endTxnRequest.producerId(), - endTxnRequest.producerEpoch(), - endTxnRequest.command(), - sendResponseCallback) + txnCoordinator.handleEndTransaction(endTxnRequest.transactionalId, + endTxnRequest.producerId, + endTxnRequest.producerEpoch, + endTxnRequest.command, + sendResponseCallback) + } else + sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new EndTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) } def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = { @@ -1484,21 +1501,53 @@ class KafkaApis(val requestChannel: RequestChannel, val transactionalId = addPartitionsToTxnRequest.transactionalId val partitionsToAdd = addPartitionsToTxnRequest.partitions - // Send response callback - def sendResponseCallback(error: Errors): Unit = { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs, error) - trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}") - responseBody + if(!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) + sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => addPartitionsToTxnRequest.getErrorResponse(1, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception())) + else { + val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())} + + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = + partitionsToAdd.asScala.partition { tp => + authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic) + } + + val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { + tp => authorize(request.session, Write, new Resource(Topic, tp.topic)) + } + + if (nonExistingOrUnauthorizedForDescribeTopics.nonEmpty + || unauthorizedForWriteRequestInfo.nonEmpty + || internalTopics.nonEmpty) { + + // Only send back error responses for the partitions that failed. If there are any partition failures + // then the entire request fails + val partitionErrors = unauthorizedForWriteRequestInfo.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) }.toMap ++ + nonExistingOrUnauthorizedForDescribeTopics.map { tp => (tp, Errors.UNKNOWN_TOPIC_OR_PARTITION) }.toMap ++ + internalTopics.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) } + + sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddPartitionsToTxnResponse(throttleTimeMs, partitionErrors.asJava)) + } else { + // Send response callback + def sendResponseCallback(error: Errors): Unit = { + def createResponse(throttleTimeMs: Int): AbstractResponse = { + val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(throttleTimeMs, + partitionsToAdd.asScala.map{tp => (tp, error)}.toMap.asJava) + trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}") + responseBody + } + + sendResponseMaybeThrottle(request, createResponse) + } + + txnCoordinator.handleAddPartitionsToTransaction(transactionalId, + addPartitionsToTxnRequest.producerId(), + addPartitionsToTxnRequest.producerEpoch(), + partitionsToAdd.asScala.toSet, + sendResponseCallback) } - sendResponseMaybeThrottle(request, createResponse) } - txnCoordinator.handleAddPartitionsToTransaction(transactionalId, - addPartitionsToTxnRequest.producerId(), - addPartitionsToTxnRequest.producerEpoch(), - partitionsToAdd.asScala.toSet, - sendResponseCallback) + } def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = { @@ -1507,22 +1556,29 @@ class KafkaApis(val requestChannel: RequestChannel, val groupId = addOffsetsToTxnRequest.consumerGroupId val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) - // Send response callback - def sendResponseCallback(error: Errors): Unit = { - def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error) - trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}") - responseBody + if (!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) + sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) + else if (!authorize(request.session, Read, new Resource(Group, groupId))) + sendResponseMaybeThrottle(request, (throttleTimeMs: Int) => new AddOffsetsToTxnResponse(throttleTimeMs, Errors.GROUP_AUTHORIZATION_FAILED)) + else { + // Send response callback + def sendResponseCallback(error: Errors): Unit = { + def createResponse(throttleTimeMs: Int): AbstractResponse = { + val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(throttleTimeMs, error) + trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}") + responseBody + } + sendResponseMaybeThrottle(request, createResponse) + } + + txnCoordinator.handleAddPartitionsToTransaction(transactionalId, + addOffsetsToTxnRequest.producerId, + addOffsetsToTxnRequest.producerEpoch, + Set(offsetTopicPartition), + sendResponseCallback) } - sendResponseMaybeThrottle(request, createResponse) } - txnCoordinator.handleAddPartitionsToTransaction(transactionalId, - addOffsetsToTxnRequest.producerId(), - addOffsetsToTxnRequest.producerEpoch(), - Set[TopicPartition](offsetTopicPartition), - sendResponseCallback) - } def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = { val header = request.header http://git-wip-us.apache.org/repos/asf/kafka/blob/d66e7af6/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 52a90d8..9eb1275 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -24,7 +24,7 @@ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.consumer._ -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} @@ -42,7 +42,7 @@ import org.apache.kafka.common.KafkaException import kafka.admin.AdminUtils import kafka.network.SocketServer import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.record.{CompressionType, SimpleRecord, RecordBatch, MemoryRecords} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} class AuthorizerIntegrationTest extends BaseRequestTest { @@ -53,6 +53,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val topicPattern = "topic.*" val createTopic = "topic-new" val deleteTopic = "topic-delete" + val transactionalId = "transactional.id" val part = 0 val correlationId = 0 val clientId = "client-Id" @@ -62,6 +63,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val topicResource = new Resource(Topic, topic) val groupResource = new Resource(Group, group) val deleteTopicResource = new Resource(Topic, deleteTopic) + val producerTransactionalIdResource = new Resource(ProducerTransactionalId, transactionalId) val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction))) @@ -70,9 +72,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) val TopicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) + val producerTransactionalIdWriteAcl = Map(producerTransactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + var transactionalProducer: KafkaProducer[Array[Byte], Array[Byte]] = _ val producerCount = 1 val consumerCount = 2 @@ -83,6 +87,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName) properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") + properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") + properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") + properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") } val RequestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] = @@ -158,6 +165,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest { producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), maxBlockMs = 3000, acks = 1) + + val transactionalProperties = new Properties() + transactionalProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + transactionalProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId) + transactionalProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), + retries = 3, + props = Some(transactionalProperties) + ) + for (_ <- 0 until consumerCount) consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT) @@ -810,6 +826,99 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Errors.NONE, deleteResponse.errors.asScala.head._2) } + @Test(expected = classOf[TransactionalIdAuthorizationException]) + def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnInitTransactions(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) + transactionalProducer.initTransactions() + } + + @Test + def shouldInitTransactionsWhenAclSet(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) + transactionalProducer.initTransactions() + } + + + @Test + def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) + transactionalProducer.initTransactions() + removeAllAcls() + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + try { + transactionalProducer.beginTransaction() + transactionalProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get + Assert.fail("expected TransactionalIdAuthorizationException") + } catch { + case e: ExecutionException => assertTrue(s"expected TransactionalIdAuthorizationException, but got ${e.getCause}", e.getCause.isInstanceOf[TransactionalIdAuthorizationException]) + } + } + + @Test + def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) + transactionalProducer.initTransactions() + transactionalProducer.beginTransaction() + removeAllAcls() + try { + transactionalProducer.commitTransaction() + Assert.fail("expected TransactionalIdAuthorizationException") + } catch { + case _: TransactionalIdAuthorizationException => // ok + } + } + + @Test + def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), groupResource) + transactionalProducer.initTransactions() + transactionalProducer.beginTransaction() + removeAllAcls() + try { + val offsets: util.Map[TopicPartition, OffsetAndMetadata] = Map(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition) -> new OffsetAndMetadata(1L)).asJava + transactionalProducer.sendOffsetsToTransaction(offsets, group) + Assert.fail("expected TransactionalIdAuthorizationException") + } catch { + case _: TransactionalIdAuthorizationException => // ok + } + } + + + @Test + def shouldThrowProducerIdAuthorizationExceptionWhenAclNotSet(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + val idempotentProperties = new Properties() + idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + val idempotentProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), + retries = 3, + props = Some(idempotentProperties) + ) + try { + idempotentProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get + Assert.fail("expected ProducerIdAuthorizationException") + } catch { + case e: ExecutionException => assertTrue(s"expected ProducerIdAuthorizationException, but got ${e.getCause}", e.getCause.isInstanceOf[ProducerIdAuthorizationException]) + } + } + + @Test + def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + val idempotentProperties = new Properties() + idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + val idempotentProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), + retries = 3, + props = Some(idempotentProperties) + ) + idempotentProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get + } + def removeAllAcls() = { servers.head.apis.authorizer.get.getAcls().keys.foreach { resource => servers.head.apis.authorizer.get.removeAcls(resource)
