This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5e270482cf MINOR: Various cleanups in clients (#17895)
d5e270482cf is described below

commit d5e270482cf6601e1b635d69d311957e84d6bb59
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Nov 22 20:38:31 2024 +0100

    MINOR: Various cleanups in clients (#17895)
    
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../java/org/apache/kafka/clients/Metadata.java    |  6 ++--
 .../org/apache/kafka/clients/MetadataSnapshot.java |  2 +-
 .../clients/admin/ConsumerGroupDescription.java    |  8 ++---
 .../clients/admin/DeleteConsumerGroupsResult.java  |  2 +-
 .../admin/DescribeUserScramCredentialsResult.java  |  2 +-
 .../kafka/clients/admin/MemberAssignment.java      |  4 +--
 .../clients/admin/NewPartitionReassignment.java    |  4 +--
 .../kafka/clients/admin/ShareGroupDescription.java |  5 ++--
 .../admin/UserScramCredentialsDescription.java     |  4 +--
 .../clients/admin/internals/AdminApiDriver.java    |  2 +-
 .../admin/internals/AdminMetadataManager.java      |  2 +-
 .../admin/internals/AllBrokersStrategy.java        |  2 +-
 .../admin/internals/ListTransactionsHandler.java   |  2 +-
 .../kafka/clients/consumer/ConsumerConfig.java     |  9 ++----
 .../kafka/clients/consumer/ConsumerRecords.java    |  2 +-
 .../kafka/clients/consumer/MockConsumer.java       |  2 +-
 .../consumer/NoOffsetForPartitionException.java    |  3 +-
 .../kafka/clients/consumer/RangeAssignor.java      |  2 +-
 .../consumer/internals/AbstractCoordinator.java    |  6 ++--
 .../clients/consumer/internals/AbstractFetch.java  |  2 +-
 .../consumer/internals/AbstractStickyAssignor.java |  8 ++---
 .../clients/consumer/internals/AsyncClient.java    |  2 +-
 .../consumer/internals/AsyncKafkaConsumer.java     | 10 +++----
 .../consumer/internals/ClassicKafkaConsumer.java   |  8 ++---
 .../consumer/internals/CommitRequestManager.java   |  2 +-
 .../consumer/internals/ConsumerCoordinator.java    |  4 +--
 .../internals/CoordinatorRequestManager.java       |  2 +-
 .../kafka/clients/consumer/internals/Fetch.java    |  2 +-
 .../clients/consumer/internals/FetchCollector.java |  2 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  2 +-
 .../consumer/internals/NetworkClientDelegate.java  |  2 +-
 .../clients/consumer/internals/OffsetFetcher.java  | 12 ++++----
 .../consumer/internals/OffsetsRequestManager.java  |  2 +-
 .../clients/consumer/internals/RequestFuture.java  |  4 +--
 .../consumer/internals/RequestManagers.java        |  4 +--
 .../internals/ShareConsumeRequestManager.java      |  2 +-
 .../consumer/internals/ShareConsumerImpl.java      |  2 +-
 .../consumer/internals/SubscriptionState.java      |  2 +-
 .../events/ApplicationEventProcessor.java          | 22 +++++++-------
 .../internals/events/AssignmentChangeEvent.java    |  5 ++--
 .../consumer/internals/events/CommitEvent.java     |  2 +-
 .../events/ShareAcknowledgeOnCloseEvent.java       |  2 +-
 .../events/ShareAcknowledgeSyncEvent.java          |  2 +-
 .../consumer/internals/events/ShareFetchEvent.java |  2 +-
 .../kafka/clients/producer/KafkaProducer.java      |  2 +-
 .../clients/producer/internals/ProducerBatch.java  |  2 +-
 .../producer/internals/TxnPartitionMap.java        |  2 +-
 .../org/apache/kafka/common/cache/LRUCache.java    |  2 +-
 .../org/apache/kafka/common/config/ConfigDef.java  |  4 +--
 .../apache/kafka/common/config/SslClientAuth.java  |  5 +---
 .../common/header/internals/RecordHeaders.java     |  2 +-
 .../org/apache/kafka/common/protocol/Protocol.java |  5 +---
 .../common/record/AbstractLegacyRecordBatch.java   |  2 +-
 .../kafka/common/record/AbstractRecords.java       |  2 +-
 .../apache/kafka/common/record/MemoryRecords.java  |  2 +-
 .../kafka/common/requests/ApiVersionsResponse.java |  2 +-
 .../common/requests/DescribeConfigsResponse.java   |  1 -
 .../apache/kafka/common/security/JaasContext.java  |  4 +--
 .../kafka/common/security/auth/SaslExtensions.java |  3 +-
 .../OAuthBearerClientInitialResponse.java          |  2 +-
 .../secured/HttpAccessTokenRetriever.java          | 35 +++++++++-------------
 .../internals/secured/RefreshingHttpsJwks.java     |  2 +-
 .../unsecured/OAuthBearerUnsecuredJws.java         |  4 +--
 .../OAuthBearerUnsecuredLoginCallbackHandler.java  |  5 +---
 ...uthBearerUnsecuredValidatorCallbackHandler.java |  2 +-
 ...ommonNameLoggingTrustManagerFactoryWrapper.java |  4 +--
 .../internals/TelemetryMetricNamingConvention.java |  2 +-
 .../kafka/common/utils/ChildFirstClassLoader.java  |  2 +-
 .../kafka/common/utils/CloseableIterator.java      |  2 +-
 .../org/apache/kafka/common/utils/Sanitizer.java   | 35 ++++++++--------------
 .../apache/kafka/server/authorizer/Authorizer.java |  4 +--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 10 +++----
 .../admin/internals/DeleteRecordsHandlerTest.java  |  2 +-
 .../admin/internals/ListOffsetsHandlerTest.java    |  2 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 10 +++----
 .../internals/AbstractCoordinatorTest.java         |  2 +-
 .../consumer/internals/FetchCollectorTest.java     |  2 +-
 .../internals/FetchRequestManagerTest.java         |  5 ++--
 .../clients/consumer/internals/FetcherTest.java    |  7 ++---
 .../consumer/internals/OffsetFetcherTest.java      |  2 +-
 .../consumer/internals/RequestFutureTest.java      |  4 +--
 .../internals/ShareConsumeRequestManagerTest.java  |  8 ++---
 .../internals/ShareFetchCollectorTest.java         |  2 +-
 .../internals/TopicMetadataFetcherTest.java        |  2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  |  2 +-
 .../kafka/clients/producer/MockProducerTest.java   | 24 +++++++--------
 .../config/provider/EnvVarConfigProviderTest.java  |  2 +-
 .../common/network/SaslChannelBuilderTest.java     |  3 +-
 .../internals/OAuthBearerSaslClientTest.java       |  2 +-
 .../internals/OAuthBearerSaslServerTest.java       | 10 ++-----
 .../secured/HttpAccessTokenRetrieverTest.java      | 13 ++++----
 ...earerUnsecuredValidatorCallbackHandlerTest.java | 30 +++++++------------
 .../common/serialization/SerializationTest.java    |  2 +-
 .../org/apache/kafka/common/utils/UtilsTest.java   |  4 +--
 94 files changed, 199 insertions(+), 268 deletions(-)
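
The hunks below mostly repeat a small set of Java idiom cleanups. As a rough illustrative sketch only (the class and variable names here, such as CleanupPatterns, replicas, racks, listener and topics, are invented for this example and are not taken from the patch), the recurring shapes are:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;
    import java.util.function.Consumer;

    public class CleanupPatterns {
        public static void main(String[] args) {
            // 1. Optional emptiness checks: !opt.isPresent() -> opt.isEmpty() (Java 11+).
            Optional<Integer> epoch = Optional.empty();
            if (epoch.isEmpty()) {
                System.out.println("no epoch yet");
            }

            // 2. Defensive immutable copies: Collections.unmodifiableList(new ArrayList<>(x))
            //    -> List.copyOf(x), and the Set equivalent (Java 10+).
            List<Integer> replicas = List.copyOf(List.of(1, 2, 3));
            Set<String> racks = Set.copyOf(Set.of("rack-a", "rack-b"));

            // 3. Anonymous classes use the diamond operator (Java 9+):
            //    new Consumer<String>() {...} -> new Consumer<>() {...}.
            Consumer<String> listener = new Consumer<>() {
                @Override
                public void accept(String value) {
                    System.out.println("got " + value);
                }
            };
            listener.accept("hello");

            // 4. Collections.sort(list, cmp) -> list.sort(cmp).
            List<String> topics = new ArrayList<>(List.of("b", "a"));
            topics.sort(Comparator.naturalOrder());
            System.out.println(replicas + " " + racks + " " + topics);
        }
    }

Besides being shorter, List.copyOf can also skip making a fresh copy when the source collection is already an unmodifiable list, which the wrap-a-new-ArrayList form never does.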

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index ece1a25adca..b60156aae00 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -294,7 +294,7 @@ public class Metadata implements Closeable {
 
     public synchronized LeaderAndEpoch currentLeader(TopicPartition 
topicPartition) {
         Optional<MetadataResponse.PartitionMetadata> maybeMetadata = 
partitionMetadataIfCurrent(topicPartition);
-        if (!maybeMetadata.isPresent())
+        if (maybeMetadata.isEmpty())
             return new LeaderAndEpoch(Optional.empty(), 
Optional.ofNullable(lastSeenLeaderEpochs.get(topicPartition)));
 
         MetadataResponse.PartitionMetadata partitionMetadata = 
maybeMetadata.get();
@@ -392,7 +392,7 @@ public class Metadata implements Closeable {
             TopicPartition partition = partitionLeader.getKey();
             Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
             Metadata.LeaderIdAndEpoch newLeader = partitionLeader.getValue();
-            if (!newLeader.epoch.isPresent() || 
!newLeader.leaderId.isPresent()) {
+            if (newLeader.epoch.isEmpty() || newLeader.leaderId.isEmpty()) {
                 log.debug("For {}, incoming leader information is incomplete 
{}", partition, newLeader);
                 continue;
             }
@@ -404,7 +404,7 @@ public class Metadata implements Closeable {
                 log.debug("For {}, incoming leader({}), the corresponding node 
information for node-id {} is missing, so ignoring.", partition, newLeader, 
newLeader.leaderId.get());
                 continue;
             }
-            if 
(!this.metadataSnapshot.partitionMetadata(partition).isPresent()) {
+            if (this.metadataSnapshot.partitionMetadata(partition).isEmpty()) {
                 log.debug("For {}, incoming leader({}), partition metadata is 
no longer cached, ignoring.", partition, newLeader);
                 continue;
             }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java 
b/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java
index 6d84a738c1d..424ecb0d291 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java
@@ -133,7 +133,7 @@ public class MetadataSnapshot {
      */
     public OptionalInt leaderEpochFor(TopicPartition tp) {
         PartitionMetadata partitionMetadata = metadataByPartition.get(tp);
-        if (partitionMetadata == null || 
!partitionMetadata.leaderEpoch.isPresent()) {
+        if (partitionMetadata == null || 
partitionMetadata.leaderEpoch.isEmpty()) {
             return OptionalInt.empty();
         } else {
             return OptionalInt.of(partitionMetadata.leaderEpoch.get());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 4f3dc837cde..4cbc5b4b43b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -23,9 +23,9 @@ import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.acl.AclOperation;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -84,8 +84,7 @@ public class ConsumerGroupDescription {
                                     Set<AclOperation> authorizedOperations) {
         this.groupId = groupId == null ? "" : groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
-        this.members = members == null ? Collections.emptyList() :
-            Collections.unmodifiableList(new ArrayList<>(members));
+        this.members = members == null ? Collections.emptyList() : 
List.copyOf(members);
         this.partitionAssignor = partitionAssignor == null ? "" : 
partitionAssignor;
         this.type = type;
         this.groupState = GroupState.parse(state.name());
@@ -122,8 +121,7 @@ public class ConsumerGroupDescription {
                                     Set<AclOperation> authorizedOperations) {
         this.groupId = groupId == null ? "" : groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
-        this.members = members == null ? Collections.emptyList() :
-                Collections.unmodifiableList(new ArrayList<>(members));
+        this.members = members == null ? Collections.emptyList() : 
List.copyOf(members);
         this.partitionAssignor = partitionAssignor == null ? "" : 
partitionAssignor;
         this.type = type;
         this.groupState = groupState;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index 90ddbd0582c..5c1e60b1a61 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -42,7 +42,7 @@ public class DeleteConsumerGroupsResult {
      */
     public Map<String, KafkaFuture<Void>> deletedGroups() {
         Map<String, KafkaFuture<Void>> deletedGroups = new 
HashMap<>(futures.size());
-        futures.forEach((key, future) -> deletedGroups.put(key, future));
+        deletedGroups.putAll(futures);
         return deletedGroups;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
index 2eddd7ee28c..f13fd2a8e7d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
@@ -125,7 +125,7 @@ public class DescribeUserScramCredentialsResult {
                 // for users 1, 2, and 3 but this is looking for user 4), so 
explicitly take care of that case
                 
Optional<DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult>
 optionalUserResult =
                         data.results().stream().filter(result -> 
result.user().equals(userName)).findFirst();
-                if (!optionalUserResult.isPresent()) {
+                if (optionalUserResult.isEmpty()) {
                     retval.completeExceptionally(new 
ResourceNotFoundException("No such user: " + userName));
                 } else {
                     
DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult 
userResult = optionalUserResult.get();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
index 495ddb09745..ec30b83baf7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -36,8 +35,7 @@ public class MemberAssignment {
      * @param topicPartitions List of topic partitions
      */
     public MemberAssignment(Set<TopicPartition> topicPartitions) {
-        this.topicPartitions = topicPartitions == null ? 
Collections.emptySet() :
-            Collections.unmodifiableSet(new HashSet<>(topicPartitions));
+        this.topicPartitions = topicPartitions == null ? 
Collections.emptySet() : Set.copyOf(topicPartitions);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
index 02bce5e98c4..0a37c012cc7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.clients.admin;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -34,7 +32,7 @@ public class NewPartitionReassignment {
     public NewPartitionReassignment(List<Integer> targetReplicas) {
         if (targetReplicas == null || targetReplicas.isEmpty())
             throw new IllegalArgumentException("Cannot create a new partition 
reassignment without any replicas");
-        this.targetReplicas = Collections.unmodifiableList(new 
ArrayList<>(targetReplicas));
+        this.targetReplicas = List.copyOf(targetReplicas);
     }
 
     public List<Integer> targetReplicas() {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
index 52fe0136aa2..4a2292a4d45 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -53,8 +53,7 @@ public class ShareGroupDescription {
                                  Node coordinator,
                                  Set<AclOperation> authorizedOperations) {
         this.groupId = groupId == null ? "" : groupId;
-        this.members = members == null ? Collections.emptyList() :
-            Collections.unmodifiableList(new ArrayList<>(members));
+        this.members = members == null ? Collections.emptyList() : 
List.copyOf(members);
         this.groupState = groupState;
         this.coordinator = coordinator;
         this.authorizedOperations = authorizedOperations;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
index 97bc3588af6..03a713149be 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.clients.admin;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -61,7 +59,7 @@ public class UserScramCredentialsDescription {
      */
     public UserScramCredentialsDescription(String name, 
List<ScramCredentialInfo> credentialInfos) {
         this.name = Objects.requireNonNull(name);
-        this.credentialInfos = Collections.unmodifiableList(new 
ArrayList<>(credentialInfos));
+        this.credentialInfos = List.copyOf(credentialInfos);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
index 19e3f13ebef..6286f59ed71 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -339,7 +339,7 @@ public class AdminApiDriver<K, V> {
             }
 
             // Copy the keys to avoid exposing the underlying mutable set
-            Set<K> copyKeys = Collections.unmodifiableSet(new HashSet<>(keys));
+            Set<K> copyKeys = Set.copyOf(keys);
 
             Collection<AdminApiHandler.RequestAndKeys<K>> newRequests = 
buildRequest.apply(copyKeys, scope);
             if (newRequests.isEmpty()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 09fd000e50e..c1e92ca26fa 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -259,7 +259,7 @@ public class AdminMetadataManager {
     public void transitionToUpdatePending(long now) {
         this.state = State.UPDATE_PENDING;
         this.lastMetadataFetchAttemptMs = now;
-        if (!metadataAttemptStartMs.isPresent())
+        if (metadataAttemptStartMs.isEmpty())
             metadataAttemptStartMs = Optional.of(now);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
index 433d25e8e54..ed5f513e020 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
@@ -195,7 +195,7 @@ public class AllBrokersStrategy implements 
AdminApiLookupStrategy<AllBrokersStra
         }
 
         private KafkaFutureImpl<V> futureOrThrow(BrokerKey key) {
-            if (!key.brokerId.isPresent()) {
+            if (key.brokerId.isEmpty()) {
                 throw new IllegalArgumentException("Attempt to complete with 
invalid key: " + key);
             } else {
                 int brokerId = key.brokerId.getAsInt();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
index 56318fc0acc..71b8e1a7c56 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
@@ -124,7 +124,7 @@ public class ListTransactionsHandler extends 
AdminApiHandler.Batched<AllBrokersS
         }
 
         AllBrokersStrategy.BrokerKey key = keys.iterator().next();
-        if (!key.brokerId.isPresent() || key.brokerId.getAsInt() != brokerId) {
+        if (key.brokerId.isEmpty() || key.brokerId.getAsInt() != brokerId) {
             throw new IllegalArgumentException("Unexpected broker key: " + 
key);
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ff9bb6c1166..41702835d46 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -61,12 +61,7 @@ public class ConsumerConfig extends AbstractConfig {
     // a list contains all the assignor names that only assign subscribed 
topics to consumer. Should be updated when new assignor added.
     // This is to help optimize ConsumerCoordinator#performAssignment method
     public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
-        Collections.unmodifiableList(Arrays.asList(
-            RANGE_ASSIGNOR_NAME,
-            ROUNDROBIN_ASSIGNOR_NAME,
-            STICKY_ASSIGNOR_NAME,
-            COOPERATIVE_STICKY_ASSIGNOR_NAME
-        ));
+        List.of(RANGE_ASSIGNOR_NAME, ROUNDROBIN_ASSIGNOR_NAME, 
STICKY_ASSIGNOR_NAME, COOPERATIVE_STICKY_ASSIGNOR_NAME);
 
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES 
AS
@@ -709,7 +704,7 @@ public class ConsumerConfig extends AbstractConfig {
         Optional<String> groupId = 
Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
         Map<String, Object> originals = originals();
         boolean enableAutoCommit = 
originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG) ? 
getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;
-        if (!groupId.isPresent()) { // overwrite in case of default group id 
where the config is not explicitly provided
+        if (groupId.isEmpty()) { // overwrite in case of default group id 
where the config is not explicitly provided
             if (!originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
                 configs.put(ENABLE_AUTO_COMMIT_CONFIG, false);
             } else if (enableAutoCommit) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index f36f4cdc7b8..535313b1f2f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -115,7 +115,7 @@ public class ConsumerRecords<K, V> implements 
Iterable<ConsumerRecord<K, V>> {
 
         @Override
         public Iterator<ConsumerRecord<K, V>> iterator() {
-            return new AbstractIterator<ConsumerRecord<K, V>>() {
+            return new AbstractIterator<>() {
                 final Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters 
= iterables.iterator();
                 Iterator<ConsumerRecord<K, V>> current;
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 4bdffc48c23..a5143749b89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -553,7 +553,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     public synchronized Set<TopicPartition> paused() {
-        return Collections.unmodifiableSet(new HashSet<>(paused));
+        return Set.copyOf(paused);
     }
 
     private void ensureNotClosed() {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
index 2890f8e3332..93b6094fcfc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -40,7 +39,7 @@ public class NoOffsetForPartitionException extends 
InvalidOffsetException {
 
     public NoOffsetForPartitionException(Collection<TopicPartition> 
partitions) {
         super("Undefined offset with no reset policy for partitions: " + 
partitions);
-        this.partitions = Collections.unmodifiableSet(new 
HashSet<>(partitions));
+        this.partitions = Set.copyOf(partitions);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index 6bc064006fe..f5be90b712e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -269,7 +269,7 @@ public class RangeAssignor extends 
AbstractPartitionAssignor {
         boolean racksMatch(String consumer, TopicPartition tp) {
             Optional<String> consumerRack = consumers.get(consumer);
             Set<String> replicaRacks = partitionRacks.get(tp);
-            return !consumerRack.isPresent() || (replicaRacks != null && 
replicaRacks.contains(consumerRack.get()));
+            return consumerRack.isEmpty() || (replicaRacks != null && 
replicaRacks.contains(consumerRack.get()));
         }
 
         int maxAssignable(String consumer) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index f177dad62b9..9860d2f5890 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -568,7 +568,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
             if (lastRebalanceStartMs == -1L)
                 lastRebalanceStartMs = time.milliseconds();
             joinFuture = sendJoinGroupRequest();
-            joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
+            joinFuture.addListener(new RequestFutureListener<>() {
                 @Override
                 public void onSuccess(ByteBuffer value) {
                     // do nothing since all the handler logic are in 
SyncGroupResponseHandler already
@@ -1188,7 +1188,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
     }
 
     protected boolean isDynamicMember() {
-        return !rebalanceConfig.groupInstanceId.isPresent();
+        return rebalanceConfig.groupInstanceId.isEmpty();
     }
 
     private class LeaveGroupResponseHandler extends 
CoordinatorResponseHandler<LeaveGroupResponse, Void> {
@@ -1528,7 +1528,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                         } else {
                             heartbeat.sentHeartbeat(now);
                             final RequestFuture<Void> heartbeatFuture = 
sendHeartbeatRequest();
-                            heartbeatFuture.addListener(new 
RequestFutureListener<Void>() {
+                            heartbeatFuture.addListener(new 
RequestFutureListener<>() {
                                 @Override
                                 public void onSuccess(Void value) {
                                     synchronized (AbstractCoordinator.this) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 4802eb7a120..e3d4eb58af4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -416,7 +416,7 @@ public abstract class AbstractFetch implements Closeable {
 
             Optional<Node> leaderOpt = position.currentLeader.leader;
 
-            if (!leaderOpt.isPresent()) {
+            if (leaderOpt.isEmpty()) {
                 log.debug("Requesting metadata update for partition {} since 
the position {} is missing the current leader node", partition, position);
                 metadata.requestUpdate(false);
                 continue;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 966b44b59a6..4ac1513ede5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -892,7 +892,7 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
 
             List<TopicPartition> unassignedPartitions = new 
ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
 
-            Collections.sort(sortedAssignedPartitions, 
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+            
sortedAssignedPartitions.sort(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
 
             boolean shouldAddDirectly = false;
             Iterator<TopicPartition> sortedAssignedPartitionsIter = 
sortedAssignedPartitions.iterator();
@@ -991,7 +991,7 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                     currentPartitionConsumer.put(topicPartition, 
entry.getKey());
 
             List<String> sortedAllTopics = new 
ArrayList<>(topic2AllPotentialConsumers.keySet());
-            Collections.sort(sortedAllTopics, new 
TopicComparator(topic2AllPotentialConsumers));
+            sortedAllTopics.sort(new 
TopicComparator(topic2AllPotentialConsumers));
             sortedAllPartitions = getAllTopicPartitions(sortedAllTopics);
 
             sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
@@ -1084,7 +1084,7 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
 
             List<TopicPartition> unassignedPartitions = new ArrayList<>();
 
-            Collections.sort(sortedAssignedPartitions, new 
PartitionComparator(topic2AllPotentialConsumers));
+            sortedAssignedPartitions.sort(new 
PartitionComparator(topic2AllPotentialConsumers));
 
             boolean shouldAddDirectly = false;
             Iterator<TopicPartition> sortedAssignedPartitionsIter = 
sortedAssignedPartitions.iterator();
@@ -1154,7 +1154,7 @@ public abstract class AbstractStickyAssignor extends 
AbstractPartitionAssignor {
                 if (memberData.generation.isPresent() && 
memberData.generation.get() < maxGeneration) {
                     // if the current member's generation is lower than 
maxGeneration, put into prevAssignment if needed
                     updatePrevAssignment(prevAssignment, 
memberData.partitions, consumer, memberData.generation.get());
-                } else if (!memberData.generation.isPresent() && maxGeneration 
> DEFAULT_GENERATION) {
+                } else if (memberData.generation.isEmpty() && maxGeneration > 
DEFAULT_GENERATION) {
                     // if maxGeneration is larger than DEFAULT_GENERATION
                     // put all (no generation) partitions as 
DEFAULT_GENERATION into prevAssignment if needed
                     updatePrevAssignment(prevAssignment, 
memberData.partitions, consumer, DEFAULT_GENERATION);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java
index d4265e72c04..05f04cd6659 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java
@@ -37,7 +37,7 @@ public abstract class AsyncClient<T1, Req extends 
AbstractRequest, Resp extends
     public RequestFuture<T2> sendAsyncRequest(Node node, T1 requestData) {
         AbstractRequest.Builder<Req> requestBuilder = prepareRequest(node, 
requestData);
 
-        return client.send(node, requestBuilder).compose(new 
RequestFutureAdapter<ClientResponse, T2>() {
+        return client.send(node, requestBuilder).compose(new 
RequestFutureAdapter<>() {
             @Override
             @SuppressWarnings("unchecked")
             public void onSuccess(ClientResponse value, RequestFuture<T2> 
future) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 3ab84fd7aec..033b16ba7b6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -613,7 +613,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             groupRebalanceConfig.groupId,
             groupRebalanceConfig.groupInstanceId
         );
-        if (!groupMetadata.isPresent()) {
+        if (groupMetadata.isEmpty()) {
             config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
             config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
         }
@@ -952,7 +952,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     private void maybeThrowInvalidGroupIdException() {
-        if (!groupMetadata.get().isPresent()) {
+        if (groupMetadata.get().isEmpty()) {
             throw new InvalidGroupIdException("To use the group management or 
offset commit APIs, you must " +
                 "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the 
consumer configuration.");
         }
@@ -1350,7 +1350,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     private void autoCommitOnClose(final Timer timer) {
-        if (!groupMetadata.get().isPresent())
+        if (groupMetadata.get().isEmpty())
             return;
 
         if (autoCommitEnabled)
@@ -1390,7 +1390,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     private void leaveGroupOnClose(final Timer timer) {
-        if (!groupMetadata.get().isPresent())
+        if (groupMetadata.get().isEmpty())
             return;
 
         log.debug("Leaving the consumer group during consumer close");
@@ -1490,7 +1490,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public Uuid clientInstanceId(Duration timeout) {
-        if (!clientTelemetryReporter.isPresent()) {
+        if (clientTelemetryReporter.isEmpty()) {
             throw new IllegalStateException("Telemetry is not enabled. Set 
config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index a3ac5e5698b..f5a672631ff 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -212,7 +212,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             );
 
             // no coordinator will be constructed for the default (null) group 
id
-            if (!groupId.isPresent()) {
+            if (groupId.isEmpty()) {
                 config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
                 config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
                 this.coordinator = null;
@@ -414,7 +414,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     public Set<String> subscription() {
         acquireAndEnsureOpen();
         try {
-            return Collections.unmodifiableSet(new 
HashSet<>(this.subscriptions.subscription()));
+            return Set.copyOf(this.subscriptions.subscription());
         } finally {
             release();
         }
@@ -906,7 +906,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public Uuid clientInstanceId(Duration timeout) {
-        if (!clientTelemetryReporter.isPresent()) {
+        if (clientTelemetryReporter.isEmpty()) {
             throw new IllegalStateException("Telemetry is not enabled. Set 
config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
 
         }
@@ -1258,7 +1258,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     private void maybeThrowInvalidGroupIdException() {
-        if (!groupId.isPresent())
+        if (groupId.isEmpty())
             throw new InvalidGroupIdException("To use the group management or 
offset commit APIs, you must " +
                     "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in 
the consumer configuration.");
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index a5cb4753b38..41522889768 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -177,7 +177,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         // poll only when the coordinator node is known.
-        if (!coordinatorRequestManager.coordinator().isPresent())
+        if (coordinatorRequestManager.coordinator().isEmpty())
             return EMPTY;
 
         if (closing) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8647476758f..584a03736f9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1031,7 +1031,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             // the same order that they were added. Note also that 
AbstractCoordinator prevents
             // multiple concurrent coordinator lookup requests.
             pendingAsyncCommits.incrementAndGet();
-            lookupCoordinator().addListener(new RequestFutureListener<Void>() {
+            lookupCoordinator().addListener(new RequestFutureListener<>() {
                 @Override
                 public void onSuccess(Void value) {
                     pendingAsyncCommits.decrementAndGet();
@@ -1059,7 +1059,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
         inFlightAsyncCommits.incrementAndGet();
         final OffsetCommitCallback cb = callback == null ? 
defaultOffsetCommitCallback : callback;
-        future.addListener(new RequestFutureListener<Void>() {
+        future.addListener(new RequestFutureListener<>() {
             @Override
             public void onSuccess(Void value) {
                 inFlightAsyncCommits.decrementAndGet();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index a9e1bf46bed..c1e367055e4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -210,7 +210,7 @@ public class CoordinatorRequestManager implements 
RequestManager {
     ) {
         // handles Runtime exception
         Optional<FindCoordinatorResponseData.Coordinator> coordinator = 
response.coordinatorByKey(this.groupId);
-        if (!coordinator.isPresent()) {
+        if (coordinator.isEmpty()) {
             String msg = String.format("Response did not contain expected 
coordinator section for groupId: %s", this.groupId);
             onFailedResponse(currentTimeMs, new IllegalStateException(msg));
             return;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
index 4127603372d..fa45de7e2cb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
@@ -32,9 +32,9 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 
 public class Fetch<K, V> {
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata;
     private boolean positionAdvanced;
     private int numRecords;
-    private Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata;
 
     public static <K, V> Fetch<K, V> empty() {
         return new Fetch<>(new HashMap<>(), false, 0, new HashMap<>());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
index 794ff3aceba..94e76edd0a5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
@@ -347,7 +347,7 @@ public class FetchCollector<K, V> {
         } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
             Optional<Integer> clearedReplicaId = 
subscriptions.clearPreferredReadReplica(tp);
 
-            if (!clearedReplicaId.isPresent()) {
+            if (clearedReplicaId.isEmpty()) {
                 // If there's no preferred replica to clear, we're fetching 
from the leader so handle this error normally
                 SubscriptionState.FetchPosition position = 
subscriptions.positionOrNull(tp);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 745bbfde992..ac86d1ebeaa 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -192,7 +192,7 @@ public class Fetcher<K, V> extends AbstractFetch {
             final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
             final RequestFuture<ClientResponse> responseFuture = 
client.send(fetchTarget, request);
 
-            responseFuture.addListener(new 
RequestFutureListener<ClientResponse>() {
+            responseFuture.addListener(new RequestFutureListener<>() {
                 @Override
                 public void onSuccess(ClientResponse resp) {
                     successHandler.handle(fetchTarget, data, resp);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 56e4d697748..fd05f898355 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -413,7 +413,7 @@ public class NetworkClientDelegate implements AutoCloseable 
{
                                                            final Sensor 
throttleTimeSensor,
                                                            final 
ClientTelemetrySender clientTelemetrySender,
                                                            final 
BackgroundEventHandler backgroundEventHandler) {
-        return new CachedSupplier<NetworkClientDelegate>() {
+        return new CachedSupplier<>() {
             @Override
             protected NetworkClientDelegate create() {
                 KafkaClient client = ClientUtils.createNetworkClient(config,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index e5a8ba197a1..f7646bff9ed 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -144,7 +144,7 @@ public class OffsetFetcher {
         do {
             RequestFuture<ListOffsetResult> future = 
sendListOffsetsRequests(remainingToSearch, requireTimestamps);
 
-            future.addListener(new RequestFutureListener<ListOffsetResult>() {
+            future.addListener(new RequestFutureListener<>() {
                 @Override
                 public void onSuccess(ListOffsetResult value) {
                     synchronized (future) {
@@ -218,7 +218,7 @@ public class OffsetFetcher {
             subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), 
time.milliseconds() + requestTimeoutMs);
 
             RequestFuture<ListOffsetResult> future = 
sendListOffsetRequest(node, resetTimestamps, false);
-            future.addListener(new RequestFutureListener<ListOffsetResult>() {
+            future.addListener(new RequestFutureListener<>() {
                 @Override
                 public void onSuccess(ListOffsetResult result) {
                     
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, 
result);
@@ -271,7 +271,7 @@ public class OffsetFetcher {
             RequestFuture<OffsetForEpochResult> future =
                     offsetsForLeaderEpochClient.sendAsyncRequest(node, 
fetchPositions);
 
-            future.addListener(new 
RequestFutureListener<OffsetForEpochResult>() {
+            future.addListener(new RequestFutureListener<>() {
                 @Override
                 public void onSuccess(OffsetForEpochResult offsetsResult) {
                     
offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
@@ -308,7 +308,7 @@ public class OffsetFetcher {
 
         for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry 
: timestampsToSearchByNode.entrySet()) {
             RequestFuture<ListOffsetResult> future = 
sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
-            future.addListener(new RequestFutureListener<ListOffsetResult>() {
+            future.addListener(new RequestFutureListener<>() {
                 @Override
                 public void onSuccess(ListOffsetResult partialResult) {
                     synchronized (listOffsetRequestsFuture) {
@@ -352,7 +352,7 @@ public class OffsetFetcher {
             Long offset = entry.getValue();
             Metadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
 
-            if (!leaderAndEpoch.leader.isPresent()) {
+            if (leaderAndEpoch.leader.isEmpty()) {
                 log.debug("Leader for partition {} is unknown for fetching 
offset {}", tp, offset);
                 metadata.requestUpdate(true);
                 partitionsToRetry.add(tp);
@@ -397,7 +397,7 @@ public class OffsetFetcher {
 
         log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
         return client.send(node, builder)
-                .compose(new RequestFutureAdapter<ClientResponse, 
ListOffsetResult>() {
+                .compose(new RequestFutureAdapter<>() {
                     @Override
                     public void onSuccess(ClientResponse response, 
RequestFuture<ListOffsetResult> future) {
                         ListOffsetsResponse lor = (ListOffsetsResponse) 
response.responseBody();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 6d296149b70..db847eae831 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -894,7 +894,7 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
             Long offset = entry.getValue();
             Metadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
 
-            if (!leaderAndEpoch.leader.isPresent()) {
+            if (leaderAndEpoch.leader.isEmpty()) {
                 log.debug("Leader for partition {} is unknown for fetching 
offset {}", tp, offset);
                 metadata.requestUpdate(true);
                 listOffsetsRequestState.ifPresent(offsetsRequestState -> 
offsetsRequestState.remainingToSearch.put(tp, offset));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 034efd4dba8..ed75524ac81 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -200,7 +200,7 @@ public class RequestFuture<T> implements 
ConsumerNetworkClient.PollCondition {
      */
     public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> 
adapter) {
         final RequestFuture<S> adapted = new RequestFuture<>();
-        addListener(new RequestFutureListener<T>() {
+        addListener(new RequestFutureListener<>() {
             @Override
             public void onSuccess(T value) {
                 adapter.onSuccess(value, adapted);
@@ -215,7 +215,7 @@ public class RequestFuture<T> implements 
ConsumerNetworkClient.PollCondition {
     }
 
     public void chain(final RequestFuture<T> future) {
-        addListener(new RequestFutureListener<T>() {
+        addListener(new RequestFutureListener<>() {
             @Override
             public void onSuccess(T value) {
                 future.complete(value);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 7f682c81fc4..304f0fffd4a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -160,7 +160,7 @@ public class RequestManagers implements Closeable {
                                                      final 
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
                                                      final MemberStateListener 
applicationThreadMemberStateListener
                                                      ) {
-        return new CachedSupplier<RequestManagers>() {
+        return new CachedSupplier<>() {
             @Override
             protected RequestManagers create() {
                 final NetworkClientDelegate networkClientDelegate = 
networkClientDelegateSupplier.get();
@@ -284,7 +284,7 @@ public class RequestManagers implements Closeable {
                                                      final 
Optional<ClientTelemetryReporter> clientTelemetryReporter,
                                                      final Metrics metrics
     ) {
-        return new CachedSupplier<RequestManagers>() {
+        return new CachedSupplier<>() {
             @Override
             protected RequestManagers create() {
                 long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index a07b2be9084..cda0196abe9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -150,7 +150,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         for (TopicPartition partition : partitionsToFetch()) {
             Optional<Node> leaderOpt = 
metadata.currentLeader(partition).leader;
 
-            if (!leaderOpt.isPresent()) {
+            if (leaderOpt.isEmpty()) {
                 log.debug("Requesting metadata update for partition {} since 
current leader node is missing", partition);
                 metadata.requestUpdate(false);
                 continue;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index e209ec00b0d..c4f77bf1c6f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -785,7 +785,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
      */
     @Override
     public Uuid clientInstanceId(final Duration timeout) {
-        if (!clientTelemetryReporter.isPresent()) {
+        if (clientTelemetryReporter.isEmpty()) {
             throw new IllegalStateException("Telemetry is not enabled. Set 
config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 8871694db45..7bd4217d6c2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -985,7 +985,7 @@ public class SubscriptionState {
                 return false;
             }
 
-            if (!currentLeaderAndEpoch.leader.isPresent()) {
+            if (currentLeaderAndEpoch.leader.isEmpty()) {
                 return false;
             }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 7bd2d1f28b7..d2e45370c66 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -281,7 +281,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * it is already a member on the next poll.
      */
     private void process(final TopicSubscriptionChangeEvent event) {
-        if (!requestManagers.consumerHeartbeatRequestManager.isPresent()) {
+        if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
             log.warn("Group membership manager not present when processing a 
subscribe event");
             event.future().complete(null);
             return;
@@ -381,7 +381,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     private void process(final ConsumerRebalanceListenerCallbackCompletedEvent 
event) {
-        if (!requestManagers.consumerHeartbeatRequestManager.isPresent()) {
+        if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
             log.warn(
                 "An internal error occurred; the group membership manager was 
not present, so the notification of the {} callback execution could not be 
sent",
                 event.methodName()
@@ -392,14 +392,14 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     private void process(@SuppressWarnings("unused") final CommitOnCloseEvent 
event) {
-        if (!requestManagers.commitRequestManager.isPresent())
+        if (requestManagers.commitRequestManager.isEmpty())
             return;
         log.debug("Signal CommitRequestManager closing");
         requestManagers.commitRequestManager.get().signalClose();
     }
 
     private void process(final LeaveGroupOnCloseEvent event) {
-        if (!requestManagers.consumerMembershipManager.isPresent())
+        if (requestManagers.consumerMembershipManager.isEmpty())
             return;
 
         log.debug("Signal the ConsumerMembershipManager to leave the consumer 
group since the consumer is closing");
@@ -418,7 +418,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * Process event that indicates the consumer acknowledged delivery of 
records synchronously.
      */
     private void process(final ShareAcknowledgeSyncEvent event) {
-        if (!requestManagers.shareConsumeRequestManager.isPresent()) {
+        if (requestManagers.shareConsumeRequestManager.isEmpty()) {
             return;
         }
 
@@ -432,7 +432,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * Process event that indicates the consumer acknowledged delivery of 
records asynchronously.
      */
     private void process(final ShareAcknowledgeAsyncEvent event) {
-        if (!requestManagers.shareConsumeRequestManager.isPresent()) {
+        if (requestManagers.shareConsumeRequestManager.isEmpty()) {
             return;
         }
 
@@ -446,7 +446,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * it is already a member.
      */
     private void process(final ShareSubscriptionChangeEvent event) {
-        if (!requestManagers.shareHeartbeatRequestManager.isPresent()) {
+        if (requestManagers.shareHeartbeatRequestManager.isEmpty()) {
             KafkaException error = new KafkaException("Group membership 
manager not present when processing a subscribe event");
             event.future().completeExceptionally(error);
             return;
@@ -469,7 +469,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      *              the group is sent out.
      */
     private void process(final ShareUnsubscribeEvent event) {
-        if (!requestManagers.shareHeartbeatRequestManager.isPresent()) {
+        if (requestManagers.shareHeartbeatRequestManager.isEmpty()) {
             KafkaException error = new KafkaException("Group membership 
manager not present when processing an unsubscribe event");
             event.future().completeExceptionally(error);
             return;
@@ -490,7 +490,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      *              the acknowledgements have responses.
      */
     private void process(final ShareAcknowledgeOnCloseEvent event) {
-        if (!requestManagers.shareConsumeRequestManager.isPresent()) {
+        if (requestManagers.shareConsumeRequestManager.isEmpty()) {
             KafkaException error = new KafkaException("Group membership 
manager not present when processing an acknowledge-on-close event");
             event.future().completeExceptionally(error);
             return;
@@ -507,7 +507,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * @param event Event containing a boolean to indicate if the callback 
handler is configured or not.
      */
     private void process(final 
ShareAcknowledgementCommitCallbackRegistrationEvent event) {
-        if (!requestManagers.shareConsumeRequestManager.isPresent()) {
+        if (requestManagers.shareConsumeRequestManager.isEmpty()) {
             return;
         }
 
@@ -532,7 +532,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                                                                final 
ConsumerMetadata metadata,
                                                                final 
SubscriptionState subscriptions,
                                                                final 
Supplier<RequestManagers> requestManagersSupplier) {
-        return new CachedSupplier<ApplicationEventProcessor>() {
+        return new CachedSupplier<>() {
             @Override
             protected ApplicationEventProcessor create() {
                 RequestManagers requestManagers = 
requestManagersSupplier.get();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
index 68e1bbc5e6d..7f48ee644c7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
@@ -19,8 +19,7 @@ package org.apache.kafka.clients.consumer.internals.events;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Set;
 
 public class AssignmentChangeEvent extends CompletableApplicationEvent<Void> {
 
@@ -30,7 +29,7 @@ public class AssignmentChangeEvent extends 
CompletableApplicationEvent<Void> {
     public AssignmentChangeEvent(final long currentTimeMs, final long 
deadlineMs, final Collection<TopicPartition> partitions) {
         super(Type.ASSIGNMENT_CHANGE, deadlineMs);
         this.currentTimeMs = currentTimeMs;
-        this.partitions = Collections.unmodifiableSet(new 
HashSet<>(partitions));
+        this.partitions = Set.copyOf(partitions);
     }
 
     public long currentTimeMs() {
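
The change above swaps the Collections.unmodifiableSet(new HashSet<>(...)) idiom for Set.copyOf, available since Java 10, which produces an unmodifiable defensive copy in one call. A small sketch of the behaviour (example values are made up):

    import java.util.List;
    import java.util.Set;

    public class SetCopyOfExample {
        public static void main(String[] args) {
            List<String> partitions = List.of("topic-0", "topic-1", "topic-1");

            // Unmodifiable copy; duplicates collapse and null elements are rejected.
            Set<String> assigned = Set.copyOf(partitions);
            System.out.println(assigned);            // e.g. [topic-0, topic-1]

            try {
                assigned.add("topic-2");
            } catch (UnsupportedOperationException e) {
                System.out.println("the copy cannot be modified");
            }
        }
    }
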
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
index 9c94d3b9d3b..a05ce638ece 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
@@ -40,7 +40,7 @@ public abstract class CommitEvent extends 
CompletableApplicationEvent<Map<TopicP
      * {@link Collections#unmodifiableMap(Map) as unmodifiable}.
      */
     private static Optional<Map<TopicPartition, OffsetAndMetadata>> 
validate(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
-        if (!offsets.isPresent()) {
+        if (offsets.isEmpty()) {
             return Optional.empty();
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java
index 23d4410d07b..0916ab8666c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java
@@ -23,7 +23,7 @@ import java.util.Map;
 
 public class ShareAcknowledgeOnCloseEvent extends 
CompletableApplicationEvent<Void> {
 
-    private Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+    private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
 
     public ShareAcknowledgeOnCloseEvent(final Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap, final long deadlineMs) {
         super(Type.SHARE_ACKNOWLEDGE_ON_CLOSE, deadlineMs);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java
index db3259a6779..49cb422e633 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java
@@ -23,7 +23,7 @@ import java.util.Map;
 
 public class ShareAcknowledgeSyncEvent extends 
CompletableApplicationEvent<Map<TopicIdPartition, Acknowledgements>> {
 
-    private Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+    private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
 
     public ShareAcknowledgeSyncEvent(final Map<TopicIdPartition, 
Acknowledgements> acknowledgementsMap, final long deadlineMs) {
         super(Type.SHARE_ACKNOWLEDGE_SYNC, deadlineMs);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
index d5ce57b947a..2a2b56e87cd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
@@ -23,7 +23,7 @@ import java.util.Map;
 
 public class ShareFetchEvent extends ApplicationEvent {
 
-    private Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+    private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
 
     public ShareFetchEvent(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap) {
         super(Type.SHARE_FETCH);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a77b3f8809f..7bc4b47ab08 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1337,7 +1337,7 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      */
     @Override
     public Uuid clientInstanceId(Duration timeout) {
-        if (!clientTelemetryReporter.isPresent()) {
+        if (clientTelemetryReporter.isEmpty()) {
             throw new IllegalStateException("Telemetry is not enabled. Set 
config `" + ProducerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index f70c1a33814..5619819dde7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -112,7 +112,7 @@ public final class ProducerBatch {
      */
     void maybeUpdateLeaderEpoch(OptionalInt latestLeaderEpoch) {
         if (latestLeaderEpoch.isPresent()
-            && (!currentLeaderEpoch.isPresent() || currentLeaderEpoch.getAsInt() < latestLeaderEpoch.getAsInt())) {
+            && (currentLeaderEpoch.isEmpty() || currentLeaderEpoch.getAsInt() < latestLeaderEpoch.getAsInt())) {
             log.trace("For {}, leader will be updated, currentLeaderEpoch: {}, 
attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
                 this, currentLeaderEpoch, attemptsWhenLeaderLastChanged, 
latestLeaderEpoch, attempts);
             attemptsWhenLeaderLastChanged = attempts();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
index 05a0fa3fbbc..5d2be89a9dd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
@@ -90,7 +90,7 @@ class TxnPartitionMap {
         // It might happen that the TransactionManager has been reset while a 
request was reenqueued and got a valid
         // response for this. This can happen only if the producer is only 
idempotent (not transactional) and in
         // this case there will be no tracked bookkeeper entry about it, so we 
have to insert one.
-        if (!lastAckedOffset.isPresent() && !isTransactional)
+        if (lastAckedOffset.isEmpty() && !isTransactional)
             getOrCreate(topicPartition);
         if (lastOffset > 
lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET))
             get(topicPartition).setLastAckedOffset(lastOffset);
diff --git a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java 
b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
index 672cb65d66a..11f0be4f6e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
@@ -26,7 +26,7 @@ public class LRUCache<K, V> implements Cache<K, V> {
     private final LinkedHashMap<K, V> cache;
 
     public LRUCache(final int maxSize) {
-        cache = new LinkedHashMap<K, V>(16, .75f, true) {
+        cache = new LinkedHashMap<>(16, .75f, true) {
             @Override
             protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                 return this.size() > maxSize; // require this. prefix to make lgtm.com happy
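
For context on the class touched here: the LRU behaviour comes from LinkedHashMap's access-order constructor combined with an overridden removeEldestEntry. A self-contained sketch of that idiom (sizes and keys invented for illustration):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TinyLruExample {
        public static void main(String[] args) {
            int maxSize = 2;
            // accessOrder=true keeps entries ordered from least to most recently accessed.
            Map<String, Integer> lru = new LinkedHashMap<>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                    return size() > maxSize;
                }
            };

            lru.put("a", 1);
            lru.put("b", 2);
            lru.get("a");          // touch "a", so "b" becomes the eldest entry
            lru.put("c", 3);       // exceeds maxSize and evicts "b"
            System.out.println(lru.keySet());    // [a, c]
        }
    }
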
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 9cf5fbae515..970d9cebf72 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -1501,7 +1501,7 @@ public class ConfigDef {
         b.append("``").append(key.name).append("``").append("\n");
         if (key.documentation != null) {
             for (String docLine : key.documentation.split("\n")) {
-                if (docLine.length() == 0) {
+                if (docLine.isEmpty()) {
                     continue;
                 }
                 b.append("  ").append(docLine).append("\n\n");
@@ -1532,7 +1532,7 @@ public class ConfigDef {
         }
 
         List<ConfigKey> configs = new ArrayList<>(configKeys.values());
-        Collections.sort(configs, (k1, k2) -> compare(k1, k2, groupOrd));
+        configs.sort((k1, k2) -> compare(k1, k2, groupOrd));
         return configs;
     }
 
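
The second hunk above relies on List.sort(Comparator), a default method since Java 8 that makes the static Collections.sort(list, comparator) form unnecessary. A quick illustration (config names are arbitrary):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class ListSortExample {
        public static void main(String[] args) {
            List<String> configs = new ArrayList<>(List.of("retries", "acks", "linger.ms"));

            // Sort the list in place with its own sort() default method.
            configs.sort(Comparator.naturalOrder());
            System.out.println(configs);             // [acks, linger.ms, retries]
        }
    }
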
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java 
b/clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java
index 75f8e3640e9..c9552b7c44d 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.common.config;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 
@@ -30,8 +28,7 @@ public enum SslClientAuth {
     REQUESTED,
     NONE;
 
-    public static final List<SslClientAuth> VALUES =
-            
Collections.unmodifiableList(Arrays.asList(SslClientAuth.values()));
+    public static final List<SslClientAuth> VALUES = 
List.of(SslClientAuth.values());
 
     public static SslClientAuth forConfig(String key) {
         if (key == null) {
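
List.of (Java 9) accepts the array returned by an enum's values() directly, which is what replaces the Arrays.asList/Collections.unmodifiableList chain above. A sketch with a stand-in enum (not the real SslClientAuth):

    import java.util.List;

    public class EnumValuesListExample {
        enum Auth { REQUIRED, REQUESTED, NONE }

        // Unmodifiable list built once from the values() array.
        static final List<Auth> VALUES = List.of(Auth.values());

        public static void main(String[] args) {
            System.out.println(VALUES);              // [REQUIRED, REQUESTED, NONE]
            try {
                VALUES.set(0, Auth.NONE);
            } catch (UnsupportedOperationException e) {
                System.out.println("the list cannot be modified");
            }
        }
    }
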
diff --git 
a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
 
b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
index 7137f723d8f..52863c6c0b5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
@@ -123,7 +123,7 @@ public class RecordHeaders implements Headers {
     }
 
     private Iterator<Header> closeAware(final Iterator<Header> original) {
-        return new Iterator<Header>() {
+        return new Iterator<>() {
             @Override
             public boolean hasNext() {
                 return original.hasNext();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 090d7c92eb0..614c8cd22e4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -31,10 +31,7 @@ import java.util.Set;
 public class Protocol {
 
     private static String indentString(int size) {
-        StringBuilder b = new StringBuilder(size);
-        for (int i = 0; i < size; i++)
-            b.append(" ");
-        return b.toString();
+        return " ".repeat(Math.max(0, size));
     }
 
     private static void schemaToBnfHtml(Schema schema, StringBuilder b, int indentSize) {
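
String.repeat, added in Java 11, is what lets the StringBuilder loop above collapse to a single expression; Math.max(0, size) keeps a negative count from throwing. A minimal sketch:

    public class StringRepeatExample {
        public static void main(String[] args) {
            int size = 4;
            String indent = " ".repeat(Math.max(0, size));
            System.out.println("[" + indent + "]");  // [    ]
        }
    }
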
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
 
b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 9ab8715236e..e47d7c866cc 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -235,7 +235,7 @@ public abstract class AbstractLegacyRecordBatch extends 
AbstractRecordBatch impl
         if (isCompressed())
             return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, 
bufferSupplier);
 
-        return new CloseableIterator<Record>() {
+        return new CloseableIterator<>() {
             private boolean hasNext = true;
 
             @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 5fddaae4062..16ee3596ea3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -71,7 +71,7 @@ public abstract class AbstractRecords implements Records {
     }
 
     private Iterator<Record> recordsIterator() {
-        return new AbstractIterator<Record>() {
+        return new AbstractIterator<>() {
             private final Iterator<? extends RecordBatch> batches = 
batches().iterator();
             private Iterator<Record> records;
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index c01bca2496e..650071474db 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -192,7 +192,7 @@ public class MemoryRecords extends AbstractRecords {
                 // in which case, we need to reset the base timestamp and 
overwrite the timestamp deltas
                 // if the batch does not contain tombstones, then we don't 
need to overwrite batch
                 boolean needToSetDeleteHorizon = batch.magic() >= 
RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
-                    && !batch.deleteHorizonMs().isPresent();
+                    && batch.deleteHorizonMs().isEmpty();
                 if (writeOriginalBatch && !needToSetDeleteHorizon) {
                     batch.writeTo(bufferOutputStream);
                     filterResult.updateRetainedBatchMetadata(batch, 
retainedRecords.size(), false);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 0155fb8af5f..2327e910abe 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -251,7 +251,7 @@ public class ApiVersionsResponse extends AbstractResponse {
         for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
             if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
                 final Optional<ApiVersion> brokerApiVersion = 
apiKey.toApiVersion(enableUnstableLastVersion);
-                if (!brokerApiVersion.isPresent()) {
+                if (brokerApiVersion.isEmpty()) {
                     // Broker does not support this API key.
                     continue;
                 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index cb128b8b42c..f352ba4eea9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.message.DescribeConfigsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java 
b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
index 6211c5355cc..029b6881fdb 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
@@ -23,9 +23,7 @@ import org.apache.kafka.common.network.ListenerName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -177,7 +175,7 @@ public class JaasContext {
         AppConfigurationEntry[] entries = 
configuration.getAppConfigurationEntry(name);
         if (entries == null)
             throw new IllegalArgumentException("Could not find a '" + name + 
"' entry in this JAAS configuration.");
-        this.configurationEntries = Collections.unmodifiableList(new 
ArrayList<>(Arrays.asList(entries)));
+        this.configurationEntries = List.of(entries);
         this.dynamicJaasConfig = dynamicJaasConfig;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslExtensions.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslExtensions.java
index 8156c6fe231..3210c859baa 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslExtensions.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslExtensions.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.security.auth;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.StringJoiner;
@@ -51,7 +50,7 @@ public class SaslExtensions {
     private final Map<String, String> extensionsMap;
 
     public SaslExtensions(Map<String, String> extensionsMap) {
-        this.extensionsMap = Collections.unmodifiableMap(new 
HashMap<>(extensionsMap));
+        this.extensionsMap = Map.copyOf(extensionsMap);
     }
 
     /**
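
Map.copyOf (Java 10) plays the same role for maps that Set.copyOf plays for sets earlier in the patch: one call yields an unmodifiable defensive copy, with null keys or values rejected. An illustrative sketch (the extension entries are made up):

    import java.util.HashMap;
    import java.util.Map;

    public class MapCopyOfExample {
        public static void main(String[] args) {
            Map<String, String> extensions = new HashMap<>();
            extensions.put("traceId", "abc123");

            Map<String, String> copy = Map.copyOf(extensions);

            extensions.put("spanId", "def456");      // later mutations do not leak into the copy
            System.out.println(copy);                // {traceId=abc123}
        }
    }
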
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
index 3e29841baa5..75c62b696cf 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
@@ -136,7 +136,7 @@ public class OAuthBearerClientInitialResponse {
     /**
      * Return the always non-null token value
      * 
-     * @return the always non-null toklen value
+     * @return the always non-null token value
      */
     public String tokenValue() {
         return tokenValue;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
index a878ae7a261..fdc5707278a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
@@ -33,7 +33,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLEncoder;
@@ -261,14 +260,14 @@ public class HttpAccessTokenRetriever implements 
AccessTokenRetriever {
             ByteArrayOutputStream os = new ByteArrayOutputStream();
             log.debug("handleOutput - preparing to read response body from 
{}", con.getURL());
             copy(is, os);
-            responseBody = os.toString(StandardCharsets.UTF_8.name());
+            responseBody = os.toString(StandardCharsets.UTF_8);
         } catch (Exception e) {
             // there still can be useful error response from the servers, lets 
get it
             try (InputStream is = con.getErrorStream()) {
                 ByteArrayOutputStream os = new ByteArrayOutputStream();
                 log.debug("handleOutput - preparing to read error response 
body from {}", con.getURL());
                 copy(is, os);
-                errorResponseBody = os.toString(StandardCharsets.UTF_8.name());
+                errorResponseBody = os.toString(StandardCharsets.UTF_8);
             } catch (Exception e2) {
                 log.warn("handleOutput - error retrieving error information", 
e2);
             }
@@ -354,15 +353,14 @@ public class HttpAccessTokenRetriever implements 
AccessTokenRetriever {
         return sanitizeString("the token endpoint response's access_token JSON 
attribute", accessTokenNode.textValue());
     }
 
-    static String formatAuthorizationHeader(String clientId, String 
clientSecret, boolean urlencode) throws
-        UnsupportedEncodingException {
+    static String formatAuthorizationHeader(String clientId, String 
clientSecret, boolean urlencode) {
         clientId = sanitizeString("the token endpoint request client ID 
parameter", clientId);
         clientSecret = sanitizeString("the token endpoint request client 
secret parameter", clientSecret);
 
         // according to RFC-6749 clientId & clientSecret must be urlencoded, 
see https://tools.ietf.org/html/rfc6749#section-2.3.1
         if (urlencode) {
-            clientId = URLEncoder.encode(clientId, 
StandardCharsets.UTF_8.name());
-            clientSecret = URLEncoder.encode(clientSecret, 
StandardCharsets.UTF_8.name());
+            clientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8);
+            clientSecret = URLEncoder.encode(clientSecret, 
StandardCharsets.UTF_8);
         }
 
         String s = String.format("%s:%s", clientId, clientSecret);
@@ -371,22 +369,17 @@ public class HttpAccessTokenRetriever implements 
AccessTokenRetriever {
         return String.format("Basic %s", encoded);
     }
 
-    static String formatRequestBody(String scope) throws IOException {
-        try {
-            StringBuilder requestParameters = new StringBuilder();
-            requestParameters.append("grant_type=client_credentials");
-
-            if (scope != null && !scope.trim().isEmpty()) {
-                scope = scope.trim();
-                String encodedScope = URLEncoder.encode(scope, 
StandardCharsets.UTF_8.name());
-                requestParameters.append("&scope=").append(encodedScope);
-            }
+    static String formatRequestBody(String scope) {
+        StringBuilder requestParameters = new StringBuilder();
+        requestParameters.append("grant_type=client_credentials");
 
-            return requestParameters.toString();
-        } catch (UnsupportedEncodingException e) {
-            // The world has gone crazy!
-            throw new IOException(String.format("Encoding %s not supported", 
StandardCharsets.UTF_8.name()));
+        if (scope != null && !scope.trim().isEmpty()) {
+            scope = scope.trim();
+            String encodedScope = URLEncoder.encode(scope, 
StandardCharsets.UTF_8);
+            requestParameters.append("&scope=").append(encodedScope);
         }
+
+        return requestParameters.toString();
     }
 
     private static String sanitizeString(String name, String value) {
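
The hunks above move from the String-based charset overloads to the Charset-based ones added in Java 10, which is why the checked UnsupportedEncodingException and its try/catch can disappear. A compact sketch of the two overloads involved (input strings invented):

    import java.io.ByteArrayOutputStream;
    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;

    public class CharsetOverloadsExample {
        public static void main(String[] args) {
            // URLEncoder.encode(String, Charset): no checked exception to handle.
            String encoded = URLEncoder.encode("client id+secret", StandardCharsets.UTF_8);
            System.out.println(encoded);             // client+id%2Bsecret

            ByteArrayOutputStream os = new ByteArrayOutputStream();
            os.writeBytes("response body".getBytes(StandardCharsets.UTF_8));
            System.out.println(os.toString(StandardCharsets.UTF_8));
        }
    }
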
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwks.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwks.java
index 3618005ebe1..62261fed58d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwks.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwks.java
@@ -142,7 +142,7 @@ public final class RefreshingHttpsJwks implements Initable, 
Closeable {
         this.refreshRetryBackoffMs = refreshRetryBackoffMs;
         this.refreshRetryBackoffMaxMs = refreshRetryBackoffMaxMs;
         this.executorService = executorService;
-        this.missingKeyIds = new LinkedHashMap<String, 
Long>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) {
+        this.missingKeyIds = new 
LinkedHashMap<>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) {
             @Override
             protected boolean removeEldestEntry(Map.Entry<String, Long> 
eldest) {
                 return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
index 64f5ddf070b..6b1148e291b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
@@ -349,9 +349,7 @@ public class OAuthBearerUnsecuredJws implements 
OAuthBearerToken {
             if (Utils.isBlank(scopeClaimValue))
                 return Collections.emptySet();
             else {
-                Set<String> retval = new HashSet<>();
-                retval.add(scopeClaimValue.trim());
-                return Collections.unmodifiableSet(retval);
+                return Set.of(scopeClaimValue.trim());
             }
         }
         List<?> scopeClaimValue = claim(scopeClaimName, List.class);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
index 2241eb50fdc..455fda983c3 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -32,12 +32,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Base64;
 import java.util.Base64.Encoder;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -104,8 +102,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler 
implements AuthenticateCal
     private static final String PRINCIPAL_CLAIM_NAME_OPTION = OPTION_PREFIX + 
"PrincipalClaimName";
     private static final String LIFETIME_SECONDS_OPTION = OPTION_PREFIX + 
"LifetimeSeconds";
     private static final String SCOPE_CLAIM_NAME_OPTION = OPTION_PREFIX + 
"ScopeClaimName";
-    private static final Set<String> RESERVED_CLAIMS = Collections
-            .unmodifiableSet(new HashSet<>(Arrays.asList("iat", "exp")));
+    private static final Set<String> RESERVED_CLAIMS = Set.of("iat", "exp");
     private static final String DEFAULT_PRINCIPAL_CLAIM_NAME = "sub";
     private static final String DEFAULT_LIFETIME_SECONDS_ONE_HOUR = "3600";
     private static final String DEFAULT_SCOPE_CLAIM_NAME = "scope";
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
index a817d7b8f27..53e099688d9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
@@ -193,7 +193,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler 
implements Authenticat
 
     private int allowableClockSkewMs() {
         String allowableClockSkewMsValue = 
option(ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION);
-        int allowableClockSkewMs = 0;
+        int allowableClockSkewMs;
         try {
             allowableClockSkewMs = Utils.isBlank(allowableClockSkewMsValue) ? 
0 : Integer.parseInt(allowableClockSkewMsValue.trim());
         } catch (NumberFormatException e) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapper.java
 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapper.java
index cae9e19ad2d..a7b16906d5d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapper.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapper.java
@@ -121,7 +121,7 @@ class CommonNameLoggingTrustManagerFactoryWrapper {
             this.origTm = originalTrustManager;
             this.nrOfRememberedBadCerts = nrOfRememberedBadCerts;
             // Restrict maximal size of the LinkedHashMap to avoid security 
attacks causing OOM
-            this.previouslyRejectedClientCertChains = new 
LinkedHashMap<ByteBuffer, String>() {
+            this.previouslyRejectedClientCertChains = new LinkedHashMap<>() {
                 @Override
                 protected boolean removeEldestEntry(final 
Map.Entry<ByteBuffer, String> eldest) {
                     return size() > nrOfRememberedBadCerts;
@@ -238,7 +238,7 @@ class CommonNameLoggingTrustManagerFactoryWrapper {
                 principalToCertMap.put(principal, cert);
             }
             // Thus, expect certificate chain to be broken, e.g. containing 
multiple enbd certificates
-            HashSet<X509Certificate> endCertificates = new HashSet<>();
+            Set<X509Certificate> endCertificates = new HashSet<>();
             for (X509Certificate cert: origChain) {
                 X500Principal subjectPrincipal = 
cert.getSubjectX500Principal();
                 if (!issuedbyPrincipalToCertificatesMap.containsKey(subjectPrincipal)) {
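
The local-variable change above (a HashSet declared as Set) is the usual program-to-the-interface cleanup: only the right-hand side names the concrete collection. A trivial sketch (values invented):

    import java.util.HashSet;
    import java.util.Set;

    public class DeclareByInterfaceExample {
        public static void main(String[] args) {
            // Swapping in another Set implementation later only touches this one line.
            Set<String> endCertificates = new HashSet<>();
            endCertificates.add("CN=client");
            System.out.println(endCertificates);
        }
    }
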
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
index 9e6dbc0c559..cb8c245a613 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
@@ -41,7 +41,7 @@ public class TelemetryMetricNamingConvention {
     public static MetricNamingStrategy<MetricName> 
getClientTelemetryMetricNamingStrategy(String prefix) {
         Objects.requireNonNull(prefix, "prefix cannot be null");
 
-        return new MetricNamingStrategy<MetricName>() {
+        return new MetricNamingStrategy<>() {
             @Override
             public MetricKey metricKey(MetricName metricName) {
                 Objects.requireNonNull(metricName, "metric name cannot be 
null");
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
 
b/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
index ce1680b62a7..9a891e08463 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
@@ -112,7 +112,7 @@ public class ChildFirstClassLoader extends URLClassLoader {
         Enumeration<URL> urls1 = findResources(name);
         Enumeration<URL> urls2 = getParent() != null ? 
getParent().getResources(name) : null;
 
-        return new Enumeration<URL>() {
+        return new Enumeration<>() {
             @Override
             public boolean hasMoreElements() {
                 return (urls1 != null && urls1.hasMoreElements()) || (urls2 != 
null && urls2.hasMoreElements());
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java 
b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
index 50b06369b47..1709b5d47fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
@@ -29,7 +29,7 @@ public interface CloseableIterator<T> extends Iterator<T>, 
Closeable {
     void close();
 
     static <R> CloseableIterator<R> wrap(Iterator<R> inner) {
-        return new CloseableIterator<R>() {
+        return new CloseableIterator<>() {
             @Override
             public void close() {}
 
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
index 61d29a1a353..5b1b99692c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.common.utils;
 
-import org.apache.kafka.common.KafkaException;
-
-import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
@@ -50,23 +47,19 @@ public class Sanitizer {
      * using URL-encoding.
      */
     public static String sanitize(String name) {
-        try {
-            String encoded = URLEncoder.encode(name, 
StandardCharsets.UTF_8.name());
-            StringBuilder builder = new StringBuilder();
-            for (int i = 0; i < encoded.length(); i++) {
-                char c = encoded.charAt(i);
-                if (c == '*') {         // Metric ObjectName treats * as 
pattern
-                    builder.append("%2A");
-                } else if (c == '+') {  // Space URL-encoded as +, replace 
with percent encoding
-                    builder.append("%20");
-                } else {
-                    builder.append(c);
-                }
+        String encoded = URLEncoder.encode(name, StandardCharsets.UTF_8);
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < encoded.length(); i++) {
+            char c = encoded.charAt(i);
+            if (c == '*') {         // Metric ObjectName treats * as pattern
+                builder.append("%2A");
+            } else if (c == '+') {  // Space URL-encoded as +, replace with 
percent encoding
+                builder.append("%20");
+            } else {
+                builder.append(c);
             }
-            return builder.toString();
-        } catch (UnsupportedEncodingException e) {
-            throw new KafkaException(e);
         }
+        return builder.toString();
     }
 
     /**
@@ -74,11 +67,7 @@ public class Sanitizer {
      * is used to obtain the desanitized version of node names in ZooKeeper.
      */
     public static String desanitize(String name) {
-        try {
-            return URLDecoder.decode(name, StandardCharsets.UTF_8.name());
-        } catch (UnsupportedEncodingException e) {
-            throw new KafkaException(e);
-        }
+        return URLDecoder.decode(name, StandardCharsets.UTF_8);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java 
b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
index c501dce65d3..ca2a76b4716 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
@@ -201,12 +201,12 @@ public interface Authorizer extends Configurable, 
Closeable {
             resourceTypeFilter, AccessControlEntryFilter.ANY);
 
         EnumMap<PatternType, Set<String>> denyPatterns =
-            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+            new EnumMap<>(PatternType.class) {{
                 put(PatternType.LITERAL, new HashSet<>());
                 put(PatternType.PREFIXED, new HashSet<>());
             }};
         EnumMap<PatternType, Set<String>> allowPatterns =
-            new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+            new EnumMap<>(PatternType.class) {{
                 put(PatternType.LITERAL, new HashSet<>());
                 put(PatternType.PREFIXED, new HashSet<>());
             }};
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a836e982140..d713abbcae7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3206,7 +3206,7 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testListGroupsWithTypesOlderBrokerVersion() throws Exception {
+    public void testListGroupsWithTypesOlderBrokerVersion() {
         ApiVersion listGroupV4 = new ApiVersion()
             .setApiKey(ApiKeys.LIST_GROUPS.id)
             .setMinVersion((short) 0)
@@ -5296,7 +5296,7 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testListShareGroupsWithStatesOlderBrokerVersion() throws 
Exception {
+    public void testListShareGroupsWithStatesOlderBrokerVersion() {
         ApiVersion listGroupV4 = new ApiVersion()
             .setApiKey(ApiKeys.LIST_GROUPS.id)
             .setMinVersion((short) 0)
@@ -6548,7 +6548,7 @@ public class KafkaAdminClientTest {
                     .noneMatch(p -> p.timestamp() == 
ListOffsetsRequest.MAX_TIMESTAMP),
                 new ListOffsetsResponse(responseData), node);
 
-            ListOffsetsResult result = env.adminClient().listOffsets(new 
HashMap<TopicPartition, OffsetSpec>() {{
+            ListOffsetsResult result = env.adminClient().listOffsets(new 
HashMap<>() {{
                     put(tp0, OffsetSpec.maxTimestamp());
                     put(tp1, OffsetSpec.latest());
                 }});
@@ -6609,7 +6609,7 @@ public class KafkaAdminClientTest {
                     new ListOffsetsResponse(responseDataWithError), node);
             }
             ListOffsetsResult result = env.adminClient().listOffsets(
-                new HashMap<TopicPartition, OffsetSpec>() {
+                new HashMap<>() {
                     {
                         put(tp0, OffsetSpec.latest());
                         put(tp1, OffsetSpec.latest());
@@ -6632,7 +6632,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponseFrom(
                 request -> request instanceof ListOffsetsRequest, new 
ListOffsetsResponse(responseData), node);
             result = env.adminClient().listOffsets(
-                new HashMap<TopicPartition, OffsetSpec>() {
+                new HashMap<>() {
                     {
                         put(tp0, OffsetSpec.latest());
                         put(tp1, OffsetSpec.latest());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
index 9651964e1c6..4cd9d613095 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
@@ -61,7 +61,7 @@ public class DeleteRecordsHandlerTest {
     private final TopicPartition t0p3 = new TopicPartition("t0", 3);
     private final Node node1 = new Node(1, "host", 1234);
     private final Node node2 = new Node(2, "host", 1235);
-    private final Map<TopicPartition, RecordsToDelete> recordsToDelete = new 
HashMap<TopicPartition, RecordsToDelete>() {
+    private final Map<TopicPartition, RecordsToDelete> recordsToDelete = new 
HashMap<>() {
         {
             put(t0p0, RecordsToDelete.beforeOffset(10L));
             put(t0p1, RecordsToDelete.beforeOffset(10L));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java
index 5ad92ce1c95..a7156554001 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java
@@ -65,7 +65,7 @@ public final class ListOffsetsHandlerTest {
 
     private final Node node = new Node(1, "host", 1234);
 
-    private final Map<TopicPartition, Long> offsetTimestampsByPartition = new 
HashMap<TopicPartition, Long>() {
+    private final Map<TopicPartition, Long> offsetTimestampsByPartition = new 
HashMap<>() {
         {
             put(t0p0, ListOffsetsRequest.LATEST_TIMESTAMP);
             put(t0p1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2deaec2efea..f39d11a5cb5 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1969,11 +1969,11 @@ public class KafkaConsumerTest {
         }
 
         try (KafkaConsumer<byte[], byte[]> consumer = 
newConsumer(groupProtocol, null)) {
-            assertThrows(InvalidGroupIdException.class, () -> 
consumer.commitAsync());
+            assertThrows(InvalidGroupIdException.class, consumer::commitAsync);
         }
 
         try (KafkaConsumer<byte[], byte[]> consumer = 
newConsumer(groupProtocol, null)) {
-            assertThrows(InvalidGroupIdException.class, () -> 
consumer.commitSync());
+            assertThrows(InvalidGroupIdException.class, consumer::commitSync);
         }
     }
 
@@ -1984,8 +1984,8 @@ public class KafkaConsumerTest {
             consumer.assign(singleton(tp0));
 
             assertThrows(InvalidGroupIdException.class, () -> 
consumer.committed(Collections.singleton(tp0)).get(tp0));
-            assertThrows(InvalidGroupIdException.class, () -> 
consumer.commitAsync());
-            assertThrows(InvalidGroupIdException.class, () -> 
consumer.commitSync());
+            assertThrows(InvalidGroupIdException.class, consumer::commitAsync);
+            assertThrows(InvalidGroupIdException.class, consumer::commitSync);
         }
     }
 
@@ -2668,7 +2668,7 @@ public class KafkaConsumerTest {
 
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
-    public void testListOffsetShouldUpdateSubscriptions(GroupProtocol 
groupProtocol) throws InterruptedException {
+    public void testListOffsetShouldUpdateSubscriptions(GroupProtocol 
groupProtocol) {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
 
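
The test changes above replace no-argument lambdas with method references; both satisfy JUnit 5's Executable functional interface. A minimal sketch, assuming junit-jupiter-api on the classpath and using an invented method:

    import static org.junit.jupiter.api.Assertions.assertThrows;

    import org.junit.jupiter.api.Test;

    class MethodReferenceAssertThrowsExample {

        private void parse() {
            throw new IllegalStateException("boom");
        }

        @Test
        void lambdaAndMethodReferenceAreEquivalent() {
            assertThrows(IllegalStateException.class, () -> parse());   // lambda form
            assertThrows(IllegalStateException.class, this::parse);     // method reference form
        }
    }
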
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index f82bb011b84..8b5c2d418df 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -156,7 +156,7 @@ public class AbstractCoordinatorTest {
                                                                         
groupInstanceId,
                                                                         
retryBackoffMs,
                                                                         
retryBackoffMaxMs,
-                                                                        
!groupInstanceId.isPresent());
+                                                                        
groupInstanceId.isEmpty());
         this.coordinator = new DummyCoordinator(rebalanceConfig,
                                                 consumerClient,
                                                 metrics,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
index 5960fd28fbf..68b9ecb528b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -214,7 +214,7 @@ public class FetchCollectorTest {
         assignAndSeek(topicAPartition0);
 
         // Create a FetchCollector that fails on CompletedFetch initialization.
-        fetchCollector = new FetchCollector<String, String>(logContext,
+        fetchCollector = new FetchCollector<>(logContext,
                 metadata,
                 subscriptions,
                 fetchConfig,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 3e3f70a7443..c2b6ad8b168 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -161,7 +161,7 @@ public class FetchRequestManagerTest {
     private final String topicName = "test";
     private final String groupId = "test-group";
     private final Uuid topicId = Uuid.randomUuid();
-    private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+    private final Map<String, Uuid> topicIds = new HashMap<>() {
         {
             put(topicName, topicId);
         }
@@ -1723,8 +1723,7 @@ public class FetchRequestManagerTest {
 
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         for (int i = 0; i < 2; i++) {
-            OffsetOutOfRangeException e = 
assertThrows(OffsetOutOfRangeException.class, () ->
-                    collectFetch());
+            OffsetOutOfRangeException e = 
assertThrows(OffsetOutOfRangeException.class, this::collectFetch);
             assertEquals(singleton(tp0), 
e.offsetOutOfRangePartitions().keySet());
             assertEquals(0L, 
e.offsetOutOfRangePartitions().get(tp0).longValue());
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6c29d3df82b..c554f1c8e7a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -157,7 +157,7 @@ public class FetcherTest {
     private final String topicName = "test";
     private final String groupId = "test-group";
     private final Uuid topicId = Uuid.randomUuid();
-    private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+    private final Map<String, Uuid> topicIds = new HashMap<>() {
         {
             put(topicName, topicId);
         }
@@ -1709,8 +1709,7 @@ public class FetcherTest {
 
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         for (int i = 0; i < 2; i++) {
-            OffsetOutOfRangeException e = 
assertThrows(OffsetOutOfRangeException.class, () ->
-                    collectFetch());
+            OffsetOutOfRangeException e = 
assertThrows(OffsetOutOfRangeException.class, this::collectFetch);
             assertEquals(singleton(tp0), 
e.offsetOutOfRangePartitions().keySet());
             assertEquals(0L, 
e.offsetOutOfRangePartitions().get(tp0).longValue());
         }
@@ -2849,7 +2848,7 @@ public class FetcherTest {
                 true, // check crcs
                 CommonClientConfigs.DEFAULT_CLIENT_RACK,
                 isolationLevel);
-        fetcher = new Fetcher<byte[], byte[]>(
+        fetcher = new Fetcher<>(
                 logContext,
                 consumerClient,
                 metadata,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index 4973624b0a0..bcbd0411656 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -99,7 +99,7 @@ public class OffsetFetcherTest {
 
     private final String topicName = "test";
     private final Uuid topicId = Uuid.randomUuid();
-    private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+    private final Map<String, Uuid> topicIds = new HashMap<>() {
         {
             put(topicName, topicId);
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
index e218f8109fc..2cc4485f460 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
@@ -182,7 +182,7 @@ public class RequestFutureTest {
     @Test
     public void testComposeSuccessCase() {
         RequestFuture<String> future = new RequestFuture<>();
-        RequestFuture<Integer> composed = future.compose(new 
RequestFutureAdapter<String, Integer>() {
+        RequestFuture<Integer> composed = future.compose(new 
RequestFutureAdapter<>() {
             @Override
             public void onSuccess(String value, RequestFuture<Integer> future) 
{
                 future.complete(value.length());
@@ -199,7 +199,7 @@ public class RequestFutureTest {
     @Test
     public void testComposeFailureCase() {
         RequestFuture<String> future = new RequestFuture<>();
-        RequestFuture<Integer> composed = future.compose(new RequestFutureAdapter<String, Integer>() {
+        RequestFuture<Integer> composed = future.compose(new RequestFutureAdapter<>() {
             @Override
             public void onSuccess(String value, RequestFuture<Integer> future) {
                 future.complete(value.length());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 8ec60d4ea97..135574d4e1d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -124,13 +124,13 @@ public class ShareConsumeRequestManagerTest {
     private final String groupId = "test-group";
     private final Uuid topicId = Uuid.randomUuid();
     private final Uuid topicId2 = Uuid.randomUuid();
-    private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+    private final Map<String, Uuid> topicIds = new HashMap<>() {
         {
             put(topicName, topicId);
             put(topicName2, topicId2);
         }
     };
-    private final Map<String, Integer> topicPartitionCounts = new HashMap<String, Integer>() {
+    private final Map<String, Integer> topicPartitionCounts = new HashMap<>() {
         {
             put(topicName, 2);
             put(topicName2, 1);
@@ -739,7 +739,7 @@ public class ShareConsumeRequestManagerTest {
     }
 
     @Test
-    public void testRetryAcknowledgementsWithLeaderChange() throws InterruptedException {
+    public void testRetryAcknowledgementsWithLeaderChange() {
         buildRequestManager();
 
         subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
@@ -1473,7 +1473,7 @@ public class ShareConsumeRequestManagerTest {
     }
 
     /**
-     * Assert that the {@link ShareFetchCollector#collect(ShareFetchBuffer)} latest fetch} does not contain any
+     * Assert that the {@link ShareFetchCollector#collect(ShareFetchBuffer) latest fetch} does not contain any
      * {@link ShareFetch#records() user-visible records}, and is {@link ShareFetch#isEmpty() empty}.
      *
      * @param reason the reason to include for assertion methods such as {@link org.junit.jupiter.api.Assertions#assertTrue(boolean, String)}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index fb6a57ac039..893840de4c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -137,7 +137,7 @@ public class ShareFetchCollectorTest {
         subscribeAndAssign(topicAPartition0);
 
         // Create a ShareFetchCollector that fails on ShareCompletedFetch initialization.
-        fetchCollector = new ShareFetchCollector<String, String>(logContext,
+        fetchCollector = new ShareFetchCollector<>(logContext,
                 metadata,
                 subscriptions,
                 fetchConfig,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java
index e5b2833b154..5f34836eaba 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java
@@ -59,7 +59,7 @@ public class TopicMetadataFetcherTest {
 
     private final String topicName = "test";
     private final Uuid topicId = Uuid.randomUuid();
-    private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+    private final Map<String, Uuid> topicIds = new HashMap<>() {
         {
             put(topicName, topicId);
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 3bd0df92b00..e2e8ca7fed2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -748,7 +748,7 @@ public class KafkaProducerTest {
             }
         };
 
-        return new KafkaProducer<String, String>(
+        return new KafkaProducer<>(
                 new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, new StringSerializer(), new StringSerializer())),
                 new StringSerializer(), new StringSerializer(), metadata, mockClient, null, time) {
             @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 0045f271c1a..d3964fc87ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -393,14 +393,14 @@ public class MockProducerTest {
         producer.beginTransaction();
 
         String group1 = "g1";
-        Map<TopicPartition, OffsetAndMetadata> group1Commit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> group1Commit = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
                 put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
             }
         };
         String group2 = "g2";
-        Map<TopicPartition, OffsetAndMetadata> group2Commit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> group2Commit = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 0), new OffsetAndMetadata(101L, null));
                 put(new TopicPartition(topic, 1), new OffsetAndMetadata(21L, null));
@@ -435,7 +435,7 @@ public class MockProducerTest {
         producer.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("groupId"));
         assertFalse(producer.sentOffsets());
     }
-    
+
     @Test
     public void shouldAddOffsetsWhenSendOffsetsToTransactionByGroupMetadata() {
         buildMockProducer(true);
@@ -444,7 +444,7 @@ public class MockProducerTest {
 
         assertFalse(producer.sentOffsets());
 
-        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
             }
@@ -461,7 +461,7 @@ public class MockProducerTest {
 
         assertFalse(producer.sentOffsets());
 
-        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
             }
@@ -488,13 +488,13 @@ public class MockProducerTest {
         producer.beginTransaction();
 
         String group = "g";
-        Map<TopicPartition, OffsetAndMetadata> groupCommit1 = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> groupCommit1 = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
                 put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
             }
         };
-        Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L, null));
                 put(new TopicPartition(topic, 2), new OffsetAndMetadata(21L, null));
@@ -506,7 +506,7 @@ public class MockProducerTest {
         assertTrue(producer.consumerGroupOffsetsHistory().isEmpty());
 
         Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>();
-        expectedResult.put(group, new HashMap<TopicPartition, OffsetAndMetadata>() {
+        expectedResult.put(group, new HashMap<>() {
             {
                 put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
                 put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L, null));
@@ -525,7 +525,7 @@ public class MockProducerTest {
         producer.beginTransaction();
 
         String group = "g";
-        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
                 put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
@@ -554,7 +554,7 @@ public class MockProducerTest {
         producer.beginTransaction();
 
         String group = "g";
-        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
                 put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
@@ -579,7 +579,7 @@ public class MockProducerTest {
         producer.beginTransaction();
 
         String group = "g";
-        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
                 put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
@@ -591,7 +591,7 @@ public class MockProducerTest {
         producer.beginTransaction();
 
         String group2 = "g2";
-        Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new HashMap<TopicPartition, OffsetAndMetadata>() {
+        Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new HashMap<>() {
             {
                 put(new TopicPartition(topic, 2), new OffsetAndMetadata(53L, null));
                 put(new TopicPartition(topic, 3), new OffsetAndMetadata(84L, null));
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java b/clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java
index 9509d398305..bbd2268e7cb 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java
@@ -40,7 +40,7 @@ class EnvVarConfigProviderTest {
 
     @BeforeEach
     public void setup() {
-        Map<String, String> testEnvVars = new HashMap<String, String>() {
+        Map<String, String> testEnvVars = new HashMap<>() {
             {
                 put("test_var1", "value1");
                 put("secret_var2", "value2");
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index ad94ae1dcd9..dd1adb31815 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -138,9 +138,8 @@ public class SaslChannelBuilderTest {
      */
     @Test
     public void testClientChannelBuilderWithBrokerConfigs() throws Exception {
-        Map<String, Object> configs = new HashMap<>();
         CertStores certStores = new CertStores(false, "client", "localhost");
-        configs.putAll(certStores.getTrustingConfig(certStores));
+        Map<String, Object> configs = new HashMap<>(certStores.getTrustingConfig(certStores));
         configs.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
         configs.putAll(new ConfigDef().withClientSaslSupport().parse(configs));
         for (Field field : BrokerSecurityConfigs.class.getFields()) {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
index 3e9ea7f4db1..94f4f1fc8c4 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class OAuthBearerSaslClientTest {
 
-    private static final Map<String, String> TEST_PROPERTIES = new LinkedHashMap<String, String>() {
+    private static final Map<String, String> TEST_PROPERTIES = new LinkedHashMap<>() {
         {
             put("One", "1");
             put("Two", "2");
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
index d50fd99a10d..581a72a5207 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
@@ -52,14 +52,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class OAuthBearerSaslServerTest {
     private static final String USER = "user";
-    private static final Map<String, ?> CONFIGS;
-    static {
-        String jaasConfigText = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required"
+    private static final String JAAS_CONFIG_TEXT = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required"
                 + " unsecuredLoginStringClaim_sub=\"" + USER + "\";";
-        Map<String, Object> tmp = new HashMap<>();
-        tmp.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigText));
-        CONFIGS = Collections.unmodifiableMap(tmp);
-    }
+    private static final Map<String, ?> CONFIGS = Map.of(SaslConfigs.SASL_JAAS_CONFIG, new Password(JAAS_CONFIG_TEXT));
+
     private static final AuthenticateCallbackHandler LOGIN_CALLBACK_HANDLER;
     static {
         LOGIN_CALLBACK_HANDLER = new OAuthBearerUnsecuredLoginCallbackHandler();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
index b3eb3a026b7..8b1c5a37065 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
@@ -27,7 +27,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.nio.charset.StandardCharsets;
 import java.util.Random;
@@ -172,19 +171,19 @@ public class HttpAccessTokenRetrieverTest extends OAuthBearerTest {
     }
 
     @Test
-    public void testFormatAuthorizationHeader() throws UnsupportedEncodingException {
+    public void testFormatAuthorizationHeader() {
         assertAuthorizationHeader("id", "secret", false, "Basic aWQ6c2VjcmV0");
     }
 
     @Test
-    public void testFormatAuthorizationHeaderEncoding() throws UnsupportedEncodingException {
+    public void testFormatAuthorizationHeaderEncoding() {
         // according to RFC-7617, we need to use the *non-URL safe* base64 encoder. See KAFKA-14496.
         assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E", false, "Basic U09NRV9SQU5ET01fTE9OR19VU0VSXzAxMjM0OjlRfDBgOGl+dXRlLW45a3NqTFdiXDUwIkFYQFVVRUQ1RQ==");
         // according to RFC-6749 clientId & clientSecret must be urlencoded, see https://tools.ietf.org/html/rfc6749#section-2.3.1
         assertAuthorizationHeader("user!@~'", "secret-(*)!", true, "Basic dXNlciUyMSU0MCU3RSUyNzpzZWNyZXQtJTI4KiUyOSUyMQ==");
     }
 
-    private void assertAuthorizationHeader(String clientId, String clientSecret, boolean urlencode, String expected) throws UnsupportedEncodingException {
+    private void assertAuthorizationHeader(String clientId, String clientSecret, boolean urlencode, String expected) {
         String actual = HttpAccessTokenRetriever.formatAuthorizationHeader(clientId, clientSecret, urlencode);
         assertEquals(expected, actual, String.format("Expected the HTTP Authorization header generated for client ID \"%s\" and client secret \"%s\" to match", clientId, clientSecret));
     }
@@ -203,14 +202,14 @@ public class HttpAccessTokenRetrieverTest extends OAuthBearerTest {
     }
 
     @Test
-    public void testFormatRequestBody() throws IOException {
+    public void testFormatRequestBody() {
         String expected = "grant_type=client_credentials&scope=scope";
         String actual = HttpAccessTokenRetriever.formatRequestBody("scope");
         assertEquals(expected, actual);
     }
 
     @Test
-    public void testFormatRequestBodyWithEscaped() throws IOException {
+    public void testFormatRequestBodyWithEscaped() {
         String questionMark = "%3F";
         String exclamationMark = "%21";
 
@@ -224,7 +223,7 @@ public class HttpAccessTokenRetrieverTest extends OAuthBearerTest {
     }
 
     @Test
-    public void testFormatRequestBodyMissingValues() throws IOException {
+    public void testFormatRequestBodyMissingValues() {
         String expected = "grant_type=client_credentials";
         String actual = HttpAccessTokenRetriever.formatRequestBody(null);
         assertEquals(expected, actual);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
index 84e06f2381d..d697dd46ead 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
@@ -28,7 +28,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Base64.Encoder;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 import javax.security.auth.callback.Callback;
@@ -50,25 +49,16 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
     private static final String TOO_EARLY_EXPIRATION_TIME_CLAIM_TEXT = expClaimText(0);
     private static final String ISSUED_AT_CLAIM_TEXT = claimOrHeaderText("iat", MOCK_TIME.milliseconds() / 1000.0);
     private static final String SCOPE_CLAIM_TEXT = claimOrHeaderText("scope", "scope1");
-    private static final Map<String, String> MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED;
-    static {
-        Map<String, String> tmp = new HashMap<>();
-        tmp.put("unsecuredValidatorPrincipalClaimName", "principal");
-        tmp.put("unsecuredValidatorAllowableClockSkewMs", "1");
-        MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED = Collections.unmodifiableMap(tmp);
-    }
-    private static final Map<String, String> MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE;
-    static {
-        Map<String, String> tmp = new HashMap<>();
-        tmp.put("unsecuredValidatorRequiredScope", "scope1");
-        MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE = Collections.unmodifiableMap(tmp);
-    }
-    private static final Map<String, String> MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE;
-    static {
-        Map<String, String> tmp = new HashMap<>();
-        tmp.put("unsecuredValidatorRequiredScope", "scope1 scope2");
-        MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE = Collections.unmodifiableMap(tmp);
-    }
+    private static final Map<String, String> MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED = Map.of(
+            "unsecuredValidatorPrincipalClaimName", "principal",
+            "unsecuredValidatorAllowableClockSkewMs", "1");
+
+    private static final Map<String, String> MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE = Map.of(
+            "unsecuredValidatorRequiredScope", "scope1");
+
+    private static final Map<String, String> MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE = Map.of(
+            "unsecuredValidatorRequiredScope", "scope1 scope2");
+            "unsecuredValidatorRequiredScope", "scope1 scope2");
+
 
     @Test
     public void validToken() {
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 70e5d80e354..521a0f19415 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -45,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class SerializationTest {
 
     private final String topic = "testTopic";
-    private final Map<Class<?>, List<Object>> testData = new HashMap<Class<?>, List<Object>>() {
+    private final Map<Class<?>, List<Object>> testData = new HashMap<>() {
         {
             put(String.class, Arrays.asList(null, "my string"));
             put(Short.class, Arrays.asList(null, (short) 32767, (short) -32768));
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 8928159526c..16fc6af154b 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -770,9 +770,7 @@ public class UtilsTest {
         when(mockIterator.next()).thenReturn(rootDir.toPath()).thenReturn(subDir.toPath());
         when(mockIterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
 
-        assertDoesNotThrow(() -> {
-            Utils.delete(spyRootFile);
-        });
+        assertDoesNotThrow(() -> Utils.delete(spyRootFile));
         assertFalse(Files.exists(rootDir.toPath()));
         assertFalse(Files.exists(subDir.toPath()));
     }
