This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new db6cfec8 KAFKA-13051; Require principal builders implement 
`KafkaPrincipalSerde` and set default (#11011)
db6cfec8 is described below

commit db6cfec837d4843eb8cfa38abfe17d5375e27e52
Author: Ryan Dielhenn <[email protected]>
AuthorDate: Tue Jul 13 10:54:36 2021 -0700

    KAFKA-13051; Require principal builders implement `KafkaPrincipalSerde` and 
set default (#11011)
    
    This patch adds a check to ensure that principal builder implementations 
implement `KafkaPrincipalSerde` as specified in KIP-590: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller.
 This patch also changes the default value of `principal.builder.class` to 
`DefaultKafkaPrincipalBuilder`, which was already the implicit behavior when no 
principal builder was specified.
    
    Reviewers: Ismael Juma <[email protected]>, Jason Gustafson 
<[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala             | 10 +++++++++-
 .../integration/kafka/api/AuthorizerIntegrationTest.scala      |  5 +++--
 .../integration/kafka/api/GroupAuthorizerIntegrationTest.scala |  5 +++--
 .../integration/kafka/api/GroupEndToEndAuthorizationTest.scala |  5 +++--
 .../kafka/api/PlaintextEndToEndAuthorizationTest.scala         |  5 +++--
 .../kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala      |  3 ++-
 .../integration/kafka/api/SslEndToEndAuthorizationTest.scala   |  3 ++-
 .../kafka/server/AlterUserScramCredentialsRequestTest.scala    |  7 ++++---
 .../scala/unit/kafka/server/ControllerMutationQuotaTest.scala  |  4 ++--
 .../kafka/server/DescribeUserScramCredentialsRequestTest.scala |  7 ++++---
 docs/upgrade.html                                              |  2 ++
 11 files changed, 37 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index cada670..634bb4d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -39,7 +39,9 @@ import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
@@ -247,6 +249,7 @@ object Defaults {
 
     /** ********* General Security configuration ***********/
   val ConnectionsMaxReauthMsDefault = 0L
+  val DefaultPrincipalSerde = classOf[DefaultKafkaPrincipalBuilder]
 
   /** ********* Sasl configuration ***********/
   val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
@@ -1235,7 +1238,7 @@ object KafkaConfig {
       .define(securityProviderClassProp, STRING, null, LOW, 
securityProviderClassDoc)
 
       /** ********* SSL Configuration ****************/
-      .define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, 
PrincipalBuilderClassDoc)
+      .define(PrincipalBuilderClassProp, CLASS, 
Defaults.DefaultPrincipalSerde, MEDIUM, PrincipalBuilderClassDoc)
       .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, 
SslProtocolDoc)
       .define(SslProviderProp, STRING, null, MEDIUM, SslProviderDoc)
       .define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, 
MEDIUM, SslEnabledProtocolsDoc)
@@ -1971,5 +1974,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
         
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs 
should always be less than" +
         s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to 
prevent failed" +
         s" authentication responses from timing out")
+
+    val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
+    require(principalBuilderClass != null, 
s"${KafkaConfig.PrincipalBuilderClassProp} must be non-null")
+    
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass), 
+      s"${KafkaConfig.PrincipalBuilderClassProp} must implement 
KafkaPrincipalSerde")
   }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 36cdc51..1e41fcf 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -54,7 +54,8 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourcePatternFilter, ResourceType}
-import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, SecurityProtocol}
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, 
TopicPartition, Uuid, requests}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
@@ -76,7 +77,7 @@ object AuthorizerIntegrationTest {
   val BrokerListenerName = "BROKER"
   val ClientListenerName = "CLIENT"
 
-  class PrincipalBuilder extends KafkaPrincipalBuilder {
+  class PrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       context.listenerName match {
         case BrokerListenerName => BrokerPrincipal
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 2b3ae1c..4f76735 100644
--- 
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -27,7 +27,8 @@ import 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.errors.TopicAuthorizationException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourceType}
-import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal}
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test}
 
