Repository: kafka Updated Branches: refs/heads/trunk e1abf1770 -> 972b75453
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 9eb1275..a362577 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} -import org.apache.kafka.common.requests._ +import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} import CreateTopicsRequest.TopicDetails import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.{Node, TopicPartition, requests} @@ -40,6 +40,7 @@ import scala.collection.mutable import scala.collection.mutable.Buffer import org.apache.kafka.common.KafkaException import kafka.admin.AdminUtils +import kafka.log.LogConfig import kafka.network.SocketServer import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} @@ -65,13 +66,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val deleteTopicResource = new Resource(Topic, deleteTopic) val producerTransactionalIdResource = new Resource(ProducerTransactionalId, transactionalId) - val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) - val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction))) - val ClusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create))) - val TopicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) - val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) - val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) - val TopicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) + val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) + val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction))) + val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create))) + val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) + val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) + val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) + val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) + val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs))) + val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs))) val producerTransactionalIdWriteAcl = Map(producerTransactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() @@ -92,7 +95,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") } - val RequestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] = + val requestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] = Map(ApiKeys.METADATA -> classOf[requests.MetadataResponse], ApiKeys.PRODUCE -> classOf[requests.ProduceResponse], ApiKeys.FETCH -> classOf[requests.FetchResponse], @@ -110,15 +113,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.CONTROLLED_SHUTDOWN_KEY -> classOf[requests.ControlledShutdownResponse], ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse], ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse], - ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse] + ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse], + ApiKeys.DESCRIBE_CONFIGS -> classOf[DescribeConfigsResponse], + ApiKeys.ALTER_CONFIGS -> classOf[AlterConfigsResponse] ) - val RequestKeyToError = Map[ApiKeys, (Nothing) => Errors]( - ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), - ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.error), - ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error), - ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error), - ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2), + val requestKeyToError = Map[ApiKeys, Nothing => Errors]( + ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), + ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error), + ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error), + ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData.asScala.find(_._1 == tp).get._2.error), + ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData.asScala.find(_._1 == tp).get._2), ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error), ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error), ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error), @@ -126,33 +131,39 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error), ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error), ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), - ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2), - ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2), + ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2), + ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2), ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.error), - ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error), - ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2), - ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.asScala.get(tp).get.error()) + ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors.asScala.find(_._1 == createTopic).get._2.error), + ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2), + ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error), + ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) => + resp.configs.get(new RResource(RResourceType.TOPIC, tp.topic)).error.error), + ApiKeys.ALTER_CONFIGS -> ((resp: AlterConfigsResponse) => + resp.errors.get(new RResource(RResourceType.TOPIC, tp.topic)).error) ) - val RequestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( - ApiKeys.METADATA -> TopicDescribeAcl, - ApiKeys.PRODUCE -> TopicWriteAcl, - ApiKeys.FETCH -> TopicReadAcl, - ApiKeys.LIST_OFFSETS -> TopicDescribeAcl, - ApiKeys.OFFSET_COMMIT -> (TopicReadAcl ++ GroupReadAcl), - ApiKeys.OFFSET_FETCH -> (TopicReadAcl ++ GroupReadAcl), - ApiKeys.FIND_COORDINATOR -> (TopicReadAcl ++ GroupReadAcl), - ApiKeys.UPDATE_METADATA_KEY -> ClusterAcl, - ApiKeys.JOIN_GROUP -> GroupReadAcl, - ApiKeys.SYNC_GROUP -> GroupReadAcl, - ApiKeys.HEARTBEAT -> GroupReadAcl, - ApiKeys.LEAVE_GROUP -> GroupReadAcl, - ApiKeys.LEADER_AND_ISR -> ClusterAcl, - ApiKeys.STOP_REPLICA -> ClusterAcl, - ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ClusterAcl, - ApiKeys.CREATE_TOPICS -> ClusterCreateAcl, - ApiKeys.DELETE_TOPICS -> TopicDeleteAcl, - ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ClusterAcl + val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( + ApiKeys.METADATA -> topicDescribeAcl, + ApiKeys.PRODUCE -> topicWriteAcl, + ApiKeys.FETCH -> topicReadAcl, + ApiKeys.LIST_OFFSETS -> topicDescribeAcl, + ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl), + ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupReadAcl), + ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupReadAcl), + ApiKeys.UPDATE_METADATA_KEY -> clusterAcl, + ApiKeys.JOIN_GROUP -> groupReadAcl, + ApiKeys.SYNC_GROUP -> groupReadAcl, + ApiKeys.HEARTBEAT -> groupReadAcl, + ApiKeys.LEAVE_GROUP -> groupReadAcl, + ApiKeys.LEADER_AND_ISR -> clusterAcl, + ApiKeys.STOP_REPLICA -> clusterAcl, + ApiKeys.CONTROLLED_SHUTDOWN_KEY -> clusterAcl, + ApiKeys.CREATE_TOPICS -> clusterCreateAcl, + ApiKeys.DELETE_TOPICS -> topicDeleteAcl, + ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl, + ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl, + ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl ) @Before @@ -221,8 +232,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } private def offsetsForLeaderEpochRequest = { - new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build() -} + new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build() + } private def createOffsetFetchRequest = { new requests.OffsetFetchRequest.Builder(group, List(tp).asJava).build() @@ -289,6 +300,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest { new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build() } + private def createDescribeConfigsRequest = + new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic))).build() + + private def createAlterConfigsRequest = + new AlterConfigsRequest.Builder( + Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic), + new AlterConfigsRequest.Config(Collections.singleton( + new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000") + ))), true).build() + + @Test def testAuthorizationWithTopicExisting() { val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( @@ -309,17 +331,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest, ApiKeys.CREATE_TOPICS -> createTopicsRequest, ApiKeys.DELETE_TOPICS -> deleteTopicsRequest, - ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest + ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest, + ApiKeys.DESCRIBE_CONFIGS -> createDescribeConfigsRequest, + ApiKeys.ALTER_CONFIGS -> createAlterConfigsRequest ) for ((key, request) <- requestKeyToRequest) { removeAllAcls - val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet + val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false) - val resourceToAcls = RequestKeysToAcls(key) + val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).map { acls => - val describeAcls = TopicDescribeAcl(topicResource) + val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true) @@ -353,12 +377,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { for ((key, request) <- requestKeyToRequest) { removeAllAcls - val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet + val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false) - val resourceToAcls = RequestKeysToAcls(key) + val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).map { acls => - val describeAcls = TopicDescribeAcl(topicResource) + val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false) @@ -427,7 +451,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRecords(numRecords, topicPartition) } - @Test(expected = classOf[AuthorizationException]) + @Test(expected = classOf[GroupAuthorizationException]) def testConsumeWithNoAccess(): Unit = { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) @@ -666,8 +690,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val topicPartition = new TopicPartition(newTopic, 0) val newTopicResource = new Resource(Topic, newTopic) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource) - addAndVerifyAcls(GroupReadAcl(groupResource), groupResource) - addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource) + addAndVerifyAcls(groupReadAcl(groupResource), groupResource) + addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource) try { this.consumers.head.assign(List(topicPartition).asJava) consumeRecords(this.consumers.head) @@ -933,9 +957,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { isAuthorizedTopicDescribe: Boolean, topicExists: Boolean = true): AbstractResponse = { val resp = connectAndSend(request, apiKey) - val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke( + val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke( null, resp, request.version: java.lang.Short).asInstanceOf[AbstractResponse] - val error = RequestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response) + val error = requestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response) val authorizationErrors = resources.flatMap { resourceType => if (resourceType == Topic) { http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala index f381b15..81f5c27 100644 --- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala @@ -22,12 +22,13 @@ import java.util.concurrent.ExecutionException import org.apache.kafka.common.utils.Utils import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig +import kafka.log.LogConfig +import kafka.server.{Defaults, KafkaConfig} import org.apache.kafka.clients.admin._ import kafka.utils.{Logging, TestUtils} import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.KafkaFuture -import org.apache.kafka.common.errors.{SecurityDisabledException, TopicExistsException} +import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException} import org.apache.kafka.common.protocol.ApiKeys import org.junit.{After, Before, Rule, Test} import org.apache.kafka.common.requests.MetadataResponse @@ -104,13 +105,12 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin def testListNodes(): Unit = { client = AdminClient.create(createConfig()) val brokerStrs = brokerList.split(",").toList.sorted - var nodeStrs : List[String] = null + var nodeStrs: List[String] = null do { - var nodes = client.describeCluster().nodes().get().asScala + val nodes = client.describeCluster().nodes().get().asScala nodeStrs = nodes.map ( node => s"${node.host}:${node.port}" ).toList.sorted } while (nodeStrs.size < brokerStrs.size) assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(",")) - client.close() } @Test @@ -153,7 +153,213 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr)) assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS)) } - client.close() + } + + @Test + def testDescribeAndAlterConfigs(): Unit = { + client = AdminClient.create(createConfig) + + // Create topics + val topic1 = "describe-alter-configs-topic-1" + val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) + val topicConfig1 = new Properties + topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000") + topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000") + TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1) + + val topic2 = "describe-alter-configs-topic-2" + val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) + TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties) + + // Describe topics and broker + val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString) + val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString) + val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2) + var describeResult = client.describeConfigs(configResources.asJava) + var configs = describeResult.all.get + + assertEquals(4, configs.size) + + val maxMessageBytes1 = configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp) + assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes1.name) + assertEquals(topicConfig1.get(LogConfig.MaxMessageBytesProp), maxMessageBytes1.value) + assertFalse(maxMessageBytes1.isDefault) + assertFalse(maxMessageBytes1.isSensitive) + assertFalse(maxMessageBytes1.isReadOnly) + + assertEquals(topicConfig1.get(LogConfig.RetentionMsProp), + configs.get(topicResource1).get(LogConfig.RetentionMsProp).value) + + val maxMessageBytes2 = configs.get(topicResource2).get(LogConfig.MaxMessageBytesProp) + assertEquals(Defaults.MessageMaxBytes.toString, maxMessageBytes2.value) + assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes2.name) + assertTrue(maxMessageBytes2.isDefault) + assertFalse(maxMessageBytes2.isSensitive) + assertFalse(maxMessageBytes2.isReadOnly) + + assertEquals(servers(1).config.values.size, configs.get(brokerResource1).entries.size) + assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value) + val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp) + assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value) + assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name) + assertFalse(listenerSecurityProtocolMap.isDefault) + assertFalse(listenerSecurityProtocolMap.isSensitive) + assertTrue(listenerSecurityProtocolMap.isReadOnly) + val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp) + assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name) + assertNull(truststorePassword.value) + assertFalse(truststorePassword.isDefault) + assertTrue(truststorePassword.isSensitive) + assertTrue(truststorePassword.isReadOnly) + val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp) + assertEquals(servers(1).config.compressionType.toString, compressionType.value) + assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name) + assertTrue(compressionType.isDefault) + assertFalse(compressionType.isSensitive) + assertTrue(compressionType.isReadOnly) + + assertEquals(servers(2).config.values.size, configs.get(brokerResource2).entries.size) + assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value) + assertEquals(servers(2).config.logCleanerThreads.toString, + configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value) + + // Alter topics + var topicConfigEntries1 = Seq( + new ConfigEntry(LogConfig.FlushMsProp, "1000") + ).asJava + + var topicConfigEntries2 = Seq( + new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), + new ConfigEntry(LogConfig.CompressionTypeProp, "lz4") + ).asJava + + var alterResult = client.alterConfigs(Map( + topicResource1 -> new Config(topicConfigEntries1), + topicResource2 -> new Config(topicConfigEntries2) + ).asJava) + + assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet) + alterResult.all.get + + // Verify that topics were updated correctly + describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava) + configs = describeResult.all.get + + assertEquals(2, configs.size) + + assertEquals("1000", configs.get(topicResource1).get(LogConfig.FlushMsProp).value) + assertEquals(Defaults.MessageMaxBytes.toString, + configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value) + assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString, + configs.get(topicResource1).get(LogConfig.RetentionMsProp).value) + + assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) + assertEquals("lz4", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) + + // Alter topics with validateOnly=true + topicConfigEntries1 = Seq( + new ConfigEntry(LogConfig.MaxMessageBytesProp, "10") + ).asJava + + topicConfigEntries2 = Seq( + new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3") + ).asJava + + alterResult = client.alterConfigs(Map( + topicResource1 -> new Config(topicConfigEntries1), + topicResource2 -> new Config(topicConfigEntries2) + ).asJava, new AlterConfigsOptions().validateOnly(true)) + + assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet) + alterResult.all.get + + // Verify that topics were not updated due to validateOnly = true + describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava) + configs = describeResult.all.get + + assertEquals(2, configs.size) + + assertEquals(Defaults.MessageMaxBytes.toString, + configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp).value) + assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) + } + + @Test + def testInvalidAlterConfigs(): Unit = { + client = AdminClient.create(createConfig) + + // Create topics + val topic1 = "invalid-alter-configs-topic-1" + val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) + TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties()) + + val topic2 = "invalid-alter-configs-topic-2" + val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) + TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties) + + val topicConfigEntries1 = Seq( + new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0 + new ConfigEntry(LogConfig.CompressionTypeProp, "lz4") + ).asJava + + var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava + + val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString) + val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.CompressionTypeProp, "gzip")).asJava + + // Alter configs: first and third are invalid, second is valid + var alterResult = client.alterConfigs(Map( + topicResource1 -> new Config(topicConfigEntries1), + topicResource2 -> new Config(topicConfigEntries2), + brokerResource -> new Config(brokerConfigEntries) + ).asJava) + + assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet) + assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) + alterResult.results.get(topicResource2).get + assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + + // Verify that first and third resources were not updated and second was updated + var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) + var configs = describeResult.all.get + assertEquals(3, configs.size) + + assertEquals(Defaults.LogCleanerMinCleanRatio.toString, + configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) + assertEquals(Defaults.CompressionType.toString, + configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) + + assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) + + assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value) + + // Alter configs with validateOnly = true: first and third are invalid, second is valid + topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava + + alterResult = client.alterConfigs(Map( + topicResource1 -> new Config(topicConfigEntries1), + topicResource2 -> new Config(topicConfigEntries2), + brokerResource -> new Config(brokerConfigEntries) + ).asJava, new AlterConfigsOptions().validateOnly(true)) + + assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet) + assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) + alterResult.results.get(topicResource2).get + assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + + // Verify that no resources are updated since validate_only = true + describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) + configs = describeResult.all.get + assertEquals(3, configs.size) + + assertEquals(Defaults.LogCleanerMinCleanRatio.toString, + configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) + assertEquals(Defaults.CompressionType.toString, + configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) + + assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) + + assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value) } val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), @@ -183,7 +389,11 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin config.remove(KafkaConfig.InterBrokerSecurityProtocolProp) config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value) config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}") - config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true"); + config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true") + // We set this in order to test that we don't expose sensitive data via describe configs. This will already be + // set for subclasses with security enabled and we don't want to overwrite it. + if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp)) + config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass") } cfgs.foreach(_.putAll(serverConfig)) cfgs.map(KafkaConfig.fromProps) http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index 6d89d4f..f5b0a06 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -35,17 +35,22 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2")) private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2")) + private val BrokerResources = Set(new Resource(Broker, "0"), new Resource(Broker, "1")) private val ResourceToCommand = Map[Set[Resource], Array[String]]( TopicResources -> Array("--topic", "test-1", "--topic", "test-2"), Set(Resource.ClusterResource) -> Array("--cluster"), - GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2") + GroupResources -> Array("--group", "testGroup-1", "--group", "testGroup-2"), + BrokerResources -> Array("--broker", "0", "--broker", "1") ) private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])]( - TopicResources -> (Set(Read, Write, Describe, Delete), Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete")), + TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs), + Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete", + "--operation", "DescribeConfigs", "--operation", "AlterConfigs")), Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operation", "Create", "--operation", "ClusterAction")), - GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe")) + GroupResources -> (Set(Read, Describe), Array("--operation", "Read", "--operation", "Describe")), + BrokerResources -> (Set(DescribeConfigs), Array("--operation", "DescribeConfigs")) ) private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]]( http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index b82ddf9..8e6b11d 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -23,7 +23,7 @@ import kafka.network.SocketServer import kafka.utils.TestUtils import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse} +import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse} import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue} import scala.collection.JavaConverters._ @@ -79,8 +79,8 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { } } - protected def error(error: Errors, errorMessage: Option[String] = None): CreateTopicsResponse.Error = - new CreateTopicsResponse.Error(error, errorMessage.orNull) + protected def error(error: Errors, errorMessage: Option[String] = None): ApiError = + new ApiError(error, errorMessage.orNull) protected def toStructWithDuplicateFirstTopic(request: CreateTopicsRequest): Struct = { val struct = request.toStruct @@ -101,7 +101,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { } protected def validateErrorCreateTopicsRequests(request: CreateTopicsRequest, - expectedResponse: Map[String, CreateTopicsResponse.Error], + expectedResponse: Map[String, ApiError], checkErrorMessage: Boolean = true, requestStruct: Option[Struct] = None): Unit = { val response = requestStruct.map(sendCreateTopicRequestStruct(_, request.version)).getOrElse( http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 5cb6f71..7e50049 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -20,6 +20,7 @@ import java.util.{Collections, LinkedHashMap, Properties} import java.util.concurrent.{Executors, Future, TimeUnit} import kafka.admin.AdminUtils +import kafka.log.LogConfig import kafka.network.RequestChannel.Session import kafka.security.auth._ import kafka.utils.TestUtils @@ -30,9 +31,8 @@ import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLa import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation -import org.apache.kafka.common.requests._ +import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} import org.apache.kafka.common.security.auth.{DefaultPrincipalBuilder, KafkaPrincipal} import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -277,7 +277,17 @@ class RequestQuotaTest extends BaseRequestTest { new ResourceFilter(AdminResourceType.TOPIC, null), new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))) - case key => + case ApiKeys.DESCRIBE_CONFIGS => + new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic))) + + case ApiKeys.ALTER_CONFIGS => + new AlterConfigsRequest.Builder( + Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic), + new AlterConfigsRequest.Config(Collections.singleton( + new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000") + ))), true) + + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } } @@ -366,6 +376,8 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DESCRIBE_ACLS => new DescribeAclsResponse(response).throttleTimeMs case ApiKeys.CREATE_ACLS => new CreateAclsResponse(response).throttleTimeMs case ApiKeys.DELETE_ACLS => new DeleteAclsResponse(response).throttleTimeMs + case ApiKeys.DESCRIBE_CONFIGS => new DescribeConfigsResponse(response).throttleTimeMs + case ApiKeys.ALTER_CONFIGS => new AlterConfigsResponse(response).throttleTimeMs case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index 96a56b4..47ca8ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; @@ -182,7 +183,7 @@ public class StreamsKafkaClient { final CreateTopicsResponse createTopicsResponse = (CreateTopicsResponse) clientResponse.responseBody(); for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) { - CreateTopicsResponse.Error error = createTopicsResponse.errors().get(internalTopicConfig.name()); + ApiError error = createTopicsResponse.errors().get(internalTopicConfig.name()); if (!error.is(Errors.NONE) && !error.is(Errors.TOPIC_ALREADY_EXISTS)) { throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback()); }
