This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new db6cfec8 KAFKA-13051; Require principal builders implement
`KafkaPrincipalSerde` and set default (#11011)
db6cfec8 is described below
commit db6cfec837d4843eb8cfa38abfe17d5375e27e52
Author: Ryan Dielhenn <[email protected]>
AuthorDate: Tue Jul 13 10:54:36 2021 -0700
KAFKA-13051; Require principal builders implement `KafkaPrincipalSerde` and
set default (#11011)
This patch adds a check to ensure that principal builder implementations
implement `KafkaPrincipalSerde` as specified in KIP-590:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller.
This patch also changes the default value of `principal.builder.class` to
`DefaultKafkaPrincipalBuilder`, which was already the implicit behavior when no
principal builder was specified.
Reviewers: Ismael Juma <[email protected]>, Jason Gustafson
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +++++++++-
.../integration/kafka/api/AuthorizerIntegrationTest.scala | 5 +++--
.../integration/kafka/api/GroupAuthorizerIntegrationTest.scala | 5 +++--
.../integration/kafka/api/GroupEndToEndAuthorizationTest.scala | 5 +++--
.../kafka/api/PlaintextEndToEndAuthorizationTest.scala | 5 +++--
.../kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala | 3 ++-
.../integration/kafka/api/SslEndToEndAuthorizationTest.scala | 3 ++-
.../kafka/server/AlterUserScramCredentialsRequestTest.scala | 7 ++++---
.../scala/unit/kafka/server/ControllerMutationQuotaTest.scala | 4 ++--
.../kafka/server/DescribeUserScramCredentialsRequestTest.scala | 7 ++++---
docs/upgrade.html | 2 ++
11 files changed, 37 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index cada670..634bb4d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -39,7 +39,9 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.SecurityProtocol
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
@@ -247,6 +249,7 @@ object Defaults {
/** ********* General Security configuration ***********/
val ConnectionsMaxReauthMsDefault = 0L
+ val DefaultPrincipalSerde = classOf[DefaultKafkaPrincipalBuilder]
/** ********* Sasl configuration ***********/
val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
@@ -1235,7 +1238,7 @@ object KafkaConfig {
.define(securityProviderClassProp, STRING, null, LOW,
securityProviderClassDoc)
/** ********* SSL Configuration ****************/
- .define(PrincipalBuilderClassProp, CLASS, null, MEDIUM,
PrincipalBuilderClassDoc)
+ .define(PrincipalBuilderClassProp, CLASS,
Defaults.DefaultPrincipalSerde, MEDIUM, PrincipalBuilderClassDoc)
.define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM,
SslProtocolDoc)
.define(SslProviderProp, STRING, null, MEDIUM, SslProviderDoc)
.define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols,
MEDIUM, SslEnabledProtocolsDoc)
@@ -1971,5 +1974,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog:
Boolean, dynamicConfigO
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs
should always be less than" +
s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to
prevent failed" +
s" authentication responses from timing out")
+
+ val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
+ require(principalBuilderClass != null,
s"${KafkaConfig.PrincipalBuilderClassProp} must be non-null")
+
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
+ s"${KafkaConfig.PrincipalBuilderClassProp} must implement
KafkaPrincipalSerde")
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 36cdc51..1e41fcf 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -54,7 +54,8 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourcePatternFilter, ResourceType}
-import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
+import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, SecurityProtocol}
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node,
TopicPartition, Uuid, requests}
import org.apache.kafka.test.{TestUtils => JTestUtils}
@@ -76,7 +77,7 @@ object AuthorizerIntegrationTest {
val BrokerListenerName = "BROKER"
val ClientListenerName = "CLIENT"
- class PrincipalBuilder extends KafkaPrincipalBuilder {
+ class PrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
context.listenerName match {
case BrokerListenerName => BrokerPrincipal
diff --git
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 2b3ae1c..4f76735 100644
---
a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -27,7 +27,8 @@ import
org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.errors.TopicAuthorizationException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourceType}
-import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal}
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -40,7 +41,7 @@ object GroupAuthorizerIntegrationTest {
val BrokerListenerName = "BROKER"
val ClientListenerName = "CLIENT"
- class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
+ class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null)
{
override def build(context: AuthenticationContext): KafkaPrincipal = {
context.listenerName match {
case BrokerListenerName => BrokerPrincipal
diff --git
a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
index 05243f0..8bd6393 100644
---
a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala
@@ -19,12 +19,13 @@ package kafka.api
import kafka.api.GroupEndToEndAuthorizationTest._
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
+import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, SaslAuthenticationContext}
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
object GroupEndToEndAuthorizationTest {
val GroupPrincipalType = "Group"
val ClientGroup = "testGroup"
- class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
+ class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null)
{
override def build(context: AuthenticationContext): KafkaPrincipal = {
context match {
case ctx: SaslAuthenticationContext =>
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
index 842c4ad..c44028c 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -19,6 +19,7 @@ package kafka.api
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
import org.apache.kafka.common.errors.TopicAuthorizationException
@@ -30,7 +31,7 @@ object PlaintextEndToEndAuthorizationTest {
private var clientListenerName = None: Option[String]
@volatile
private var serverListenerName = None: Option[String]
- class TestClientPrincipalBuilder extends KafkaPrincipalBuilder {
+ class TestClientPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null,
null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
clientListenerName = Some(context.listenerName)
context match {
@@ -42,7 +43,7 @@ object PlaintextEndToEndAuthorizationTest {
}
}
- class TestServerPrincipalBuilder extends KafkaPrincipalBuilder {
+ class TestServerPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null,
null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
serverListenerName = Some(context.listenerName)
context match {
diff --git
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index 2e5411d..7727803 100644
---
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -31,13 +31,14 @@ import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.Mode
import org.apache.kafka.common.security.auth._
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
object SaslPlainSslEndToEndAuthorizationTest {
- class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+ class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
val saslContext = context.asInstanceOf[SaslAuthenticationContext]
diff --git
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index a47ac0b..65bef41 100644
---
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -24,11 +24,12 @@ import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.Mode
import org.apache.kafka.common.security.auth._
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.Java
import org.junit.jupiter.api.BeforeEach
object SslEndToEndAuthorizationTest {
- class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+ class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
private val Pattern = "O=A (.*?),CN=(.*?)".r
// Use full DN as client principal to test special characters in principal
diff --git
a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
index 99d40ba..e0121b1 100644
---
a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
@@ -28,7 +28,8 @@ import
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.
import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData,
DescribeUserScramCredentialsRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest,
AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest,
DescribeUserScramCredentialsResponse}
-import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal}
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext,
AuthorizationResult}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -383,13 +384,13 @@ object AlterCredentialsTest {
}
}
- class TestPrincipalBuilderReturningAuthorized extends KafkaPrincipalBuilder {
+ class TestPrincipalBuilderReturningAuthorized extends
DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
AuthorizedPrincipal
}
}
- class TestPrincipalBuilderReturningUnauthorized extends
KafkaPrincipalBuilder {
+ class TestPrincipalBuilderReturningUnauthorized extends
DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
UnauthorizedPrincipal
}
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index 6465028..20e89ed 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest
import org.apache.kafka.common.requests.DeleteTopicsResponse
import org.apache.kafka.common.security.auth.AuthenticationContext
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
@@ -53,7 +53,7 @@ import scala.jdk.CollectionConverters._
object ControllerMutationQuotaTest {
// Principal used for all client connections. This is updated by each test.
var principal = KafkaPrincipal.ANONYMOUS
- class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+ class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
principal
}
diff --git
a/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
index e4f28e2..012f833 100644
---
a/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
@@ -24,7 +24,8 @@ import
org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData,
import
org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest,
DescribeUserScramCredentialsResponse}
-import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal}
+import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext,
AuthorizationResult}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -125,13 +126,13 @@ object DescribeCredentialsTest {
}
}
- class TestPrincipalBuilderReturningAuthorized extends KafkaPrincipalBuilder {
+ class TestPrincipalBuilderReturningAuthorized extends
DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
AuthorizedPrincipal
}
}
- class TestPrincipalBuilderReturningUnauthorized extends
KafkaPrincipalBuilder {
+ class TestPrincipalBuilderReturningUnauthorized extends
DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
UnauthorizedPrincipal
}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 5a9555e..c0d8eab 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -32,6 +32,8 @@
For a complete list of removed APIs compare the detailed Kafka Streams
upgrade notes.</li>
<li>Kafka Streams no longer has a compile time dependency on
"connect:json" module (<a
href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have to
explicitly declare it.</li>
+ <li>Custom principal builder implementations specified through
<code>principal.builder.class</code> must now implement the
+ <code>KafkaPrincipalSerde</code> interface to allow for forwarding
between brokers. See <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller">KIP-590</a>
for more details about the usage of KafkaPrincipalSerde.</li>
<li>A number of deprecated classes, methods and tools have been removed
from the <code>clients</code>, <code>connect</code>, <code>core</code> and
<code>tools</code> modules:</li>
<ul>
<li>The Scala <code>Authorizer</code>,
<code>SimpleAclAuthorizer</code> and related classes have been removed. Please
use the Java <code>Authorizer</code>