@@ -40,7 +41,7 @@ object GroupAuthorizerIntegrationTest {
   val BrokerListenerName = "BROKER"
   val ClientListenerName = "CLIENT"
 
-  class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
+  class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) 
{
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       context.listenerName match {
         case BrokerListenerName => BrokerPrincipal
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
index 05243f0..8bd6393 100644
--- 
a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
@@ -19,12 +19,13 @@ package kafka.api
 import kafka.api.GroupEndToEndAuthorizationTest._
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, SaslAuthenticationContext}
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 
 object GroupEndToEndAuthorizationTest {
   val GroupPrincipalType = "Group"
   val ClientGroup = "testGroup"
-  class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
+  class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) 
{
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       context match {
         case ctx: SaslAuthenticationContext =>
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
index 842c4ad..c44028c 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -19,6 +19,7 @@ package kafka.api
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth._
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.junit.jupiter.api.Assertions._
 import org.apache.kafka.common.errors.TopicAuthorizationException
@@ -30,7 +31,7 @@ object PlaintextEndToEndAuthorizationTest {
   private var clientListenerName = None: Option[String]
   @volatile
   private var serverListenerName = None: Option[String]
-  class TestClientPrincipalBuilder extends KafkaPrincipalBuilder {
+  class TestClientPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, 
null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       clientListenerName = Some(context.listenerName)
       context match {
@@ -42,7 +43,7 @@ object PlaintextEndToEndAuthorizationTest {
     }
   }
 
-  class TestServerPrincipalBuilder extends KafkaPrincipalBuilder {
+  class TestServerPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, 
null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       serverListenerName = Some(context.listenerName)
       context match {
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index 2e5411d..7727803 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -31,13 +31,14 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.Mode
 import org.apache.kafka.common.security.auth._
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
 import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Test
 
 object SaslPlainSslEndToEndAuthorizationTest {
 
-  class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+  class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
 
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       val saslContext = context.asInstanceOf[SaslAuthenticationContext]
diff --git 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index a47ac0b..65bef41 100644
--- 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -24,11 +24,12 @@ import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.Mode
 import org.apache.kafka.common.security.auth._
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.Java
 import org.junit.jupiter.api.BeforeEach
 
 object SslEndToEndAuthorizationTest {
-  class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+  class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
     private val Pattern = "O=A (.*?),CN=(.*?)".r
 
     // Use full DN as client principal to test special characters in principal
diff --git 
a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
 
b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
index 99d40ba..e0121b1 100644
--- 
a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
@@ -28,7 +28,8 @@ import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.
 import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, 
DescribeUserScramCredentialsRequestData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, 
AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, 
DescribeUserScramCredentialsResponse}
-import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal}
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -383,13 +384,13 @@ object AlterCredentialsTest {
     }
   }
 
-  class TestPrincipalBuilderReturningAuthorized extends KafkaPrincipalBuilder {
+  class TestPrincipalBuilderReturningAuthorized extends 
DefaultKafkaPrincipalBuilder(null, null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       AuthorizedPrincipal
     }
   }
 
-  class TestPrincipalBuilderReturningUnauthorized extends 
KafkaPrincipalBuilder {
+  class TestPrincipalBuilderReturningUnauthorized extends 
DefaultKafkaPrincipalBuilder(null, null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       UnauthorizedPrincipal
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index 6465028..20e89ed 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest
 import org.apache.kafka.common.requests.DeleteTopicsResponse
 import org.apache.kafka.common.security.auth.AuthenticationContext
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertTrue
@@ -53,7 +53,7 @@ import scala.jdk.CollectionConverters._
 object ControllerMutationQuotaTest {
   // Principal used for all client connections. This is updated by each test.
   var principal = KafkaPrincipal.ANONYMOUS
-  class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+  class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       principal
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
 
b/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
index e4f28e2..012f833 100644
--- 
a/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
@@ -24,7 +24,8 @@ import 
org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData,
 import 
org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest, 
DescribeUserScramCredentialsResponse}
-import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal}
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -125,13 +126,13 @@ object DescribeCredentialsTest {
     }
   }
 
-  class TestPrincipalBuilderReturningAuthorized extends KafkaPrincipalBuilder {
+  class TestPrincipalBuilderReturningAuthorized extends 
DefaultKafkaPrincipalBuilder(null, null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       AuthorizedPrincipal
     }
   }
 
-  class TestPrincipalBuilderReturningUnauthorized extends 
KafkaPrincipalBuilder {
+  class TestPrincipalBuilderReturningUnauthorized extends 
DefaultKafkaPrincipalBuilder(null, null) {
     override def build(context: AuthenticationContext): KafkaPrincipal = {
       UnauthorizedPrincipal
     }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 5a9555e..c0d8eab 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -32,6 +32,8 @@
         For a complete list of removed APIs compare the detailed Kafka Streams 
upgrade notes.</li>
     <li>Kafka Streams no longer has a compile time dependency on 
"connect:json" module (<a 
href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
         Projects that were relying on this transitive dependency will have to 
explicitly declare it.</li>
+    <li>Custom principal builder implementations specified through 
<code>principal.builder.class</code> must now implement the 
+        <code>KafkaPrincipalSerde</code> interface to allow for forwarding 
between brokers. See <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller">KIP-590</a>
 for more details about the usage of KafkaPrincipalSerde.</li>
     <li>A number of deprecated classes, methods and tools have been removed 
from the <code>clients</code>, <code>connect</code>, <code>core</code> and 
<code>tools</code> modules:</li>
     <ul>
         <li>The Scala <code>Authorizer</code>, 
<code>SimpleAclAuthorizer</code> and related classes have been removed. Please 
use the Java <code>Authorizer</code>

Reply via email to