http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b0d354b..380685f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -93,7 +93,7 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s". format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal)) ApiKeys.forId(request.requestId) match { - case ApiKeys.PRODUCE => handleProducerRequest(request) + case ApiKeys.PRODUCE => handleProduceRequest(request) case ApiKeys.FETCH => handleFetchRequest(request) case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request) case ApiKeys.METADATA => handleTopicMetadataRequest(request) @@ -368,97 +368,107 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Handle a produce request */ - def handleProducerRequest(request: RequestChannel.Request) { + def handleProduceRequest(request: RequestChannel.Request) { val produceRequest = request.body[ProduceRequest] val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size - if (produceRequest.isTransactional && !authorize(request.session, Write, new Resource(ProducerTransactionalId, produceRequest.transactionalId()))) - sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) - else if (produceRequest.isIdempotent && !authorize(request.session, Write, Resource.ProducerIdResource)) - sendResponseMaybeThrottle(request, (throttleMs: Int) => produceRequest.getErrorResponse(throttleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED.exception)) - else { - val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = - produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) => - authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic) - } + def sendErrorResponse(error: Errors): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => + produceRequest.getErrorResponse(requestThrottleMs, error.exception)) + } - val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { - case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic)) + if (produceRequest.isTransactional) { + if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId))) { + sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) + return } + // Note that authorization to a transactionalId implies ProducerId authorization - // the callback for sending a produce response - def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { + } else if (produceRequest.isIdempotent && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) { + sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED) + return + } + + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = + produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) => + authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic) + } - val mergedResponseStatus = responseStatus ++ - unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++ - nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { + case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic)) + } - var errorInResponse = false + // the callback for sending a produce response + def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { - mergedResponseStatus.foreach { case (topicPartition, status) => - if (status.error != Errors.NONE) { - errorInResponse = true - debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - request.header.correlationId, - request.header.clientId, - topicPartition, - status.error.exceptionName)) - } + val mergedResponseStatus = responseStatus ++ + unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++ + nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + + var errorInResponse = false + + mergedResponseStatus.foreach { case (topicPartition, status) => + if (status.error != Errors.NONE) { + errorInResponse = true + debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( + request.header.correlationId, + request.header.clientId, + topicPartition, + status.error.exceptionName)) } + } - def produceResponseCallback(bandwidthThrottleTimeMs: Int) { - if (produceRequest.acks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any error in handling - // the request, since no response is expected by the producer, the server will close socket server so that - // the producer client will know that some error has happened and will refresh its metadata - val action = - if (errorInResponse) { - val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) => - topicPartition -> status.error.exceptionName - }.mkString(", ") - info( - s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " + - s"from client id ${request.header.clientId} with ack=0\n" + - s"Topic and partition to exceptions: $exceptionsSummary" - ) - RequestChannel.CloseConnectionAction - } else RequestChannel.NoOpAction - sendResponseExemptThrottle(new RequestChannel.Response(request, None, action)) - } else { - sendResponseMaybeThrottle(request, requestThrottleMs => - new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs) + def produceResponseCallback(bandwidthThrottleTimeMs: Int) { + if (produceRequest.acks == 0) { + // no operation needed if producer request.required.acks = 0; however, if there is any error in handling + // the request, since no response is expected by the producer, the server will close socket server so that + // the producer client will know that some error has happened and will refresh its metadata + val action = + if (errorInResponse) { + val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) => + topicPartition -> status.error.exceptionName + }.mkString(", ") + info( + s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " + + s"from client id ${request.header.clientId} with ack=0\n" + + s"Topic and partition to exceptions: $exceptionsSummary" ) - } + RequestChannel.CloseConnectionAction + } else RequestChannel.NoOpAction + sendResponseExemptThrottle(new RequestChannel.Response(request, None, action)) + } else { + sendResponseMaybeThrottle(request, requestThrottleMs => + new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs)) } + } - // When this callback is triggered, the remote API call has completed - request.apiRemoteCompleteTimeNanos = time.nanoseconds + // When this callback is triggered, the remote API call has completed + request.apiRemoteCompleteTimeNanos = time.nanoseconds - quotas.produce.recordAndMaybeThrottle( - request.session.sanitizedUser, - request.header.clientId, - numBytesAppended, - produceResponseCallback) - } + quotas.produce.recordAndMaybeThrottle( + request.session.sanitizedUser, + request.header.clientId, + numBytesAppended, + produceResponseCallback) + } - if (authorizedRequestInfo.isEmpty) - sendResponseCallback(Map.empty) - else { - val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId - - // call the replica manager to append messages to the replicas - replicaManager.appendRecords( - timeout = produceRequest.timeout.toLong, - requiredAcks = produceRequest.acks, - internalTopicsAllowed = internalTopicsAllowed, - isFromClient = true, - entriesPerPartition = authorizedRequestInfo, - responseCallback = sendResponseCallback) - - // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; - // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log - produceRequest.clearPartitionRecords() - } + if (authorizedRequestInfo.isEmpty) + sendResponseCallback(Map.empty) + else { + val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId + + // call the replica manager to append messages to the replicas + replicaManager.appendRecords( + timeout = produceRequest.timeout.toLong, + requiredAcks = produceRequest.acks, + internalTopicsAllowed = internalTopicsAllowed, + isFromClient = true, + entriesPerPartition = authorizedRequestInfo, + responseCallback = sendResponseCallback) + + // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; + // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log + produceRequest.clearPartitionRecords() } } @@ -1052,13 +1062,16 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFindCoordinatorRequest(request: RequestChannel.Request) { val findCoordinatorRequest = request.body[FindCoordinatorRequest] - if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP && - !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) { - sendResponseMaybeThrottle(request, requestThrottleMs => - new FindCoordinatorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)) - } else { - // TODO: Authorize by transactional id if coordinator type is TRANSACTION + def sendErrorResponse(error: Errors): Unit = + sendResponseMaybeThrottle(request, requestThrottleMs => new FindCoordinatorResponse(error, Node.noNode)) + if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP && + !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) + sendErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED) + else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION && + !authorize(request.session, Describe, new Resource(TransactionalId, findCoordinatorRequest.coordinatorKey))) + sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) + else { // get metadata (and create the topic if necessary) val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match { case FindCoordinatorRequest.CoordinatorType.GROUP => @@ -1102,18 +1115,18 @@ class KafkaApis(val requestChannel: RequestChannel, val describeRequest = request.body[DescribeGroupsRequest] val groups = describeRequest.groupIds.asScala.map { groupId => - if (!authorize(request.session, Describe, new Resource(Group, groupId))) { - groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED) - } else { - val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) - val members = summary.members.map { member => - val metadata = ByteBuffer.wrap(member.metadata) - val assignment = ByteBuffer.wrap(member.assignment) - new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment) - } - groupId -> new DescribeGroupsResponse.GroupMetadata(error, summary.state, summary.protocolType, - summary.protocol, members.asJava) + if (!authorize(request.session, Describe, new Resource(Group, groupId))) { + groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED) + } else { + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + val members = summary.members.map { member => + val metadata = ByteBuffer.wrap(member.metadata) + val assignment = ByteBuffer.wrap(member.assignment) + new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment) } + groupId -> new DescribeGroupsResponse.GroupMetadata(error, summary.state, summary.protocolType, + summary.protocol, members.asJava) + } }.toMap sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeGroupsResponse(requestThrottleMs, groups.asJava)) @@ -1414,31 +1427,36 @@ class KafkaApis(val requestChannel: RequestChannel, val initProducerIdRequest = request.body[InitProducerIdRequest] val transactionalId = initProducerIdRequest.transactionalId + def sendErrorResponse(error: Errors): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => new InitProducerIdResponse(requestThrottleMs, error)) + } - if (!authorize(request.session, Write, Resource.ProducerIdResource)) { - sendResponseMaybeThrottle(request, requestThrottleMs => - new InitProducerIdResponse(requestThrottleMs, Errors.PRODUCER_ID_AUTHORIZATION_FAILED)) - } else if (transactionalId == null || authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) { - // Send response callback - def sendResponseCallback(result: InitProducerIdResult): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { - val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch) - trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.") - responseBody - } - sendResponseMaybeThrottle(request, createResponse) + if (transactionalId != null) { + if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) { + sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) + return } - txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback) - }else - sendResponseMaybeThrottle(request, requestThrottleMs => - new InitProducerIdResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) + } else if (!authorize(request.session, IdempotentWrite, Resource.ClusterResource)) { + sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED) + return + } + + def sendResponseCallback(result: InitProducerIdResult): Unit = { + def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch) + trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.") + responseBody + } + sendResponseMaybeThrottle(request, createResponse) + } + txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback) } def handleEndTxnRequest(request: RequestChannel.Request): Unit = { val endTxnRequest = request.body[EndTxnRequest] val transactionalId = endTxnRequest.transactionalId - if(authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) { + if (authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) { def sendResponseCallback(error: Errors) { def createResponse(requestThrottleMs: Int): AbstractResponse = { val responseBody = new EndTxnResponse(requestThrottleMs, error) @@ -1471,6 +1489,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { + trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus") errors.put(producerId, responseStatus.mapValues(_.error).asJava) val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) => @@ -1524,9 +1543,9 @@ class KafkaApis(val requestChannel: RequestChannel, val transactionalId = addPartitionsToTxnRequest.transactionalId val partitionsToAdd = addPartitionsToTxnRequest.partitions - if(!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) + if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) sendResponseMaybeThrottle(request, requestThrottleMs => - addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception())) + addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) else { val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())} @@ -1535,24 +1554,24 @@ class KafkaApis(val requestChannel: RequestChannel, authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic) } - val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { - tp => authorize(request.session, Write, new Resource(Topic, tp.topic)) + val unauthorizedForWriteRequestInfo = existingAndAuthorizedForDescribeTopics.filterNot { tp => + authorize(request.session, Write, new Resource(Topic, tp.topic)) } if (nonExistingOrUnauthorizedForDescribeTopics.nonEmpty || unauthorizedForWriteRequestInfo.nonEmpty || internalTopics.nonEmpty) { - // Only send back error responses for the partitions that failed. If there are any partition failures - // then the entire request fails - val partitionErrors = unauthorizedForWriteRequestInfo.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) }.toMap ++ - nonExistingOrUnauthorizedForDescribeTopics.map { tp => (tp, Errors.UNKNOWN_TOPIC_OR_PARTITION) }.toMap ++ - internalTopics.map { tp => (tp, Errors.TOPIC_AUTHORIZATION_FAILED) } + // Any failed partition check causes the entire request to fail. We only send back error responses + // for the partitions that failed to avoid needing to send an ambiguous error code for the partitions + // which succeeded. + val partitionErrors = (unauthorizedForWriteRequestInfo.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ + nonExistingOrUnauthorizedForDescribeTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++ + internalTopics.map(_ ->Errors.TOPIC_AUTHORIZATION_FAILED)).toMap sendResponseMaybeThrottle(request, requestThrottleMs => new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava)) } else { - // Send response callback def sendResponseCallback(error: Errors): Unit = { def createResponse(requestThrottleMs: Int): AbstractResponse = { val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs, @@ -1565,14 +1584,12 @@ class KafkaApis(val requestChannel: RequestChannel, } txnCoordinator.handleAddPartitionsToTransaction(transactionalId, - addPartitionsToTxnRequest.producerId(), - addPartitionsToTxnRequest.producerEpoch(), + addPartitionsToTxnRequest.producerId, + addPartitionsToTxnRequest.producerEpoch, partitionsToAdd.asScala.toSet, sendResponseCallback) } } - - } def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = { @@ -1581,44 +1598,47 @@ class KafkaApis(val requestChannel: RequestChannel, val groupId = addOffsetsToTxnRequest.consumerGroupId val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) - if (!authorize(request.session, Write, new Resource(ProducerTransactionalId, transactionalId))) + if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) sendResponseMaybeThrottle(request, requestThrottleMs => new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) else if (!authorize(request.session, Read, new Resource(Group, groupId))) sendResponseMaybeThrottle(request, requestThrottleMs => new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)) else { - // Send response callback - def sendResponseCallback(error: Errors): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { - val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error) - trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId as on partition $offsetTopicPartition: errors: $error from client ${request.header.clientId}") - responseBody - } - sendResponseMaybeThrottle(request, createResponse) + def sendResponseCallback(error: Errors): Unit = { + def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error) + trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId on partition " + + s"$offsetTopicPartition: errors: $error from client ${request.header.clientId}") + responseBody } - - txnCoordinator.handleAddPartitionsToTransaction(transactionalId, - addOffsetsToTxnRequest.producerId, - addOffsetsToTxnRequest.producerEpoch, - Set(offsetTopicPartition), - sendResponseCallback) + sendResponseMaybeThrottle(request, createResponse) } - } + txnCoordinator.handleAddPartitionsToTransaction(transactionalId, + addOffsetsToTxnRequest.producerId, + addOffsetsToTxnRequest.producerEpoch, + Set(offsetTopicPartition), + sendResponseCallback) + } + } def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = { val header = request.header val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest] - // reject the request if not authorized to the group - if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId))) { - val error = Errors.GROUP_AUTHORIZATION_FAILED - val results = txnOffsetCommitRequest.offsets.keySet.asScala.map { topicPartition => - (topicPartition, error) - }.toMap + + def sendErrorResponse(error: Errors): Unit = { sendResponseMaybeThrottle(request, requestThrottleMs => - new TxnOffsetCommitResponse(requestThrottleMs, results.asJava)) - } else { + txnOffsetCommitRequest.getErrorResponse(requestThrottleMs, error.exception)) + } + + // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization + // since it is implied by transactionalId authorization + if (!authorize(request.session, Write, new Resource(TransactionalId, txnOffsetCommitRequest.transactionalId))) + sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) + else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId))) + sendErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED) + else { val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition { case (topicPartition, _) => val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 0198d38..c464834 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -42,6 +42,7 @@ import org.apache.kafka.common.KafkaException import kafka.admin.AdminUtils import kafka.log.LogConfig import kafka.network.SocketServer +import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} @@ -55,6 +56,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val createTopic = "topic-new" val deleteTopic = "topic-delete" val transactionalId = "transactional.id" + val producerId = 83392L val part = 0 val correlationId = 0 val clientId = "client-Id" @@ -64,22 +66,24 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val topicResource = new Resource(Topic, topic) val groupResource = new Resource(Group, group) val deleteTopicResource = new Resource(Topic, deleteTopic) - val producerTransactionalIdResource = new Resource(ProducerTransactionalId, transactionalId) + val transactionalIdResource = new Resource(TransactionalId, transactionalId) val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) + val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction))) val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create))) + val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite))) val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs))) val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs))) - val producerTransactionalIdWriteAcl = Map(producerTransactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) + val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) + val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() - var transactionalProducer: KafkaProducer[Array[Byte], Array[Byte]] = _ val producerCount = 1 val consumerCount = 2 @@ -115,7 +119,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse], ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse], ApiKeys.DESCRIBE_CONFIGS -> classOf[DescribeConfigsResponse], - ApiKeys.ALTER_CONFIGS -> classOf[AlterConfigsResponse] + ApiKeys.ALTER_CONFIGS -> classOf[AlterConfigsResponse], + ApiKeys.INIT_PRODUCER_ID -> classOf[InitProducerIdResponse], + ApiKeys.WRITE_TXN_MARKERS -> classOf[WriteTxnMarkersResponse], + ApiKeys.ADD_PARTITIONS_TO_TXN -> classOf[AddPartitionsToTxnResponse], + ApiKeys.ADD_OFFSETS_TO_TXN -> classOf[AddOffsetsToTxnResponse], + ApiKeys.END_TXN -> classOf[EndTxnResponse], + ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse] ) val requestKeyToError = Map[ApiKeys, Nothing => Errors]( @@ -140,17 +150,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) => resp.configs.get(new RResource(RResourceType.TOPIC, tp.topic)).error.error), ApiKeys.ALTER_CONFIGS -> ((resp: AlterConfigsResponse) => - resp.errors.get(new RResource(RResourceType.TOPIC, tp.topic)).error) + resp.errors.get(new RResource(RResourceType.TOPIC, tp.topic)).error), + ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error), + ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errors(producerId).get(tp)), + ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)), + ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error), + ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error), + ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)) ) val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( ApiKeys.METADATA -> topicDescribeAcl, - ApiKeys.PRODUCE -> topicWriteAcl, + ApiKeys.PRODUCE -> (topicWriteAcl ++ transactionIdWriteAcl ++ clusterIdempotentWriteAcl), ApiKeys.FETCH -> topicReadAcl, ApiKeys.LIST_OFFSETS -> topicDescribeAcl, ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl), ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupReadAcl), - ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupReadAcl), + ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupDescribeAcl ++ transactionalIdDescribeAcl), ApiKeys.UPDATE_METADATA_KEY -> clusterAcl, ApiKeys.JOIN_GROUP -> groupReadAcl, ApiKeys.SYNC_GROUP -> groupReadAcl, @@ -163,7 +179,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_TOPICS -> topicDeleteAcl, ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl, ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl, - ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl + ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl, + ApiKeys.INIT_PRODUCER_ID -> (transactionIdWriteAcl ++ clusterIdempotentWriteAcl), + ApiKeys.WRITE_TXN_MARKERS -> clusterAcl, + ApiKeys.ADD_PARTITIONS_TO_TXN -> (topicWriteAcl ++ transactionIdWriteAcl), + ApiKeys.ADD_OFFSETS_TO_TXN -> (groupReadAcl ++ transactionIdWriteAcl), + ApiKeys.END_TXN -> transactionIdWriteAcl, + ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl) ) @Before @@ -177,14 +199,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { maxBlockMs = 3000, acks = 1) - val transactionalProperties = new Properties() - transactionalProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - transactionalProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId) - transactionalProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), - retries = 3, - props = Some(transactionalProperties) - ) - for (_ <- 0 until consumerCount) consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT) @@ -204,7 +218,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { producers.foreach(_.close()) consumers.foreach(_.wakeup()) consumers.foreach(_.close()) - transactionalProducer.close() removeAllAcls() super.tearDown() } @@ -338,17 +351,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ) for ((key, request) <- requestKeyToRequest) { - removeAllAcls + removeAllAcls() val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false) val resourceToAcls = requestKeysToAcls(key) - resourceToAcls.get(topicResource).map { acls => + resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true) - removeAllAcls + removeAllAcls() } for ((resource, acls) <- resourceToAcls) @@ -377,17 +390,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ) for ((key, request) <- requestKeyToRequest) { - removeAllAcls + removeAllAcls() val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false) val resourceToAcls = requestKeysToAcls(key) - resourceToAcls.get(topicResource).map { acls => + resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false) - removeAllAcls + removeAllAcls() } for ((resource, acls) <- resourceToAcls) @@ -852,44 +865,182 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test(expected = classOf[TransactionalIdAuthorizationException]) - def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnInitTransactions(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) - transactionalProducer.initTransactions() + def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), transactionalIdResource) + val producer = buildTransactionalProducer() + producer.initTransactions() + } + + @Test(expected = classOf[TransactionalIdAuthorizationException]) + def testTransactionalProducerInitTransactionsNoDescribeTransactionalIdAcl(): Unit = { + val producer = buildTransactionalProducer() + producer.initTransactions() + } + + @Test + def testSendOffsetsWithNoConsumerGroupDescribeAccess(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + val producer = buildTransactionalProducer() + producer.initTransactions() + producer.beginTransaction() + try { + producer.sendOffsetsToTransaction(Map(new TopicPartition(topic, 0) -> new OffsetAndMetadata(0L)).asJava, group) + fail("Should have raised GroupAuthorizationException") + } catch { + case e: GroupAuthorizationException => + } + } + + @Test + def testSendOffsetsWithNoConsumerGroupWriteAccess(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource) + val producer = buildTransactionalProducer() + producer.initTransactions() + producer.beginTransaction() + try { + producer.sendOffsetsToTransaction(Map(new TopicPartition(topic, 0) -> new OffsetAndMetadata(0L)).asJava, group) + fail("Should have raised GroupAuthorizationException") + } catch { + case e: GroupAuthorizationException => + } + } + + @Test + def testIdempotentProducerNoIdempotentWriteAclInInitProducerId(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + val producer = buildIdempotentProducer() + try { + // the InitProducerId is sent asynchronously, so we expect the error either in the callback + // or raised from send itself + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get() + fail("Should have raised ClusterAuthorizationException") + } catch { + case e: ExecutionException => + assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException]) + } + try { + // the second time, the call to send itself should fail (the producer becomes unusable + // if no producerId can be obtained) + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)) + fail("Should have raised ClusterAuthorizationException") + } catch { + case e: KafkaException => + assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException]) + } + } + + @Test + def testIdempotentProducerNoIdempotentWriteAclInProduce(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource) + + val producer = buildIdempotentProducer() + + // first send should be fine since we have permission to get a ProducerId + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get() + + // revoke the IdempotentWrite permission + removeAllAcls() + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + + try { + // the send should now fail with a cluster auth error + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get() + fail("Should have raised ClusterAuthorizationException") + } catch { + case e: ExecutionException => + assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException]) + } + try { + // the second time, the call to send itself should fail (the producer becomes unusable + // if no producerId can be obtained) + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)) + fail("Should have raised ClusterAuthorizationException") + } catch { + case e: KafkaException => + assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException]) + } } @Test def shouldInitTransactionsWhenAclSet(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) - transactionalProducer.initTransactions() + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + val producer = buildTransactionalProducer() + producer.initTransactions() } + @Test + def testTransactionalProducerTopicAuthorizationExceptionInSendCallback(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + // add describe access so that we can fetch metadata + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + val producer = buildTransactionalProducer() + producer.initTransactions() + producer.beginTransaction() + try { + producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get + Assert.fail("expected TopicAuthorizationException") + } catch { + case e: ExecutionException => + e.getCause match { + case cause: TopicAuthorizationException => + assertEquals(Set(topic), cause.unauthorizedTopics().asScala) + case other => + fail("Unexpected failure cause in send callback") + } + } + } + + @Test + def testTransactionalProducerTopicAuthorizationExceptionInCommit(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + // add describe access so that we can fetch metadata + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + val producer = buildTransactionalProducer() + producer.initTransactions() + producer.beginTransaction() + try { + producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)) + producer.commitTransaction() + Assert.fail("expected TopicAuthorizationException") + } catch { + case e: TopicAuthorizationException => + assertEquals(Set(topic), e.unauthorizedTopics().asScala) + } + } @Test def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessDuringSend(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) - transactionalProducer.initTransactions() + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + val producer = buildTransactionalProducer() + producer.initTransactions() removeAllAcls() addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) try { - transactionalProducer.beginTransaction() - transactionalProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get + producer.beginTransaction() + producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get Assert.fail("expected TransactionalIdAuthorizationException") } catch { - case e: ExecutionException => assertTrue(s"expected TransactionalIdAuthorizationException, but got ${e.getCause}", e.getCause.isInstanceOf[TransactionalIdAuthorizationException]) + case e: ExecutionException => assertTrue(s"expected TransactionalIdAuthorizationException, but got ${e.getCause}", + e.getCause.isInstanceOf[TransactionalIdAuthorizationException]) } } @Test def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnEndTransaction(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) - transactionalProducer.initTransactions() - transactionalProducer.beginTransaction() + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + val producer = buildTransactionalProducer() + producer.initTransactions() + producer.beginTransaction() + producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get + producer.flush() removeAllAcls() try { - transactionalProducer.commitTransaction() + producer.commitTransaction() Assert.fail("expected TransactionalIdAuthorizationException") } catch { case _: TransactionalIdAuthorizationException => // ok @@ -898,50 +1049,27 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), producerTransactionalIdResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), transactionalIdResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), groupResource) - transactionalProducer.initTransactions() - transactionalProducer.beginTransaction() + val producer = buildTransactionalProducer() + producer.initTransactions() + producer.beginTransaction() removeAllAcls() try { - val offsets: util.Map[TopicPartition, OffsetAndMetadata] = Map(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition) -> new OffsetAndMetadata(1L)).asJava - transactionalProducer.sendOffsetsToTransaction(offsets, group) + val offsets: util.Map[TopicPartition, OffsetAndMetadata] = Map(new TopicPartition(tp.topic, tp.partition) -> new OffsetAndMetadata(1L)).asJava + producer.sendOffsetsToTransaction(offsets, group) Assert.fail("expected TransactionalIdAuthorizationException") } catch { case _: TransactionalIdAuthorizationException => // ok } } - - @Test - def shouldThrowProducerIdAuthorizationExceptionWhenAclNotSet(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - val idempotentProperties = new Properties() - idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - val idempotentProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), - retries = 3, - props = Some(idempotentProperties) - ) - try { - idempotentProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get - Assert.fail("expected ProducerIdAuthorizationException") - } catch { - case e: ExecutionException => assertTrue(s"expected ProducerIdAuthorizationException, but got ${e.getCause}", e.getCause.isInstanceOf[ProducerIdAuthorizationException]) - } - } - @Test def shouldSendSuccessfullyWhenIdempotentAndHasCorrectACL(): Unit = { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), Resource.ProducerIdResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)), Resource.ClusterResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - val idempotentProperties = new Properties() - idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - val idempotentProducer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), - retries = 3, - props = Some(idempotentProperties) - ) - idempotentProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "1".getBytes, "1".getBytes)).get + val producer = buildIdempotentProducer() + producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get } def removeAllAcls() = { @@ -1032,4 +1160,24 @@ class AuthorizerIntegrationTest extends BaseRequestTest { requests.OffsetFetchResponse.parse(response, request.version) } + private def buildTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { + val transactionalProperties = new Properties() + transactionalProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId) + val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), + retries = 3, + props = Some(transactionalProperties)) + producers += producer + producer + } + + private def buildIdempotentProducer(): KafkaProducer[Array[Byte], Array[Byte]] = { + val idempotentProperties = new Properties() + idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), + retries = 3, + props = Some(idempotentProperties)) + producers += producer + producer + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala index 5d46348..1b88f40 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala @@ -77,17 +77,17 @@ class TransactionsBounceTest extends KafkaServerTestHarness { def testBrokerFailure() { // basic idea is to seed a topic with 10000 records, and copy it transactionally while bouncing brokers // constantly through the period. - val consumerGroup= "myGroup" + val consumerGroup = "myGroup" val numInputRecords = 5000 createTopics() TestUtils.seedTopicWithNumberedRecords(inputTopic, numInputRecords, servers) - val consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic)) - val producer = TestUtils.createTransactionalProducer("my-test-producer-t.id", servers) + val producer = TestUtils.createTransactionalProducer("test-txn", servers) - val scheduler = new BounceScheduler producer.initTransactions() + + val scheduler = new BounceScheduler scheduler.start() var numMessagesProcessed = 0 @@ -97,16 +97,17 @@ class TransactionsBounceTest extends KafkaServerTestHarness { val toRead = Math.min(200, numInputRecords - numMessagesProcessed) trace(s"$iteration: About to read $toRead messages, processed $numMessagesProcessed so far..") val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead) - trace(s"received ${records.size} messages. sending them transactionally to $outputTopic") + trace(s"Received ${records.size} messages, sending them transactionally to $outputTopic") + producer.beginTransaction() - val shouldAbort = iteration % 2 == 0 - records.zipWithIndex.foreach { case (record, i) => - producer.send( - TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, record.key, record.value, !shouldAbort), - new ErrorLoggingCallback(outputTopic, record.key, record.value, true)) + val shouldAbort = iteration % 3 == 0 + records.foreach { record => + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, record.key, record.value, + !shouldAbort), new ErrorLoggingCallback(outputTopic, record.key, record.value, true)) } trace(s"Sent ${records.size} messages. Committing offsets.") producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup) + if (shouldAbort) { trace(s"Committed offsets. Aborting transaction of ${records.size} messages.") producer.abortTransaction() @@ -125,8 +126,8 @@ class TransactionsBounceTest extends KafkaServerTestHarness { scheduler.shutdown() - val verifyingConsumer = createConsumerAndSubscribeToTopics("randoGroup", List(outputTopic), readCommitted = true) - val outputRecords = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).map { case(record) => + val verifyingConsumer = createConsumerAndSubscribeToTopics("randomGroup", List(outputTopic), readCommitted = true) + val outputRecords = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).map { record => TestUtils.assertCommittedAndGetValue(record).toInt } val recordSet = outputRecords.toSet @@ -142,7 +143,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { val props = new Properties() if (readCommitted) props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200") + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000") props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000") props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000") @@ -157,8 +158,8 @@ class TransactionsBounceTest extends KafkaServerTestHarness { private def createTopics() = { val topicConfig = new Properties() topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString) - TestUtils.createTopic(zkUtils, inputTopic, numPartitions, numServers, servers, topicConfig) - TestUtils.createTopic(zkUtils, outputTopic, numPartitions, numServers, servers, topicConfig) + TestUtils.createTopic(zkUtils, inputTopic, numPartitions, 3, servers, topicConfig) + TestUtils.createTopic(zkUtils, outputTopic, numPartitions, 3, servers, topicConfig) } private class BounceScheduler extends ShutdownableThread("daemon-broker-bouncer", false) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/test/scala/integration/kafka/api/TransactionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index ec6b3ea..fd9d884 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -120,15 +120,15 @@ class TransactionsTest extends KafkaServerTestHarness { consumer.subscribe(List(topic1)) producer.initTransactions() - val random = new Random() var shouldCommit = false var recordsProcessed = 0 try { while (recordsProcessed < numSeedMessages) { + val records = TestUtils.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeedMessages - recordsProcessed)) + producer.beginTransaction() shouldCommit = !shouldCommit - val records = TestUtils.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeedMessages - recordsProcessed)) records.zipWithIndex.foreach { case (record, i) => val key = new String(record.key(), "UTF-8") val value = new String(record.value(), "UTF-8") @@ -153,7 +153,7 @@ class TransactionsTest extends KafkaServerTestHarness { consumer.close() } - // Inspite of random aborts, we should still have exactly 1000 messages in topic2. Ie. we should not + // In spite of random aborts, we should still have exactly 1000 messages in topic2. Ie. we should not // re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally. val verifyingConsumer = transactionalConsumer("foobargroup") verifyingConsumer.subscribe(List(topic2)) @@ -334,7 +334,6 @@ class TransactionsTest extends KafkaServerTestHarness { fail("Should not be able to send messages from a fenced producer.") } catch { case e : ProducerFencedException => - producer1.close() case e : ExecutionException => assertTrue(e.getCause.isInstanceOf[ProducerFencedException]) case e : Exception => http://git-wip-us.apache.org/repos/asf/kafka/blob/9a21bf20/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index f5b0a06..f379585 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -36,26 +36,33 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2")) private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2")) private val BrokerResources = Set(new Resource(Broker, "0"), new Resource(Broker, "1")) + private val TransactionalIdResources = Set(new Resource(TransactionalId, "t0"), new Resource(TransactionalId, "t1")) private val ResourceToCommand = Map[Set[Resource], Array[String]]( TopicResources -> Array("--topic", "test-1", "--topic", "test-2"), Set(Resource.ClusterResource) -> Array("--cluster"), GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2"), - BrokerResources -> Array("--broker", "0", "--broker", "1") + BrokerResources -> Array("--broker", "0", "--broker", "1"), + TransactionalIdResources -> Array("--transactional-id", "t0", "--transactional-id", "t1") ) private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])]( TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs), Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete", "--operation", "DescribeConfigs", "--operation", "AlterConfigs")), - Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operation", "Create", "--operation", "ClusterAction")), + Set(Resource.ClusterResource) -> (Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite), + Array("--operation", "Create", "--operation", "ClusterAction", "--operation", "DescribeConfigs", + "--operation", "AlterConfigs", "--operation", "IdempotentWrite")), GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe")), - BrokerResources -> (Set(DescribeConfigs), Array("--operation", "DescribeConfigs")) + BrokerResources -> (Set(DescribeConfigs), Array("--operation", "DescribeConfigs")), + TransactionalIdResources -> (Set(Describe, Write), Array("--operation", "Describe", "--operation", "Write")) ) - private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]]( + private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]]( TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), - Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Create), Hosts) + TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), + Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Some(Create), + if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts) ) private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]]( @@ -64,10 +71,13 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { ) private val CmdToResourcesToAcl = Map[Array[String], Map[Set[Resource], Set[Acl]]]( - Array[String]("--producer") -> ProducerResourceToAcls, + Array[String]("--producer") -> ProducerResourceToAcls(), + Array[String]("--producer", "--idempotent") -> ProducerResourceToAcls(enableIdempotence = true), Array[String]("--consumer") -> ConsumerResourceToAcls, Array[String]("--producer", "--consumer") -> ConsumerResourceToAcls.map { case (k, v) => k -> (v ++ - ProducerResourceToAcls.getOrElse(k, Set.empty[Acl])) } + ProducerResourceToAcls().getOrElse(k, Set.empty[Acl])) }, + Array[String]("--producer", "--idempotent", "--consumer") -> ConsumerResourceToAcls.map { case (k, v) => k -> (v ++ + ProducerResourceToAcls(enableIdempotence = true).getOrElse(k, Set.empty[Acl])) } ) @Test @@ -108,11 +118,11 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { } } } - testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand, args, brokerProps) + testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, args, brokerProps) } } - @Test (expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[IllegalArgumentException]) def testInvalidAuthorizerProperty() { val args = Array("--authorizer-properties", "zookeeper.connect " + zkConnect) AclCommand.withAuthorizer(new AclCommandOptions(args))(null) @@ -120,10 +130,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { private def testRemove(resources: Set[Resource], resourceCmd: Array[String], args: Array[String], brokerProps: Properties) { for (resource <- resources) { - AclCommand.main(args ++ resourceCmd :+ "--remove" :+ "--force") - withAuthorizer(brokerProps) { authorizer => - TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource) - } + AclCommand.main(args ++ resourceCmd :+ "--remove" :+ "--force") + withAuthorizer(brokerProps) { authorizer => + TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource) + } } }
