This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22bef988d49 KAFKA-18926 KafkaPrincipalBuilder should extend
KafkaPrincipalSerde (#19987)
22bef988d49 is described below
commit 22bef988d490a5b653b92830c3e56cd1e00eaf65
Author: S.Y. Wang <[email protected]>
AuthorDate: Sun Jun 22 23:01:03 2025 +0900
KAFKA-18926 KafkaPrincipalBuilder should extend KafkaPrincipalSerde (#19987)
In KRaft, custom KafkaPrincipalBuilder instances must implement
KafkaPrincipalSerde to support the forward mechanism. Currently, this
requirement is not enforced and relies on the developer's attention.
With this patch, we can prevent incorrect implementations at compile
time.
Reviewers: Ken Huang <[email protected]>, TengYao Chi
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/common/network/PlaintextChannelBuilder.java | 2 +-
.../org/apache/kafka/common/network/SslChannelBuilder.java | 2 +-
.../kafka/common/security/auth/KafkaPrincipalBuilder.java | 6 +-----
.../security/authenticator/DefaultKafkaPrincipalBuilder.java | 3 +--
.../common/security/authenticator/SaslServerAuthenticator.java | 2 +-
.../org/apache/kafka/common/network/ChannelBuildersTest.java | 10 ++++++++++
.../common/security/authenticator/SaslAuthenticatorTest.java | 10 ++++++++++
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
docs/upgrade.html | 3 +++
9 files changed, 29 insertions(+), 11 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index c6181b81c5e..cf4ef470af0 100644
---
a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -103,7 +103,7 @@ public class PlaintextChannelBuilder implements
ChannelBuilder {
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
- return principalBuilder instanceof KafkaPrincipalSerde ?
Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty();
+ return Optional.of(principalBuilder);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index b45fb07442e..a35a0b8b209 100644
---
a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -164,7 +164,7 @@ public class SslChannelBuilder implements ChannelBuilder,
ListenerReconfigurable
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
- return principalBuilder instanceof KafkaPrincipalSerde ?
Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty();
+ return Optional.of(principalBuilder);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java
b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java
index ec4317268d1..92be58ea2dc 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java
@@ -23,12 +23,8 @@ package org.apache.kafka.common.security.auth;
* Note that the {@link org.apache.kafka.common.Configurable} and {@link
java.io.Closeable}
* interfaces are respected if implemented. Additionally, implementations must
provide a
* default no-arg constructor.
- *
- * Note that custom implementations of {@link KafkaPrincipalBuilder}
- * must also implement {@link KafkaPrincipalSerde}, otherwise brokers will not
be able to
- * forward requests to the controller.
*/
-public interface KafkaPrincipalBuilder {
+public interface KafkaPrincipalBuilder extends KafkaPrincipalSerde {
/**
* Build a kafka principal from the authentication context.
* @param context The authentication context (either {@link
SslAuthenticationContext} or
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
index fa654bcb928..5ba472263dd 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
-import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
@@ -50,7 +49,7 @@ import javax.security.sasl.SaslServer;
*
* NOTE: This is an internal class and can change without notice.
*/
-public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder,
KafkaPrincipalSerde {
+public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder {
private final KerberosShortNamer kerberosShortNamer;
private final SslPrincipalMapper sslPrincipalMapper;
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index e2ebaa31cd2..a0dbe5b21dc 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -318,7 +318,7 @@ public class SaslServerAuthenticator implements
Authenticator {
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
- return principalBuilder instanceof KafkaPrincipalSerde ?
Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty();
+ return Optional.of(principalBuilder);
}
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
index e84c7c5e7c2..01936f457c6 100644
---
a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
@@ -118,5 +118,15 @@ public class ChannelBuildersTest {
public KafkaPrincipal build(AuthenticationContext context) {
return null;
}
+
+ @Override
+ public byte[] serialize(KafkaPrincipal principal) {
+ return new byte[0];
+ }
+
+ @Override
+ public KafkaPrincipal deserialize(byte[] bytes) {
+ return null;
+ }
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index c21b5d11023..a15414a73d0 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -2616,5 +2616,15 @@ public class SaslAuthenticatorTest {
static KafkaPrincipal saslSslPrincipal(String saslPrincipal, String
sslPrincipal) {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslPrincipal
+ ":" + sslPrincipal);
}
+
+ @Override
+ public byte[] serialize(KafkaPrincipal principal) {
+ return new byte[0];
+ }
+
+ @Override
+ public KafkaPrincipal deserialize(byte[] bytes) {
+ return null;
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index db03c891e4c..0049204964d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -835,7 +835,7 @@ object RequestQuotaTest {
}
}
- class TestPrincipalBuilder extends KafkaPrincipalBuilder with
KafkaPrincipalSerde {
+ class TestPrincipalBuilder extends KafkaPrincipalBuilder {
override def build(context: AuthenticationContext): KafkaPrincipal = {
principal
}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 9cd6d9b866a..3beec9dcb14 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -31,6 +31,9 @@
<li>
The <code>remote.log.manager.thread.pool.size</code> config was
deprecated. Please use the
<code>remote.log.manager.follower.thread.pool.size</code> instead.
</li>
+ <li>The <code>KafkaPrincipalBuilder</code> now extends
<code>KafkaPrincipalSerde</code>. Force developer to implement
<code>KafkaPrincipalSerde</code> interface for custom
<code>KafkaPrincipalBuilder</code>.
+ For further details, please refer to <a
href="https://cwiki.apache.org/confluence/x/1gq9F">KIP-1157</a>.
+ </li>
</ul>