This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4048aa19faf MINOR: Replace Collections factory methods with Java 11+ 
equivalents in clients (#22060)
4048aa19faf is described below

commit 4048aa19faf5159cca08a6754fafdc3be1c8bf08
Author: Maros Orsak <[email protected]>
AuthorDate: Sat Jun 6 17:38:59 2026 +0200

    MINOR: Replace Collections factory methods with Java 11+ equivalents in 
clients (#22060)
    
    This is the 4th part of replacing Collections factory methods with
    their Java 11+ equivalents in the clients module.
    
    Reviewers: nileshkumar3 <[email protected]>, Ken Huang
    <[email protected]>, Christo Lolov <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 .../main/java/org/apache/kafka/common/Cluster.java | 18 ++---
 .../kafka/common/metrics/MetricsReporter.java      |  3 +-
 .../kafka/common/network/ChannelBuilders.java      |  3 +-
 .../kafka/common/network/SaslChannelBuilder.java   |  9 +--
 .../org/apache/kafka/common/network/Selector.java  |  4 +-
 .../kafka/common/quota/ClientQuotaFilter.java      |  4 +-
 .../security/kerberos/KerberosShortNamer.java      |  3 +-
 .../security/oauthbearer/BrokerJwtValidator.java   |  5 +-
 .../security/oauthbearer/ClientJwtValidator.java   |  5 +-
 .../oauthbearer/OAuthBearerLoginModule.java        |  3 +-
 .../OAuthBearerSaslClientCallbackHandler.java      |  3 +-
 .../internals/secured/JaasOptionsUtils.java        |  3 +-
 .../unsecured/OAuthBearerUnsecuredJws.java         |  4 +-
 .../OAuthBearerUnsecuredLoginCallbackHandler.java  |  3 +-
 ...uthBearerUnsecuredValidatorCallbackHandler.java |  6 +-
 .../security/scram/ScramExtensionsCallback.java    |  3 +-
 .../common/security/scram/ScramLoginModule.java    |  3 +-
 .../security/scram/internals/ScramExtensions.java  |  3 +-
 .../kafka/common/security/ssl/SslFactory.java      |  4 +-
 .../common/telemetry/ClientTelemetryState.java     | 92 +++++++---------------
 .../internals/ClientTelemetryReporter.java         |  5 +-
 .../telemetry/internals/ClientTelemetryUtils.java  |  3 +-
 .../common/telemetry/internals/MetricKey.java      |  3 +-
 .../common/telemetry/internals/MetricsEmitter.java |  3 +-
 .../common/utils/internals/CopyOnWriteMap.java     |  4 +-
 .../ImplicitLinkedHashMultiCollection.java         |  3 +-
 26 files changed, 72 insertions(+), 130 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java 
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 93f2f4225bc..b8466dd0193 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -59,7 +59,7 @@ public final class Cluster {
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
Collections.emptySet(), internalTopics, null, Collections.emptyMap());
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
Set.of(), internalTopics, null, Map.of());
     }
 
     /**
@@ -73,7 +73,7 @@ public final class Cluster {
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics,
                    Node controller) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
Collections.emptySet(), internalTopics, controller, Collections.emptyMap());
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
Set.of(), internalTopics, controller, Map.of());
     }
 
     /**
@@ -88,7 +88,7 @@ public final class Cluster {
                    Set<String> invalidTopics,
                    Set<String> internalTopics,
                    Node controller) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
invalidTopics, internalTopics, controller, Collections.emptyMap());
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
invalidTopics, internalTopics, controller, Map.of());
     }
 
     /**
@@ -199,8 +199,8 @@ public final class Cluster {
      * Create an empty cluster instance with no nodes and no topic-partitions.
      */
     public static Cluster empty() {
-        return new Cluster(null, new ArrayList<>(0), new ArrayList<>(0), 
Collections.emptySet(),
-            Collections.emptySet(), null);
+        return new Cluster(null, new ArrayList<>(0), new ArrayList<>(0), 
Set.of(),
+            Set.of(), null);
     }
 
     /**
@@ -214,7 +214,7 @@ public final class Cluster {
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), 
address.getPort()));
         return new Cluster(null, true, nodes, new ArrayList<>(0),
-            Collections.emptySet(), Collections.emptySet(), 
Collections.emptySet(), null, Collections.emptyMap());
+            Set.of(), Set.of(), Set.of(), null, Map.of());
     }
 
     /**
@@ -292,7 +292,7 @@ public final class Cluster {
      * @return A list of partitions
      */
     public List<PartitionInfo> partitionsForTopic(String topic) {
-        return partitionsByTopic.getOrDefault(topic, Collections.emptyList());
+        return partitionsByTopic.getOrDefault(topic, List.of());
     }
 
     /**
@@ -311,7 +311,7 @@ public final class Cluster {
      * @return A list of partitions
      */
     public List<PartitionInfo> availablePartitionsForTopic(String topic) {
-        return availablePartitionsByTopic.getOrDefault(topic, 
Collections.emptyList());
+        return availablePartitionsByTopic.getOrDefault(topic, List.of());
     }
 
     /**
@@ -320,7 +320,7 @@ public final class Cluster {
      * @return A list of partitions
      */
     public List<PartitionInfo> partitionsForNode(int nodeId) {
-        return partitionsByNode.getOrDefault(nodeId, Collections.emptyList());
+        return partitionsByNode.getOrDefault(nodeId, List.of());
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index 0b6e47ee2f5..a46105def92 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.metrics;
 import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.config.ConfigException;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -56,7 +55,7 @@ public interface MetricsReporter extends Reconfigurable, 
AutoCloseable {
 
     // default methods for backwards compatibility with reporters that only 
implement Configurable
     default Set<String> reconfigurableConfigs() {
-        return Collections.emptySet();
+        return Set.of();
     }
 
     default void validateReconfiguration(Map<String, ?> configs) throws 
ConfigException {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java 
b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index dd557a1b25c..a028b345764 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -37,7 +37,6 @@ import org.apache.kafka.common.utils.internals.LogContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -163,7 +162,7 @@ public class ChannelBuilders {
                     // Use server context for inter-broker client connections 
and client context for other clients
                     JaasContext jaasContext = contextType == 
JaasContext.Type.CLIENT ? JaasContext.loadClientContext(configs) :
                             JaasContext.loadServerContext(listenerName, 
clientSaslMechanism, configs);
-                    jaasContexts = 
Collections.singletonMap(clientSaslMechanism, jaasContext);
+                    jaasContexts = Map.of(clientSaslMechanism, jaasContext);
                 }
                 channelBuilder = new SaslChannelBuilder(connectionMode,
                         jaasContexts,
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 3401f593e91..52f798ef7e5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -65,7 +65,6 @@ import java.io.IOException;
 import java.net.Socket;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -187,7 +186,7 @@ public class SaslChannelBuilder implements ChannelBuilder, 
ListenerReconfigurabl
 
     @Override
     public Set<String> reconfigurableConfigs() {
-        return securityProtocol == SecurityProtocol.SASL_SSL ? 
SslConfigs.RECONFIGURABLE_CONFIGS : Collections.emptySet();
+        return securityProtocol == SecurityProtocol.SASL_SSL ? 
SslConfigs.RECONFIGURABLE_CONFIGS : Set.of();
     }
 
     @Override
@@ -223,11 +222,11 @@ public class SaslChannelBuilder implements 
ChannelBuilder, ListenerReconfigurabl
             Supplier<Authenticator> authenticatorCreator;
             if (connectionMode == ConnectionMode.SERVER) {
                 authenticatorCreator = () -> buildServerAuthenticator(configs,
-                        Collections.unmodifiableMap(saslCallbackHandlers),
+                        Map.copyOf(saslCallbackHandlers),
                         id,
                         finalTransportLayer,
-                        Collections.unmodifiableMap(subjects),
-                        
Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism),
+                        Map.copyOf(subjects),
+                        Map.copyOf(connectionsMaxReauthMsByMechanism),
                         metadataRegistry);
             } else {
                 LoginManager loginManager = 
loginManagers.get(clientSaslMechanism);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index a88a7c1196e..0b6cee13e1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -226,11 +226,11 @@ public class Selector implements Selectable, 
AutoCloseable {
     }
 
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, 
String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, 
metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, 
metricGrpPrefix, Map.of(), true, channelBuilder, logContext);
     }
 
     public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs, 
Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder 
channelBuilder, LogContext logContext) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, 
failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, 
Collections.emptyMap(), true, channelBuilder, logContext);
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, 
failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, Map.of(), true, 
channelBuilder, logContext);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java 
b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
index e8a6a7290d4..0532c66580a 100644
--- a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.common.quota;
 
 import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -64,7 +64,7 @@ public class ClientQuotaFilter {
      * Constructs and returns a quota filter that matches all configured 
entities.
      */
     public static ClientQuotaFilter all() {
-        return new ClientQuotaFilter(Collections.emptyList(), false);
+        return new ClientQuotaFilter(List.of(), false);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
index 96e01f17942..8aded504dce 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.security.kerberos;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -43,7 +42,7 @@ public class KerberosShortNamer {
     }
 
     public static KerberosShortNamer fromUnparsedRules(String defaultRealm, 
List<String> principalToLocalRules) {
-        List<String> rules = principalToLocalRules == null ? 
Collections.singletonList("DEFAULT") : principalToLocalRules;
+        List<String> rules = principalToLocalRules == null ? 
List.of("DEFAULT") : principalToLocalRules;
         return new KerberosShortNamer(parseRules(defaultRealm, rules));
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java
index c69db033052..60b065a5e45 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java
@@ -36,7 +36,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -164,11 +163,11 @@ public class BrokerJwtValidator implements JwtValidator {
         Collection<String> scopeRawCollection;
 
         if (scopeRaw instanceof String)
-            scopeRawCollection = Collections.singletonList((String) scopeRaw);
+            scopeRawCollection = List.of((String) scopeRaw);
         else if (scopeRaw instanceof Collection)
             scopeRawCollection = (Collection<String>) scopeRaw;
         else
-            scopeRawCollection = Collections.emptySet();
+            scopeRawCollection = Set.of();
 
         NumericDate expirationRaw = getClaim(claims::getExpirationTime, 
ReservedClaimNames.EXPIRATION_TIME);
         String subRaw = getClaim(() -> 
claims.getStringClaimValue(subClaimName), subClaimName);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientJwtValidator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientJwtValidator.java
index 53cd88f24dd..6d3ce1a8079 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientJwtValidator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientJwtValidator.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -107,11 +106,11 @@ public class ClientJwtValidator implements JwtValidator {
         Collection<String> scopeRawCollection;
 
         if (scopeRaw instanceof String)
-            scopeRawCollection = Collections.singletonList((String) scopeRaw);
+            scopeRawCollection = List.of((String) scopeRaw);
         else if (scopeRaw instanceof Collection)
             scopeRawCollection = (Collection<String>) scopeRaw;
         else
-            scopeRawCollection = Collections.emptySet();
+            scopeRawCollection = Set.of();
 
         Number expirationRaw = (Number) getClaim(payload, 
EXPIRATION_CLAIM_NAME);
         String subRaw = (String) getClaim(payload, subClaimName);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
index ddbbd1a787a..d8cbe777458 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
@@ -257,7 +256,7 @@ public class OAuthBearerLoginModule implements LoginModule {
      */
     public static final String OAUTHBEARER_MECHANISM = "OAUTHBEARER";
     private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerLoginModule.class);
-    private static final SaslExtensions EMPTY_EXTENSIONS = new 
SaslExtensions(Collections.emptyMap());
+    private static final SaslExtensions EMPTY_EXTENSIONS = new 
SaslExtensions(Map.of());
     private Subject subject = null;
     private AuthenticateCallbackHandler callbackHandler = null;
     private OAuthBearerToken tokenRequiringCommit = null;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
index 955ab4c1fac..b2a508e92eb 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
@@ -100,7 +99,7 @@ public class OAuthBearerSaslClientCallbackHandler implements 
AuthenticateCallbac
         Subject subject = SecurityManagerCompatibility.get().current();
         Set<OAuthBearerToken> privateCredentials = subject != null
             ? subject.getPrivateCredentials(OAuthBearerToken.class)
-            : Collections.emptySet();
+            : Set.of();
         if (privateCredentials.isEmpty())
             throw new IOException("No OAuth Bearer tokens in Subject's private 
credentials");
         if (privateCredentials.size() == 1)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
index ec6d3daafe8..73b9a1560a6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URL;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -65,7 +64,7 @@ public class JaasOptionsUtils {
         if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || 
jaasConfigEntries.get(0) == null)
             throw new IllegalArgumentException(String.format("Must supply 
exactly 1 non-null JAAS mechanism configuration (size was %d)", 
jaasConfigEntries.size()));
 
-        return 
Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
+        return Map.copyOf(jaasConfigEntries.get(0).getOptions());
     }
 
     public boolean containsKey(String name) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
index bea463a8d14..6d7c4dae109 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
@@ -345,14 +345,14 @@ public class OAuthBearerUnsecuredJws implements 
OAuthBearerToken {
         if (isClaimType(scopeClaimName, String.class)) {
             String scopeClaimValue = claim(scopeClaimName, String.class);
             if (Utils.isBlank(scopeClaimValue))
-                return Collections.emptySet();
+                return Set.of();
             else {
                 return Set.of(scopeClaimValue.trim());
             }
         }
         List<?> scopeClaimValue = claim(scopeClaimName, List.class);
         if (scopeClaimValue == null || scopeClaimValue.isEmpty())
-            return Collections.emptySet();
+            return Set.of();
         @SuppressWarnings("unchecked")
         List<String> stringList = (List<String>) scopeClaimValue;
         Set<String> retval = new HashSet<>();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
index 455fda983c3..bec5e103a1b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -34,7 +34,6 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Base64.Encoder;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -147,7 +146,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler 
implements AuthenticateCal
             throw new IllegalArgumentException(
                     String.format("Must supply exactly 1 non-null JAAS 
mechanism configuration (size was %d)",
                             jaasConfigEntries.size()));
-        this.moduleOptions = Collections.unmodifiableMap((Map<String, String>) 
jaasConfigEntries.get(0).getOptions());
+        this.moduleOptions = Map.copyOf((Map<String, String>) 
jaasConfigEntries.get(0).getOptions());
         configured = true;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
index 53e099688d9..4f2c06f4b85 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -118,8 +117,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler 
implements Authenticat
             throw new IllegalArgumentException(
                     String.format("Must supply exactly 1 non-null JAAS 
mechanism configuration (size was %d)",
                             jaasConfigEntries.size()));
-        this.moduleOptions = Collections
-                .unmodifiableMap((Map<String, String>) 
jaasConfigEntries.get(0).getOptions());
+        this.moduleOptions = Map.copyOf((Map<String, String>) 
jaasConfigEntries.get(0).getOptions());
         configured = true;
     }
 
@@ -188,7 +186,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler 
implements Authenticat
 
     private List<String> requiredScope() {
         String requiredSpaceDelimitedScope = option(REQUIRED_SCOPE_OPTION);
-        return Utils.isBlank(requiredSpaceDelimitedScope) ? 
Collections.emptyList() : 
OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
+        return Utils.isBlank(requiredSpaceDelimitedScope) ? List.of() : 
OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
     }
 
     private int allowableClockSkewMs() {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
index 23ab1aaa8e4..69350b7f464 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.security.scram;
 
-import java.util.Collections;
 import java.util.Map;
 
 import javax.security.auth.callback.Callback;
@@ -27,7 +26,7 @@ import javax.security.auth.callback.Callback;
  * in the SASL/SCRAM exchange.
  */
 public class ScramExtensionsCallback implements Callback {
-    private Map<String, String> extensions = Collections.emptyMap();
+    private Map<String, String> extensions = Map.of();
 
     /**
      * Returns map of the extension names and values that are sent by the 
client to
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
index eb523c9a82c..3bf60161a04 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.security.scram;
 import 
org.apache.kafka.common.security.scram.internals.ScramSaslClientProvider;
 import 
org.apache.kafka.common.security.scram.internals.ScramSaslServerProvider;
 
-import java.util.Collections;
 import java.util.Map;
 
 import javax.security.auth.Subject;
@@ -48,7 +47,7 @@ public class ScramLoginModule implements LoginModule {
 
         boolean useTokenAuthentication = "true".equalsIgnoreCase((String) 
options.get(TOKEN_AUTH_CONFIG));
         if (useTokenAuthentication) {
-            Map<String, String> scramExtensions = 
Collections.singletonMap(TOKEN_AUTH_CONFIG, "true");
+            Map<String, String> scramExtensions = Map.of(TOKEN_AUTH_CONFIG, 
"true");
             subject.getPublicCredentials().add(scramExtensions);
         }
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
index 439bcfe14f2..81fd5693b1c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
@@ -20,13 +20,12 @@ import org.apache.kafka.common.security.auth.SaslExtensions;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.utils.Utils;
 
-import java.util.Collections;
 import java.util.Map;
 
 public class ScramExtensions extends SaslExtensions {
 
     public ScramExtensions() {
-        this(Collections.emptyMap());
+        this(Map.of());
     }
 
     public ScramExtensions(String extensions) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index ced699640fd..ab99f5411c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -40,10 +40,8 @@ import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -372,7 +370,7 @@ public class SslFactory implements Reconfigurable, 
Closeable {
             this.subjectPrincipal = cert.getSubjectX500Principal();
             Collection<List<?>> altNames = cert.getSubjectAlternativeNames();
             // use a set for comparison
-            this.subjectAltNames = altNames != null ? new HashSet<>(altNames) 
: Collections.emptySet();
+            this.subjectAltNames = altNames != null ? Set.copyOf(altNames) : 
Set.of();
         }
 
         @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
index 35762de28fc..18fe80a54e3 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
@@ -18,9 +18,6 @@
 package org.apache.kafka.common.telemetry;
 
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -65,67 +62,34 @@ public enum ClientTelemetryState {
      */
     TERMINATED;
 
-    private static final Map<ClientTelemetryState, List<ClientTelemetryState>> 
VALID_NEXT_STATES = new EnumMap<>(ClientTelemetryState.class);
-
-    static {
-        /*
-         If clients needs a subscription, then issue telemetry API to fetch 
subscription from broker.
-
-         However, it's still possible that client doesn't get very far before 
terminating.
-        */
-        VALID_NEXT_STATES.put(
-            SUBSCRIPTION_NEEDED, Arrays.asList(SUBSCRIPTION_IN_PROGRESS, 
TERMINATED));
-
-        /*
-         If client is finished waiting for subscription, then client is ready 
to push the telemetry.
-         But, it's possible that no telemetry metrics are requested, hence 
client should go back to
-         subscription needed state i.e. requesting the next updated 
subscription.
-
-         However, it's still possible that client doesn't get very far before 
terminating.
-        */
-        VALID_NEXT_STATES.put(SUBSCRIPTION_IN_PROGRESS, 
Arrays.asList(PUSH_NEEDED,
-            SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, TERMINATED));
-
-        /*
-         If client transitions out of this state, then client should proceed 
to push the metrics.
-         But, if the push fails (network issues, the subscription changed, 
etc.) then client should
-         go back to subscription needed state and request the next 
subscription.
-
-         However, it's still possible that client doesn't get very far before 
terminating.
-        */
-        VALID_NEXT_STATES.put(PUSH_NEEDED, Arrays.asList(PUSH_IN_PROGRESS, 
SUBSCRIPTION_NEEDED,
-            TERMINATING_PUSH_NEEDED, TERMINATED));
-
-        /*
-         A successful push should transition client to push needed which sends 
the next telemetry
-         metrics after the elapsed wait interval. But, if the push fails 
(network issues, the
-         subscription changed, etc.) then client should go back to 
subscription needed state and
-         request the next subscription.
-
-         However, it's still possible that client doesn't get very far before 
terminating.
-        */
-        VALID_NEXT_STATES.put(
-            PUSH_IN_PROGRESS, Arrays.asList(PUSH_NEEDED, SUBSCRIPTION_NEEDED, 
TERMINATING_PUSH_NEEDED,
-                TERMINATED));
-
-        /*
-         If client is moving out of this state, then try to send last metrics 
push.
-
-         However, it's still possible that client doesn't get very far before 
terminating.
-        */
-        VALID_NEXT_STATES.put(
-            TERMINATING_PUSH_NEEDED, 
Arrays.asList(TERMINATING_PUSH_IN_PROGRESS, TERMINATED));
-
-        /*
-         Client should only be transited to terminated state.
-        */
-        VALID_NEXT_STATES.put(TERMINATING_PUSH_IN_PROGRESS, 
Collections.singletonList(TERMINATED));
-
-        /*
-         Client should never be able to transition out of terminated state.
-        */
-        VALID_NEXT_STATES.put(TERMINATED, Collections.emptyList());
-    }
+    private static final Map<ClientTelemetryState, List<ClientTelemetryState>> 
VALID_NEXT_STATES = Map.of(
+        // If client needs a subscription, then issue telemetry API to fetch 
subscription from broker.
+        // However, it's still possible that client doesn't get very far 
before terminating.
+        SUBSCRIPTION_NEEDED, List.of(SUBSCRIPTION_IN_PROGRESS, TERMINATED),
+        // If client is finished waiting for subscription, then client is 
ready to push the telemetry.
+        // But, it's possible that no telemetry metrics are requested, hence 
client should go back to
+        // subscription needed state i.e. requesting the next updated 
subscription.
+        // However, it's still possible that client doesn't get very far 
before terminating.
+        SUBSCRIPTION_IN_PROGRESS, List.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED, 
TERMINATING_PUSH_NEEDED, TERMINATED),
+        // If client transitions out of this state, then client should proceed 
to push the metrics.
+        // But, if the push fails (network issues, the subscription changed, 
etc.) then client should
+        // go back to subscription needed state and request the next 
subscription.
+        // However, it's still possible that client doesn't get very far 
before terminating.
+        PUSH_NEEDED, List.of(PUSH_IN_PROGRESS, SUBSCRIPTION_NEEDED, 
TERMINATING_PUSH_NEEDED, TERMINATED),
+        // A successful push should transition client to push needed which 
sends the next telemetry
+        // metrics after the elapsed wait interval. But, if the push fails 
(network issues, the
+        // subscription changed, etc.) then client should go back to 
subscription needed state and
+        // request the next subscription.
+        // However, it's still possible that client doesn't get very far 
before terminating.
+        PUSH_IN_PROGRESS, List.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED, 
TERMINATING_PUSH_NEEDED, TERMINATED),
+        // If client is moving out of this state, then try to send last 
metrics push.
+        // However, it's still possible that client doesn't get very far 
before terminating.
+        TERMINATING_PUSH_NEEDED, List.of(TERMINATING_PUSH_IN_PROGRESS, 
TERMINATED),
+        // Client can only transition to the terminated state.
+        TERMINATING_PUSH_IN_PROGRESS, List.of(TERMINATED),
+        // Client should never be able to transition out of terminated state.
+        TERMINATED, List.of()
+    );
 
     /**
      * Validates that the <code>newState</code> is one of the valid transition 
from the current
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index 0aa6bf16f0a..cee8ff435a9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.time.Duration;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -112,7 +111,7 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
      context. These additional labels from the request context should be added 
by broker prior
      exporting the metrics to the telemetry backend.
     */
-    private static final Set<String> EXCLUDE_LABELS = 
Collections.singleton("client_id");
+    private static final Set<String> EXCLUDE_LABELS = Set.of("client_id");
 
     public static final int DEFAULT_PUSH_INTERVAL_MS = 5 * 60 * 1000;
 
@@ -976,7 +975,7 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
             this.clientInstanceId = clientInstanceId;
             this.subscriptionId = subscriptionId;
             this.pushIntervalMs = pushIntervalMs;
-            this.acceptedCompressionTypes = 
Collections.unmodifiableList(acceptedCompressionTypes);
+            this.acceptedCompressionTypes = 
List.copyOf(acceptedCompressionTypes);
             this.deltaTemporality = deltaTemporality;
             this.selector = selector;
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index 7f44b2dbd27..d67775e1afb 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -36,7 +36,6 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -122,7 +121,7 @@ public class ClientTelemetryUtils {
 
     public static List<CompressionType> 
getCompressionTypesFromAcceptedList(List<Byte> acceptedCompressionTypes) {
         if (acceptedCompressionTypes == null || 
acceptedCompressionTypes.isEmpty()) {
-            return Collections.emptyList();
+            return List.of();
         }
 
         List<CompressionType> result = new ArrayList<>();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java
index d2054069a9c..4dbf66b0b40 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.telemetry.internals;
 
 import org.apache.kafka.common.MetricName;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
@@ -49,7 +48,7 @@ public class MetricKey implements MetricKeyable {
      */
     public MetricKey(String name, Map<String, String> tags) {
         this.name = Objects.requireNonNull(name);
-        this.tags = tags != null ? Collections.unmodifiableMap(tags) : 
Collections.emptyMap();
+        this.tags = tags != null ? Map.copyOf(tags) : Map.of();
     }
 
     public MetricKey(MetricName metricName) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
index 3b6a3864aa2..005e6eb150e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.telemetry.internals;
 
 import java.io.Closeable;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -69,7 +68,7 @@ public interface MetricsEmitter extends Closeable {
      * @return emitted metrics.
      */
     default List<SinglePointMetric> emittedMetrics() {
-        return Collections.emptyList();
+        return List.of();
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/internals/CopyOnWriteMap.java
 
b/clients/src/main/java/org/apache/kafka/common/utils/internals/CopyOnWriteMap.java
index 2e7bd69ff8c..5734742759b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/utils/internals/CopyOnWriteMap.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/internals/CopyOnWriteMap.java
@@ -35,7 +35,7 @@ public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, 
V> {
     private volatile Map<K, V> map;
 
     public CopyOnWriteMap() {
-        this.map = Collections.emptyMap();
+        this.map = Map.of();
     }
 
     public CopyOnWriteMap(Map<K, V> map) {
@@ -84,7 +84,7 @@ public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, 
V> {
 
     @Override
     public synchronized void clear() {
-        this.map = Collections.emptyMap();
+        this.map = Map.of();
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java
 
b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java
index 17252a5285c..280ac26c6f7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.common.utils.internals;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -121,7 +120,7 @@ public class ImplicitLinkedHashMultiCollection<E extends 
ImplicitLinkedHashColle
      */
     public final List<E> findAll(E key) {
         if (key == null || size() == 0) {
-            return Collections.emptyList();
+            return List.of();
         }
         ArrayList<E> results = new ArrayList<>();
         int slot = slot(elements, key);


Reply via email to