This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5e270482cf MINOR: Various cleanups in clients (#17895)
d5e270482cf is described below
commit d5e270482cf6601e1b635d69d311957e84d6bb59
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Nov 22 20:38:31 2024 +0100
MINOR: Various cleanups in clients (#17895)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../java/org/apache/kafka/clients/Metadata.java | 6 ++--
.../org/apache/kafka/clients/MetadataSnapshot.java | 2 +-
.../clients/admin/ConsumerGroupDescription.java | 8 ++---
.../clients/admin/DeleteConsumerGroupsResult.java | 2 +-
.../admin/DescribeUserScramCredentialsResult.java | 2 +-
.../kafka/clients/admin/MemberAssignment.java | 4 +--
.../clients/admin/NewPartitionReassignment.java | 4 +--
.../kafka/clients/admin/ShareGroupDescription.java | 5 ++--
.../admin/UserScramCredentialsDescription.java | 4 +--
.../clients/admin/internals/AdminApiDriver.java | 2 +-
.../admin/internals/AdminMetadataManager.java | 2 +-
.../admin/internals/AllBrokersStrategy.java | 2 +-
.../admin/internals/ListTransactionsHandler.java | 2 +-
.../kafka/clients/consumer/ConsumerConfig.java | 9 ++----
.../kafka/clients/consumer/ConsumerRecords.java | 2 +-
.../kafka/clients/consumer/MockConsumer.java | 2 +-
.../consumer/NoOffsetForPartitionException.java | 3 +-
.../kafka/clients/consumer/RangeAssignor.java | 2 +-
.../consumer/internals/AbstractCoordinator.java | 6 ++--
.../clients/consumer/internals/AbstractFetch.java | 2 +-
.../consumer/internals/AbstractStickyAssignor.java | 8 ++---
.../clients/consumer/internals/AsyncClient.java | 2 +-
.../consumer/internals/AsyncKafkaConsumer.java | 10 +++----
.../consumer/internals/ClassicKafkaConsumer.java | 8 ++---
.../consumer/internals/CommitRequestManager.java | 2 +-
.../consumer/internals/ConsumerCoordinator.java | 4 +--
.../internals/CoordinatorRequestManager.java | 2 +-
.../kafka/clients/consumer/internals/Fetch.java | 2 +-
.../clients/consumer/internals/FetchCollector.java | 2 +-
.../kafka/clients/consumer/internals/Fetcher.java | 2 +-
.../consumer/internals/NetworkClientDelegate.java | 2 +-
.../clients/consumer/internals/OffsetFetcher.java | 12 ++++----
.../consumer/internals/OffsetsRequestManager.java | 2 +-
.../clients/consumer/internals/RequestFuture.java | 4 +--
.../consumer/internals/RequestManagers.java | 4 +--
.../internals/ShareConsumeRequestManager.java | 2 +-
.../consumer/internals/ShareConsumerImpl.java | 2 +-
.../consumer/internals/SubscriptionState.java | 2 +-
.../events/ApplicationEventProcessor.java | 22 +++++++-------
.../internals/events/AssignmentChangeEvent.java | 5 ++--
.../consumer/internals/events/CommitEvent.java | 2 +-
.../events/ShareAcknowledgeOnCloseEvent.java | 2 +-
.../events/ShareAcknowledgeSyncEvent.java | 2 +-
.../consumer/internals/events/ShareFetchEvent.java | 2 +-
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../clients/producer/internals/ProducerBatch.java | 2 +-
.../producer/internals/TxnPartitionMap.java | 2 +-
.../org/apache/kafka/common/cache/LRUCache.java | 2 +-
.../org/apache/kafka/common/config/ConfigDef.java | 4 +--
.../apache/kafka/common/config/SslClientAuth.java | 5 +---
.../common/header/internals/RecordHeaders.java | 2 +-
.../org/apache/kafka/common/protocol/Protocol.java | 5 +---
.../common/record/AbstractLegacyRecordBatch.java | 2 +-
.../kafka/common/record/AbstractRecords.java | 2 +-
.../apache/kafka/common/record/MemoryRecords.java | 2 +-
.../kafka/common/requests/ApiVersionsResponse.java | 2 +-
.../common/requests/DescribeConfigsResponse.java | 1 -
.../apache/kafka/common/security/JaasContext.java | 4 +--
.../kafka/common/security/auth/SaslExtensions.java | 3 +-
.../OAuthBearerClientInitialResponse.java | 2 +-
.../secured/HttpAccessTokenRetriever.java | 35 +++++++++-------------
.../internals/secured/RefreshingHttpsJwks.java | 2 +-
.../unsecured/OAuthBearerUnsecuredJws.java | 4 +--
.../OAuthBearerUnsecuredLoginCallbackHandler.java | 5 +---
...uthBearerUnsecuredValidatorCallbackHandler.java | 2 +-
...ommonNameLoggingTrustManagerFactoryWrapper.java | 4 +--
.../internals/TelemetryMetricNamingConvention.java | 2 +-
.../kafka/common/utils/ChildFirstClassLoader.java | 2 +-
.../kafka/common/utils/CloseableIterator.java | 2 +-
.../org/apache/kafka/common/utils/Sanitizer.java | 35 ++++++++--------------
.../apache/kafka/server/authorizer/Authorizer.java | 4 +--
.../kafka/clients/admin/KafkaAdminClientTest.java | 10 +++----
.../admin/internals/DeleteRecordsHandlerTest.java | 2 +-
.../admin/internals/ListOffsetsHandlerTest.java | 2 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 10 +++----
.../internals/AbstractCoordinatorTest.java | 2 +-
.../consumer/internals/FetchCollectorTest.java | 2 +-
.../internals/FetchRequestManagerTest.java | 5 ++--
.../clients/consumer/internals/FetcherTest.java | 7 ++---
.../consumer/internals/OffsetFetcherTest.java | 2 +-
.../consumer/internals/RequestFutureTest.java | 4 +--
.../internals/ShareConsumeRequestManagerTest.java | 8 ++---
.../internals/ShareFetchCollectorTest.java | 2 +-
.../internals/TopicMetadataFetcherTest.java | 2 +-
.../kafka/clients/producer/KafkaProducerTest.java | 2 +-
.../kafka/clients/producer/MockProducerTest.java | 24 +++++++--------
.../config/provider/EnvVarConfigProviderTest.java | 2 +-
.../common/network/SaslChannelBuilderTest.java | 3 +-
.../internals/OAuthBearerSaslClientTest.java | 2 +-
.../internals/OAuthBearerSaslServerTest.java | 10 ++-----
.../secured/HttpAccessTokenRetrieverTest.java | 13 ++++----
...earerUnsecuredValidatorCallbackHandlerTest.java | 30 +++++++------------
.../common/serialization/SerializationTest.java | 2 +-
.../org/apache/kafka/common/utils/UtilsTest.java | 4 +--
94 files changed, 199 insertions(+), 268 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index ece1a25adca..b60156aae00 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -294,7 +294,7 @@ public class Metadata implements Closeable {
public synchronized LeaderAndEpoch currentLeader(TopicPartition
topicPartition) {
Optional<MetadataResponse.PartitionMetadata> maybeMetadata =
partitionMetadataIfCurrent(topicPartition);
- if (!maybeMetadata.isPresent())
+ if (maybeMetadata.isEmpty())
return new LeaderAndEpoch(Optional.empty(),
Optional.ofNullable(lastSeenLeaderEpochs.get(topicPartition)));
MetadataResponse.PartitionMetadata partitionMetadata =
maybeMetadata.get();
@@ -392,7 +392,7 @@ public class Metadata implements Closeable {
TopicPartition partition = partitionLeader.getKey();
Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
Metadata.LeaderIdAndEpoch newLeader = partitionLeader.getValue();
- if (!newLeader.epoch.isPresent() ||
!newLeader.leaderId.isPresent()) {
+ if (newLeader.epoch.isEmpty() || newLeader.leaderId.isEmpty()) {
log.debug("For {}, incoming leader information is incomplete
{}", partition, newLeader);
continue;
}
@@ -404,7 +404,7 @@ public class Metadata implements Closeable {
log.debug("For {}, incoming leader({}), the corresponding node
information for node-id {} is missing, so ignoring.", partition, newLeader,
newLeader.leaderId.get());
continue;
}
- if (!this.metadataSnapshot.partitionMetadata(partition).isPresent()) {
+ if (this.metadataSnapshot.partitionMetadata(partition).isEmpty()) {
log.debug("For {}, incoming leader({}), partition metadata is
no longer cached, ignoring.", partition, newLeader);
continue;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java
b/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java
index 6d84a738c1d..424ecb0d291 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java
@@ -133,7 +133,7 @@ public class MetadataSnapshot {
*/
public OptionalInt leaderEpochFor(TopicPartition tp) {
PartitionMetadata partitionMetadata = metadataByPartition.get(tp);
- if (partitionMetadata == null ||
!partitionMetadata.leaderEpoch.isPresent()) {
+ if (partitionMetadata == null ||
partitionMetadata.leaderEpoch.isEmpty()) {
return OptionalInt.empty();
} else {
return OptionalInt.of(partitionMetadata.leaderEpoch.get());
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 4f3dc837cde..4cbc5b4b43b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -23,9 +23,9 @@ import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -84,8 +84,7 @@ public class ConsumerGroupDescription {
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
- this.members = members == null ? Collections.emptyList() :
- Collections.unmodifiableList(new ArrayList<>(members));
+ this.members = members == null ? Collections.emptyList() :
List.copyOf(members);
this.partitionAssignor = partitionAssignor == null ? "" :
partitionAssignor;
this.type = type;
this.groupState = GroupState.parse(state.name());
@@ -122,8 +121,7 @@ public class ConsumerGroupDescription {
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
- this.members = members == null ? Collections.emptyList() :
- Collections.unmodifiableList(new ArrayList<>(members));
+ this.members = members == null ? Collections.emptyList() :
List.copyOf(members);
this.partitionAssignor = partitionAssignor == null ? "" :
partitionAssignor;
this.type = type;
this.groupState = groupState;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index 90ddbd0582c..5c1e60b1a61 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -42,7 +42,7 @@ public class DeleteConsumerGroupsResult {
*/
public Map<String, KafkaFuture<Void>> deletedGroups() {
Map<String, KafkaFuture<Void>> deletedGroups = new
HashMap<>(futures.size());
- futures.forEach((key, future) -> deletedGroups.put(key, future));
+ deletedGroups.putAll(futures);
return deletedGroups;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
index 2eddd7ee28c..f13fd2a8e7d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
@@ -125,7 +125,7 @@ public class DescribeUserScramCredentialsResult {
// for users 1, 2, and 3 but this is looking for user 4), so
explicitly take care of that case
Optional<DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult>
optionalUserResult =
data.results().stream().filter(result ->
result.user().equals(userName)).findFirst();
- if (!optionalUserResult.isPresent()) {
+ if (optionalUserResult.isEmpty()) {
retval.completeExceptionally(new
ResourceNotFoundException("No such user: " + userName));
} else {
DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult
userResult = optionalUserResult.get();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
index 495ddb09745..ec30b83baf7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -36,8 +35,7 @@ public class MemberAssignment {
* @param topicPartitions List of topic partitions
*/
public MemberAssignment(Set<TopicPartition> topicPartitions) {
- this.topicPartitions = topicPartitions == null ?
Collections.emptySet() :
- Collections.unmodifiableSet(new HashSet<>(topicPartitions));
+ this.topicPartitions = topicPartitions == null ?
Collections.emptySet() : Set.copyOf(topicPartitions);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
index 02bce5e98c4..0a37c012cc7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
@@ -17,8 +17,6 @@
package org.apache.kafka.clients.admin;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -34,7 +32,7 @@ public class NewPartitionReassignment {
public NewPartitionReassignment(List<Integer> targetReplicas) {
if (targetReplicas == null || targetReplicas.isEmpty())
throw new IllegalArgumentException("Cannot create a new partition
reassignment without any replicas");
- this.targetReplicas = Collections.unmodifiableList(new
ArrayList<>(targetReplicas));
+ this.targetReplicas = List.copyOf(targetReplicas);
}
public List<Integer> targetReplicas() {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
index 52fe0136aa2..4a2292a4d45 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -53,8 +53,7 @@ public class ShareGroupDescription {
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
- this.members = members == null ? Collections.emptyList() :
- Collections.unmodifiableList(new ArrayList<>(members));
+ this.members = members == null ? Collections.emptyList() :
List.copyOf(members);
this.groupState = groupState;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
index 97bc3588af6..03a713149be 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
@@ -17,8 +17,6 @@
package org.apache.kafka.clients.admin;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -61,7 +59,7 @@ public class UserScramCredentialsDescription {
*/
public UserScramCredentialsDescription(String name,
List<ScramCredentialInfo> credentialInfos) {
this.name = Objects.requireNonNull(name);
- this.credentialInfos = Collections.unmodifiableList(new
ArrayList<>(credentialInfos));
+ this.credentialInfos = List.copyOf(credentialInfos);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
index 19e3f13ebef..6286f59ed71 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -339,7 +339,7 @@ public class AdminApiDriver<K, V> {
}
// Copy the keys to avoid exposing the underlying mutable set
- Set<K> copyKeys = Collections.unmodifiableSet(new HashSet<>(keys));
+ Set<K> copyKeys = Set.copyOf(keys);
Collection<AdminApiHandler.RequestAndKeys<K>> newRequests =
buildRequest.apply(copyKeys, scope);
if (newRequests.isEmpty()) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 09fd000e50e..c1e92ca26fa 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -259,7 +259,7 @@ public class AdminMetadataManager {
public void transitionToUpdatePending(long now) {
this.state = State.UPDATE_PENDING;
this.lastMetadataFetchAttemptMs = now;
- if (!metadataAttemptStartMs.isPresent())
+ if (metadataAttemptStartMs.isEmpty())
metadataAttemptStartMs = Optional.of(now);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
index 433d25e8e54..ed5f513e020 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
@@ -195,7 +195,7 @@ public class AllBrokersStrategy implements
AdminApiLookupStrategy<AllBrokersStra
}
private KafkaFutureImpl<V> futureOrThrow(BrokerKey key) {
- if (!key.brokerId.isPresent()) {
+ if (key.brokerId.isEmpty()) {
throw new IllegalArgumentException("Attempt to complete with
invalid key: " + key);
} else {
int brokerId = key.brokerId.getAsInt();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
index 56318fc0acc..71b8e1a7c56 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
@@ -124,7 +124,7 @@ public class ListTransactionsHandler extends
AdminApiHandler.Batched<AllBrokersS
}
AllBrokersStrategy.BrokerKey key = keys.iterator().next();
- if (!key.brokerId.isPresent() || key.brokerId.getAsInt() != brokerId) {
+ if (key.brokerId.isEmpty() || key.brokerId.getAsInt() != brokerId) {
throw new IllegalArgumentException("Unexpected broker key: " +
key);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ff9bb6c1166..41702835d46 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -61,12 +61,7 @@ public class ConsumerConfig extends AbstractConfig {
// a list contains all the assignor names that only assign subscribed
topics to consumer. Should be updated when new assignor added.
// This is to help optimize ConsumerCoordinator#performAssignment method
public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
- Collections.unmodifiableList(Arrays.asList(
- RANGE_ASSIGNOR_NAME,
- ROUNDROBIN_ASSIGNOR_NAME,
- STICKY_ASSIGNOR_NAME,
- COOPERATIVE_STICKY_ASSIGNOR_NAME
- ));
+ List.of(RANGE_ASSIGNOR_NAME, ROUNDROBIN_ASSIGNOR_NAME,
STICKY_ASSIGNOR_NAME, COOPERATIVE_STICKY_ASSIGNOR_NAME);
/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
@@ -709,7 +704,7 @@ public class ConsumerConfig extends AbstractConfig {
Optional<String> groupId =
Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
Map<String, Object> originals = originals();
boolean enableAutoCommit =
originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG) ?
getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;
- if (!groupId.isPresent()) { // overwrite in case of default group id
where the config is not explicitly provided
+ if (groupId.isEmpty()) { // overwrite in case of default group id
where the config is not explicitly provided
if (!originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
configs.put(ENABLE_AUTO_COMMIT_CONFIG, false);
} else if (enableAutoCommit) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index f36f4cdc7b8..535313b1f2f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -115,7 +115,7 @@ public class ConsumerRecords<K, V> implements
Iterable<ConsumerRecord<K, V>> {
@Override
public Iterator<ConsumerRecord<K, V>> iterator() {
- return new AbstractIterator<ConsumerRecord<K, V>>() {
+ return new AbstractIterator<>() {
final Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters
= iterables.iterator();
Iterator<ConsumerRecord<K, V>> current;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 4bdffc48c23..a5143749b89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -553,7 +553,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
public synchronized Set<TopicPartition> paused() {
- return Collections.unmodifiableSet(new HashSet<>(paused));
+ return Set.copyOf(paused);
}
private void ensureNotClosed() {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
index 2890f8e3332..93b6094fcfc 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Set;
/**
@@ -40,7 +39,7 @@ public class NoOffsetForPartitionException extends
InvalidOffsetException {
public NoOffsetForPartitionException(Collection<TopicPartition>
partitions) {
super("Undefined offset with no reset policy for partitions: " +
partitions);
- this.partitions = Collections.unmodifiableSet(new
HashSet<>(partitions));
+ this.partitions = Set.copyOf(partitions);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index 6bc064006fe..f5be90b712e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -269,7 +269,7 @@ public class RangeAssignor extends
AbstractPartitionAssignor {
boolean racksMatch(String consumer, TopicPartition tp) {
Optional<String> consumerRack = consumers.get(consumer);
Set<String> replicaRacks = partitionRacks.get(tp);
- return !consumerRack.isPresent() || (replicaRacks != null &&
replicaRacks.contains(consumerRack.get()));
+ return consumerRack.isEmpty() || (replicaRacks != null &&
replicaRacks.contains(consumerRack.get()));
}
int maxAssignable(String consumer) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index f177dad62b9..9860d2f5890 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -568,7 +568,7 @@ public abstract class AbstractCoordinator implements
Closeable {
if (lastRebalanceStartMs == -1L)
lastRebalanceStartMs = time.milliseconds();
joinFuture = sendJoinGroupRequest();
- joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
+ joinFuture.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ByteBuffer value) {
// do nothing since all the handler logic are in
SyncGroupResponseHandler already
@@ -1188,7 +1188,7 @@ public abstract class AbstractCoordinator implements
Closeable {
}
protected boolean isDynamicMember() {
- return !rebalanceConfig.groupInstanceId.isPresent();
+ return rebalanceConfig.groupInstanceId.isEmpty();
}
private class LeaveGroupResponseHandler extends
CoordinatorResponseHandler<LeaveGroupResponse, Void> {
@@ -1528,7 +1528,7 @@ public abstract class AbstractCoordinator implements
Closeable {
} else {
heartbeat.sentHeartbeat(now);
final RequestFuture<Void> heartbeatFuture =
sendHeartbeatRequest();
- heartbeatFuture.addListener(new
RequestFutureListener<Void>() {
+ heartbeatFuture.addListener(new
RequestFutureListener<>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 4802eb7a120..e3d4eb58af4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -416,7 +416,7 @@ public abstract class AbstractFetch implements Closeable {
Optional<Node> leaderOpt = position.currentLeader.leader;
- if (!leaderOpt.isPresent()) {
+ if (leaderOpt.isEmpty()) {
log.debug("Requesting metadata update for partition {} since
the position {} is missing the current leader node", partition, position);
metadata.requestUpdate(false);
continue;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 966b44b59a6..4ac1513ede5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -892,7 +892,7 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
List<TopicPartition> unassignedPartitions = new
ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
- Collections.sort(sortedAssignedPartitions, Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+ sortedAssignedPartitions.sort(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
boolean shouldAddDirectly = false;
Iterator<TopicPartition> sortedAssignedPartitionsIter =
sortedAssignedPartitions.iterator();
@@ -991,7 +991,7 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
currentPartitionConsumer.put(topicPartition,
entry.getKey());
List<String> sortedAllTopics = new
ArrayList<>(topic2AllPotentialConsumers.keySet());
- Collections.sort(sortedAllTopics, new
TopicComparator(topic2AllPotentialConsumers));
+ sortedAllTopics.sort(new
TopicComparator(topic2AllPotentialConsumers));
sortedAllPartitions = getAllTopicPartitions(sortedAllTopics);
sortedCurrentSubscriptions = new TreeSet<>(new
SubscriptionComparator(currentAssignment));
@@ -1084,7 +1084,7 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
List<TopicPartition> unassignedPartitions = new ArrayList<>();
- Collections.sort(sortedAssignedPartitions, new
PartitionComparator(topic2AllPotentialConsumers));
+ sortedAssignedPartitions.sort(new
PartitionComparator(topic2AllPotentialConsumers));
boolean shouldAddDirectly = false;
Iterator<TopicPartition> sortedAssignedPartitionsIter =
sortedAssignedPartitions.iterator();
@@ -1154,7 +1154,7 @@ public abstract class AbstractStickyAssignor extends
AbstractPartitionAssignor {
if (memberData.generation.isPresent() &&
memberData.generation.get() < maxGeneration) {
// if the current member's generation is lower than
maxGeneration, put into prevAssignment if needed
updatePrevAssignment(prevAssignment,
memberData.partitions, consumer, memberData.generation.get());
- } else if (!memberData.generation.isPresent() && maxGeneration
> DEFAULT_GENERATION) {
+ } else if (memberData.generation.isEmpty() && maxGeneration >
DEFAULT_GENERATION) {
// if maxGeneration is larger than DEFAULT_GENERATION
// put all (no generation) partitions as
DEFAULT_GENERATION into prevAssignment if needed
updatePrevAssignment(prevAssignment,
memberData.partitions, consumer, DEFAULT_GENERATION);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java
index d4265e72c04..05f04cd6659 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncClient.java
@@ -37,7 +37,7 @@ public abstract class AsyncClient<T1, Req extends
AbstractRequest, Resp extends
public RequestFuture<T2> sendAsyncRequest(Node node, T1 requestData) {
AbstractRequest.Builder<Req> requestBuilder = prepareRequest(node,
requestData);
- return client.send(node, requestBuilder).compose(new
RequestFutureAdapter<ClientResponse, T2>() {
+ return client.send(node, requestBuilder).compose(new
RequestFutureAdapter<>() {
@Override
@SuppressWarnings("unchecked")
public void onSuccess(ClientResponse value, RequestFuture<T2>
future) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 3ab84fd7aec..033b16ba7b6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -613,7 +613,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
groupRebalanceConfig.groupId,
groupRebalanceConfig.groupInstanceId
);
- if (!groupMetadata.isPresent()) {
+ if (groupMetadata.isEmpty()) {
config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
}
@@ -952,7 +952,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
private void maybeThrowInvalidGroupIdException() {
- if (!groupMetadata.get().isPresent()) {
+ if (groupMetadata.get().isEmpty()) {
throw new InvalidGroupIdException("To use the group management or
offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the
consumer configuration.");
}
@@ -1350,7 +1350,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
private void autoCommitOnClose(final Timer timer) {
- if (!groupMetadata.get().isPresent())
+ if (groupMetadata.get().isEmpty())
return;
if (autoCommitEnabled)
@@ -1390,7 +1390,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
private void leaveGroupOnClose(final Timer timer) {
- if (!groupMetadata.get().isPresent())
+ if (groupMetadata.get().isEmpty())
return;
log.debug("Leaving the consumer group during consumer close");
@@ -1490,7 +1490,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public Uuid clientInstanceId(Duration timeout) {
- if (!clientTelemetryReporter.isPresent()) {
+ if (clientTelemetryReporter.isEmpty()) {
throw new IllegalStateException("Telemetry is not enabled. Set
config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index a3ac5e5698b..f5a672631ff 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -212,7 +212,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
);
// no coordinator will be constructed for the default (null) group
id
- if (!groupId.isPresent()) {
+ if (groupId.isEmpty()) {
config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
this.coordinator = null;
@@ -414,7 +414,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public Set<String> subscription() {
acquireAndEnsureOpen();
try {
- return Collections.unmodifiableSet(new
HashSet<>(this.subscriptions.subscription()));
+ return Set.copyOf(this.subscriptions.subscription());
} finally {
release();
}
@@ -906,7 +906,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public Uuid clientInstanceId(Duration timeout) {
- if (!clientTelemetryReporter.isPresent()) {
+ if (clientTelemetryReporter.isEmpty()) {
throw new IllegalStateException("Telemetry is not enabled. Set
config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
}
@@ -1258,7 +1258,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
private void maybeThrowInvalidGroupIdException() {
- if (!groupId.isPresent())
+ if (groupId.isEmpty())
throw new InvalidGroupIdException("To use the group management or
offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in
the consumer configuration.");
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index a5cb4753b38..41522889768 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -177,7 +177,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
// poll only when the coordinator node is known.
- if (!coordinatorRequestManager.coordinator().isPresent())
+ if (coordinatorRequestManager.coordinator().isEmpty())
return EMPTY;
if (closing) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8647476758f..584a03736f9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1031,7 +1031,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
// the same order that they were added. Note also that
AbstractCoordinator prevents
// multiple concurrent coordinator lookup requests.
pendingAsyncCommits.incrementAndGet();
- lookupCoordinator().addListener(new RequestFutureListener<Void>() {
+ lookupCoordinator().addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
@@ -1059,7 +1059,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
inFlightAsyncCommits.incrementAndGet();
final OffsetCommitCallback cb = callback == null ?
defaultOffsetCommitCallback : callback;
- future.addListener(new RequestFutureListener<Void>() {
+ future.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(Void value) {
inFlightAsyncCommits.decrementAndGet();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index a9e1bf46bed..c1e367055e4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -210,7 +210,7 @@ public class CoordinatorRequestManager implements
RequestManager {
) {
// handles Runtime exception
Optional<FindCoordinatorResponseData.Coordinator> coordinator =
response.coordinatorByKey(this.groupId);
- if (!coordinator.isPresent()) {
+ if (coordinator.isEmpty()) {
String msg = String.format("Response did not contain expected
coordinator section for groupId: %s", this.groupId);
onFailedResponse(currentTimeMs, new IllegalStateException(msg));
return;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
index 4127603372d..fa45de7e2cb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
@@ -32,9 +32,9 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
public class Fetch<K, V> {
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+ private final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata;
private boolean positionAdvanced;
private int numRecords;
- private Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata;
public static <K, V> Fetch<K, V> empty() {
return new Fetch<>(new HashMap<>(), false, 0, new HashMap<>());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
index 794ff3aceba..94e76edd0a5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java
@@ -347,7 +347,7 @@ public class FetchCollector<K, V> {
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId =
subscriptions.clearPreferredReadReplica(tp);
- if (!clearedReplicaId.isPresent()) {
+ if (clearedReplicaId.isEmpty()) {
// If there's no preferred replica to clear, we're fetching
from the leader so handle this error normally
SubscriptionState.FetchPosition position =
subscriptions.positionOrNull(tp);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 745bbfde992..ac86d1ebeaa 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -192,7 +192,7 @@ public class Fetcher<K, V> extends AbstractFetch {
final FetchRequest.Builder request =
createFetchRequest(fetchTarget, data);
final RequestFuture<ClientResponse> responseFuture =
client.send(fetchTarget, request);
- responseFuture.addListener(new
RequestFutureListener<ClientResponse>() {
+ responseFuture.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ClientResponse resp) {
successHandler.handle(fetchTarget, data, resp);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 56e4d697748..fd05f898355 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -413,7 +413,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
final Sensor
throttleTimeSensor,
final
ClientTelemetrySender clientTelemetrySender,
final
BackgroundEventHandler backgroundEventHandler) {
- return new CachedSupplier<NetworkClientDelegate>() {
+ return new CachedSupplier<>() {
@Override
protected NetworkClientDelegate create() {
KafkaClient client = ClientUtils.createNetworkClient(config,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index e5a8ba197a1..f7646bff9ed 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -144,7 +144,7 @@ public class OffsetFetcher {
do {
RequestFuture<ListOffsetResult> future =
sendListOffsetsRequests(remainingToSearch, requireTimestamps);
- future.addListener(new RequestFutureListener<ListOffsetResult>() {
+ future.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ListOffsetResult value) {
synchronized (future) {
@@ -218,7 +218,7 @@ public class OffsetFetcher {
subscriptions.setNextAllowedRetry(resetTimestamps.keySet(),
time.milliseconds() + requestTimeoutMs);
RequestFuture<ListOffsetResult> future =
sendListOffsetRequest(node, resetTimestamps, false);
- future.addListener(new RequestFutureListener<ListOffsetResult>() {
+ future.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ListOffsetResult result) {
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
result);
@@ -271,7 +271,7 @@ public class OffsetFetcher {
RequestFuture<OffsetForEpochResult> future =
offsetsForLeaderEpochClient.sendAsyncRequest(node,
fetchPositions);
- future.addListener(new
RequestFutureListener<OffsetForEpochResult>() {
+ future.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(OffsetForEpochResult offsetsResult) {
offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions,
@@ -308,7 +308,7 @@ public class OffsetFetcher {
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry
: timestampsToSearchByNode.entrySet()) {
RequestFuture<ListOffsetResult> future =
sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
- future.addListener(new RequestFutureListener<ListOffsetResult>() {
+ future.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ListOffsetResult partialResult) {
synchronized (listOffsetRequestsFuture) {
@@ -352,7 +352,7 @@ public class OffsetFetcher {
Long offset = entry.getValue();
Metadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(tp);
- if (!leaderAndEpoch.leader.isPresent()) {
+ if (leaderAndEpoch.leader.isEmpty()) {
log.debug("Leader for partition {} is unknown for fetching
offset {}", tp, offset);
metadata.requestUpdate(true);
partitionsToRetry.add(tp);
@@ -397,7 +397,7 @@ public class OffsetFetcher {
log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
return client.send(node, builder)
- .compose(new RequestFutureAdapter<ClientResponse,
ListOffsetResult>() {
+ .compose(new RequestFutureAdapter<>() {
@Override
public void onSuccess(ClientResponse response,
RequestFuture<ListOffsetResult> future) {
ListOffsetsResponse lor = (ListOffsetsResponse)
response.responseBody();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 6d296149b70..db847eae831 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -894,7 +894,7 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
Long offset = entry.getValue();
Metadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(tp);
- if (!leaderAndEpoch.leader.isPresent()) {
+ if (leaderAndEpoch.leader.isEmpty()) {
log.debug("Leader for partition {} is unknown for fetching
offset {}", tp, offset);
metadata.requestUpdate(true);
listOffsetsRequestState.ifPresent(offsetsRequestState ->
offsetsRequestState.remainingToSearch.put(tp, offset));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 034efd4dba8..ed75524ac81 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -200,7 +200,7 @@ public class RequestFuture<T> implements
ConsumerNetworkClient.PollCondition {
*/
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S>
adapter) {
final RequestFuture<S> adapted = new RequestFuture<>();
- addListener(new RequestFutureListener<T>() {
+ addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted);
@@ -215,7 +215,7 @@ public class RequestFuture<T> implements
ConsumerNetworkClient.PollCondition {
}
public void chain(final RequestFuture<T> future) {
- addListener(new RequestFutureListener<T>() {
+ addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(T value) {
future.complete(value);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 7f682c81fc4..304f0fffd4a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -160,7 +160,7 @@ public class RequestManagers implements Closeable {
final
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
final MemberStateListener
applicationThreadMemberStateListener
) {
- return new CachedSupplier<RequestManagers>() {
+ return new CachedSupplier<>() {
@Override
protected RequestManagers create() {
final NetworkClientDelegate networkClientDelegate =
networkClientDelegateSupplier.get();
@@ -284,7 +284,7 @@ public class RequestManagers implements Closeable {
final
Optional<ClientTelemetryReporter> clientTelemetryReporter,
final Metrics metrics
) {
- return new CachedSupplier<RequestManagers>() {
+ return new CachedSupplier<>() {
@Override
protected RequestManagers create() {
long retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index a07b2be9084..cda0196abe9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -150,7 +150,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
for (TopicPartition partition : partitionsToFetch()) {
Optional<Node> leaderOpt =
metadata.currentLeader(partition).leader;
- if (!leaderOpt.isPresent()) {
+ if (leaderOpt.isEmpty()) {
log.debug("Requesting metadata update for partition {} since
current leader node is missing", partition);
metadata.requestUpdate(false);
continue;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index e209ec00b0d..c4f77bf1c6f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -785,7 +785,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
*/
@Override
public Uuid clientInstanceId(final Duration timeout) {
- if (!clientTelemetryReporter.isPresent()) {
+ if (clientTelemetryReporter.isEmpty()) {
throw new IllegalStateException("Telemetry is not enabled. Set
config `" + ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 8871694db45..7bd4217d6c2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -985,7 +985,7 @@ public class SubscriptionState {
return false;
}
- if (!currentLeaderAndEpoch.leader.isPresent()) {
+ if (currentLeaderAndEpoch.leader.isEmpty()) {
return false;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 7bd2d1f28b7..d2e45370c66 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -281,7 +281,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* it is already a member on the next poll.
*/
private void process(final TopicSubscriptionChangeEvent event) {
- if (!requestManagers.consumerHeartbeatRequestManager.isPresent()) {
+ if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
log.warn("Group membership manager not present when processing a
subscribe event");
event.future().complete(null);
return;
@@ -381,7 +381,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
private void process(final ConsumerRebalanceListenerCallbackCompletedEvent
event) {
- if (!requestManagers.consumerHeartbeatRequestManager.isPresent()) {
+ if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
log.warn(
"An internal error occurred; the group membership manager was
not present, so the notification of the {} callback execution could not be
sent",
event.methodName()
@@ -392,14 +392,14 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
private void process(@SuppressWarnings("unused") final CommitOnCloseEvent
event) {
- if (!requestManagers.commitRequestManager.isPresent())
+ if (requestManagers.commitRequestManager.isEmpty())
return;
log.debug("Signal CommitRequestManager closing");
requestManagers.commitRequestManager.get().signalClose();
}
private void process(final LeaveGroupOnCloseEvent event) {
- if (!requestManagers.consumerMembershipManager.isPresent())
+ if (requestManagers.consumerMembershipManager.isEmpty())
return;
log.debug("Signal the ConsumerMembershipManager to leave the consumer
group since the consumer is closing");
@@ -418,7 +418,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* Process event that indicates the consumer acknowledged delivery of
records synchronously.
*/
private void process(final ShareAcknowledgeSyncEvent event) {
- if (!requestManagers.shareConsumeRequestManager.isPresent()) {
+ if (requestManagers.shareConsumeRequestManager.isEmpty()) {
return;
}
@@ -432,7 +432,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* Process event that indicates the consumer acknowledged delivery of
records asynchronously.
*/
private void process(final ShareAcknowledgeAsyncEvent event) {
- if (!requestManagers.shareConsumeRequestManager.isPresent()) {
+ if (requestManagers.shareConsumeRequestManager.isEmpty()) {
return;
}
@@ -446,7 +446,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* it is already a member.
*/
private void process(final ShareSubscriptionChangeEvent event) {
- if (!requestManagers.shareHeartbeatRequestManager.isPresent()) {
+ if (requestManagers.shareHeartbeatRequestManager.isEmpty()) {
KafkaException error = new KafkaException("Group membership
manager not present when processing a subscribe event");
event.future().completeExceptionally(error);
return;
@@ -469,7 +469,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* the group is sent out.
*/
private void process(final ShareUnsubscribeEvent event) {
- if (!requestManagers.shareHeartbeatRequestManager.isPresent()) {
+ if (requestManagers.shareHeartbeatRequestManager.isEmpty()) {
KafkaException error = new KafkaException("Group membership
manager not present when processing an unsubscribe event");
event.future().completeExceptionally(error);
return;
@@ -490,7 +490,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* the acknowledgements have responses.
*/
private void process(final ShareAcknowledgeOnCloseEvent event) {
- if (!requestManagers.shareConsumeRequestManager.isPresent()) {
+ if (requestManagers.shareConsumeRequestManager.isEmpty()) {
KafkaException error = new KafkaException("Group membership
manager not present when processing an acknowledge-on-close event");
event.future().completeExceptionally(error);
return;
@@ -507,7 +507,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* @param event Event containing a boolean to indicate if the callback
handler is configured or not.
*/
private void process(final
ShareAcknowledgementCommitCallbackRegistrationEvent event) {
- if (!requestManagers.shareConsumeRequestManager.isPresent()) {
+ if (requestManagers.shareConsumeRequestManager.isEmpty()) {
return;
}
@@ -532,7 +532,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
final
ConsumerMetadata metadata,
final
SubscriptionState subscriptions,
final
Supplier<RequestManagers> requestManagersSupplier) {
- return new CachedSupplier<ApplicationEventProcessor>() {
+ return new CachedSupplier<>() {
@Override
protected ApplicationEventProcessor create() {
RequestManagers requestManagers =
requestManagersSupplier.get();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
index 68e1bbc5e6d..7f48ee644c7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
@@ -19,8 +19,7 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
+import java.util.Set;
public class AssignmentChangeEvent extends CompletableApplicationEvent<Void> {
@@ -30,7 +29,7 @@ public class AssignmentChangeEvent extends
CompletableApplicationEvent<Void> {
public AssignmentChangeEvent(final long currentTimeMs, final long
deadlineMs, final Collection<TopicPartition> partitions) {
super(Type.ASSIGNMENT_CHANGE, deadlineMs);
this.currentTimeMs = currentTimeMs;
- this.partitions = Collections.unmodifiableSet(new
HashSet<>(partitions));
+ this.partitions = Set.copyOf(partitions);
}
public long currentTimeMs() {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
index 9c94d3b9d3b..a05ce638ece 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
@@ -40,7 +40,7 @@ public abstract class CommitEvent extends
CompletableApplicationEvent<Map<TopicP
* {@link Collections#unmodifiableMap(Map) as unmodifiable}.
*/
private static Optional<Map<TopicPartition, OffsetAndMetadata>>
validate(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
- if (!offsets.isPresent()) {
+ if (offsets.isEmpty()) {
return Optional.empty();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java
index 23d4410d07b..0916ab8666c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java
@@ -23,7 +23,7 @@ import java.util.Map;
public class ShareAcknowledgeOnCloseEvent extends
CompletableApplicationEvent<Void> {
- private Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+ private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
public ShareAcknowledgeOnCloseEvent(final Map<TopicIdPartition,
Acknowledgements> acknowledgementsMap, final long deadlineMs) {
super(Type.SHARE_ACKNOWLEDGE_ON_CLOSE, deadlineMs);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java
index db3259a6779..49cb422e633 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeSyncEvent.java
@@ -23,7 +23,7 @@ import java.util.Map;
public class ShareAcknowledgeSyncEvent extends
CompletableApplicationEvent<Map<TopicIdPartition, Acknowledgements>> {
- private Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+ private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
public ShareAcknowledgeSyncEvent(final Map<TopicIdPartition,
Acknowledgements> acknowledgementsMap, final long deadlineMs) {
super(Type.SHARE_ACKNOWLEDGE_SYNC, deadlineMs);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
index d5ce57b947a..2a2b56e87cd 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java
@@ -23,7 +23,7 @@ import java.util.Map;
public class ShareFetchEvent extends ApplicationEvent {
- private Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
+ private final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
public ShareFetchEvent(Map<TopicIdPartition, Acknowledgements>
acknowledgementsMap) {
super(Type.SHARE_FETCH);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a77b3f8809f..7bc4b47ab08 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1337,7 +1337,7 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
*/
@Override
public Uuid clientInstanceId(Duration timeout) {
- if (!clientTelemetryReporter.isPresent()) {
+ if (clientTelemetryReporter.isEmpty()) {
throw new IllegalStateException("Telemetry is not enabled. Set
config `" + ProducerConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index f70c1a33814..5619819dde7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -112,7 +112,7 @@ public final class ProducerBatch {
*/
void maybeUpdateLeaderEpoch(OptionalInt latestLeaderEpoch) {
if (latestLeaderEpoch.isPresent()
- && (!currentLeaderEpoch.isPresent() ||
currentLeaderEpoch.getAsInt() < latestLeaderEpoch.getAsInt())) {
+ && (currentLeaderEpoch.isEmpty() || currentLeaderEpoch.getAsInt()
< latestLeaderEpoch.getAsInt())) {
log.trace("For {}, leader will be updated, currentLeaderEpoch: {},
attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
this, currentLeaderEpoch, attemptsWhenLeaderLastChanged,
latestLeaderEpoch, attempts);
attemptsWhenLeaderLastChanged = attempts();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
index 05a0fa3fbbc..5d2be89a9dd 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionMap.java
@@ -90,7 +90,7 @@ class TxnPartitionMap {
// It might happen that the TransactionManager has been reset while a
request was reenqueued and got a valid
// response for this. This can happen only if the producer is only
idempotent (not transactional) and in
// this case there will be no tracked bookkeeper entry about it, so we
have to insert one.
- if (!lastAckedOffset.isPresent() && !isTransactional)
+ if (lastAckedOffset.isEmpty() && !isTransactional)
getOrCreate(topicPartition);
if (lastOffset >
lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET))
get(topicPartition).setLastAckedOffset(lastOffset);
diff --git a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
index 672cb65d66a..11f0be4f6e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
@@ -26,7 +26,7 @@ public class LRUCache<K, V> implements Cache<K, V> {
private final LinkedHashMap<K, V> cache;
public LRUCache(final int maxSize) {
- cache = new LinkedHashMap<K, V>(16, .75f, true) {
+ cache = new LinkedHashMap<>(16, .75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return this.size() > maxSize; // require this. prefix to make
lgtm.com happy
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 9cf5fbae515..970d9cebf72 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -1501,7 +1501,7 @@ public class ConfigDef {
b.append("``").append(key.name).append("``").append("\n");
if (key.documentation != null) {
for (String docLine : key.documentation.split("\n")) {
- if (docLine.length() == 0) {
+ if (docLine.isEmpty()) {
continue;
}
b.append(" ").append(docLine).append("\n\n");
@@ -1532,7 +1532,7 @@ public class ConfigDef {
}
List<ConfigKey> configs = new ArrayList<>(configKeys.values());
- Collections.sort(configs, (k1, k2) -> compare(k1, k2, groupOrd));
+ configs.sort((k1, k2) -> compare(k1, k2, groupOrd));
return configs;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java
b/clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java
index 75f8e3640e9..c9552b7c44d 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java
@@ -17,8 +17,6 @@
package org.apache.kafka.common.config;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -30,8 +28,7 @@ public enum SslClientAuth {
REQUESTED,
NONE;
- public static final List<SslClientAuth> VALUES =
- Collections.unmodifiableList(Arrays.asList(SslClientAuth.values()));
+ public static final List<SslClientAuth> VALUES = List.of(SslClientAuth.values());
public static SslClientAuth forConfig(String key) {
if (key == null) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
index 7137f723d8f..52863c6c0b5 100644
---
a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
+++
b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
@@ -123,7 +123,7 @@ public class RecordHeaders implements Headers {
}
private Iterator<Header> closeAware(final Iterator<Header> original) {
- return new Iterator<Header>() {
+ return new Iterator<>() {
@Override
public boolean hasNext() {
return original.hasNext();
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 090d7c92eb0..614c8cd22e4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -31,10 +31,7 @@ import java.util.Set;
public class Protocol {
private static String indentString(int size) {
- StringBuilder b = new StringBuilder(size);
- for (int i = 0; i < size; i++)
- b.append(" ");
- return b.toString();
+ return " ".repeat(Math.max(0, size));
}
private static void schemaToBnfHtml(Schema schema, StringBuilder b, int
indentSize) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 9ab8715236e..e47d7c866cc 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -235,7 +235,7 @@ public abstract class AbstractLegacyRecordBatch extends
AbstractRecordBatch impl
if (isCompressed())
return new DeepRecordsIterator(this, false, Integer.MAX_VALUE,
bufferSupplier);
- return new CloseableIterator<Record>() {
+ return new CloseableIterator<>() {
private boolean hasNext = true;
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 5fddaae4062..16ee3596ea3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -71,7 +71,7 @@ public abstract class AbstractRecords implements Records {
}
private Iterator<Record> recordsIterator() {
- return new AbstractIterator<Record>() {
+ return new AbstractIterator<>() {
private final Iterator<? extends RecordBatch> batches =
batches().iterator();
private Iterator<Record> records;
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index c01bca2496e..650071474db 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -192,7 +192,7 @@ public class MemoryRecords extends AbstractRecords {
// in which case, we need to reset the base timestamp and
overwrite the timestamp deltas
// if the batch does not contain tombstones, then we don't
need to overwrite batch
boolean needToSetDeleteHorizon = batch.magic() >=
RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
- && !batch.deleteHorizonMs().isPresent();
+ && batch.deleteHorizonMs().isEmpty();
if (writeOriginalBatch && !needToSetDeleteHorizon) {
batch.writeTo(bufferOutputStream);
filterResult.updateRetainedBatchMetadata(batch,
retainedRecords.size(), false);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 0155fb8af5f..2327e910abe 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -251,7 +251,7 @@ public class ApiVersionsResponse extends AbstractResponse {
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
final Optional<ApiVersion> brokerApiVersion =
apiKey.toApiVersion(enableUnstableLastVersion);
- if (!brokerApiVersion.isPresent()) {
+ if (brokerApiVersion.isEmpty()) {
// Broker does not support this API key.
continue;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index cb128b8b42c..f352ba4eea9 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.requests;
-import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
index 6211c5355cc..029b6881fdb 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
@@ -23,9 +23,7 @@ import org.apache.kafka.common.network.ListenerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -177,7 +175,7 @@ public class JaasContext {
AppConfigurationEntry[] entries =
configuration.getAppConfigurationEntry(name);
if (entries == null)
throw new IllegalArgumentException("Could not find a '" + name +
"' entry in this JAAS configuration.");
- this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries)));
+ this.configurationEntries = List.of(entries);
this.dynamicJaasConfig = dynamicJaasConfig;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslExtensions.java
b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslExtensions.java
index 8156c6fe231..3210c859baa 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslExtensions.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslExtensions.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.security.auth;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
@@ -51,7 +50,7 @@ public class SaslExtensions {
private final Map<String, String> extensionsMap;
public SaslExtensions(Map<String, String> extensionsMap) {
- this.extensionsMap = Collections.unmodifiableMap(new HashMap<>(extensionsMap));
+ this.extensionsMap = Map.copyOf(extensionsMap);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
index 3e29841baa5..75c62b696cf 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponse.java
@@ -136,7 +136,7 @@ public class OAuthBearerClientInitialResponse {
/**
* Return the always non-null token value
*
- * @return the always non-null toklen value
+ * @return the always non-null token value
*/
public String tokenValue() {
return tokenValue;
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
index a878ae7a261..fdc5707278a 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
@@ -33,7 +33,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
@@ -261,14 +260,14 @@ public class HttpAccessTokenRetriever implements
AccessTokenRetriever {
ByteArrayOutputStream os = new ByteArrayOutputStream();
log.debug("handleOutput - preparing to read response body from
{}", con.getURL());
copy(is, os);
- responseBody = os.toString(StandardCharsets.UTF_8.name());
+ responseBody = os.toString(StandardCharsets.UTF_8);
} catch (Exception e) {
// there still can be useful error response from the servers, lets
get it
try (InputStream is = con.getErrorStream()) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
log.debug("handleOutput - preparing to read error response
body from {}", con.getURL());
copy(is, os);
- errorResponseBody = os.toString(StandardCharsets.UTF_8.name());
+ errorResponseBody = os.toString(StandardCharsets.UTF_8);
} catch (Exception e2) {
log.warn("handleOutput - error retrieving error information",
e2);
}
@@ -354,15 +353,14 @@ public class HttpAccessTokenRetriever implements
AccessTokenRetriever {
return sanitizeString("the token endpoint response's access_token JSON
attribute", accessTokenNode.textValue());
}
- static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) throws
- UnsupportedEncodingException {
+ static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) {
clientId = sanitizeString("the token endpoint request client ID
parameter", clientId);
clientSecret = sanitizeString("the token endpoint request client
secret parameter", clientSecret);
// according to RFC-6749 clientId & clientSecret must be urlencoded,
see https://tools.ietf.org/html/rfc6749#section-2.3.1
if (urlencode) {
- clientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8.name());
- clientSecret = URLEncoder.encode(clientSecret, StandardCharsets.UTF_8.name());
+ clientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8);
+ clientSecret = URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
}
String s = String.format("%s:%s", clientId, clientSecret);
@@ -371,22 +369,17 @@ public class HttpAccessTokenRetriever implements
AccessTokenRetriever {
return String.format("Basic %s", encoded);
}
- static String formatRequestBody(String scope) throws IOException {
- try {
- StringBuilder requestParameters = new StringBuilder();
- requestParameters.append("grant_type=client_credentials");
-
- if (scope != null && !scope.trim().isEmpty()) {
- scope = scope.trim();
- String encodedScope = URLEncoder.encode(scope, StandardCharsets.UTF_8.name());
- requestParameters.append("&scope=").append(encodedScope);
- }
+ static String formatRequestBody(String scope) {
+ StringBuilder requestParameters = new StringBuilder();
+ requestParameters.append("grant_type=client_credentials");
- return requestParameters.toString();
- } catch (UnsupportedEncodingException e) {
- // The world has gone crazy!
- throw new IOException(String.format("Encoding %s not supported", StandardCharsets.UTF_8.name()));
+ if (scope != null && !scope.trim().isEmpty()) {
+ scope = scope.trim();
+ String encodedScope = URLEncoder.encode(scope, StandardCharsets.UTF_8);
+ requestParameters.append("&scope=").append(encodedScope);
}
+
+ return requestParameters.toString();
}
private static String sanitizeString(String name, String value) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwks.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwks.java
index 3618005ebe1..62261fed58d 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwks.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwks.java
@@ -142,7 +142,7 @@ public final class RefreshingHttpsJwks implements Initable,
Closeable {
this.refreshRetryBackoffMs = refreshRetryBackoffMs;
this.refreshRetryBackoffMaxMs = refreshRetryBackoffMaxMs;
this.executorService = executorService;
- this.missingKeyIds = new LinkedHashMap<String, Long>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) {
+ this.missingKeyIds = new LinkedHashMap<>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<String, Long>
eldest) {
return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES;
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
index 64f5ddf070b..6b1148e291b 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
@@ -349,9 +349,7 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
if (Utils.isBlank(scopeClaimValue))
return Collections.emptySet();
else {
- Set<String> retval = new HashSet<>();
- retval.add(scopeClaimValue.trim());
- return Collections.unmodifiableSet(retval);
+ return Set.of(scopeClaimValue.trim());
}
}
List<?> scopeClaimValue = claim(scopeClaimName, List.class);
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
index 2241eb50fdc..455fda983c3 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -32,12 +32,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -104,8 +102,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler
implements AuthenticateCal
private static final String PRINCIPAL_CLAIM_NAME_OPTION = OPTION_PREFIX +
"PrincipalClaimName";
private static final String LIFETIME_SECONDS_OPTION = OPTION_PREFIX +
"LifetimeSeconds";
private static final String SCOPE_CLAIM_NAME_OPTION = OPTION_PREFIX +
"ScopeClaimName";
- private static final Set<String> RESERVED_CLAIMS = Collections
- .unmodifiableSet(new HashSet<>(Arrays.asList("iat", "exp")));
+ private static final Set<String> RESERVED_CLAIMS = Set.of("iat", "exp");
private static final String DEFAULT_PRINCIPAL_CLAIM_NAME = "sub";
private static final String DEFAULT_LIFETIME_SECONDS_ONE_HOUR = "3600";
private static final String DEFAULT_SCOPE_CLAIM_NAME = "scope";
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
index a817d7b8f27..53e099688d9 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
@@ -193,7 +193,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler
implements Authenticat
private int allowableClockSkewMs() {
String allowableClockSkewMsValue =
option(ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION);
- int allowableClockSkewMs = 0;
+ int allowableClockSkewMs;
try {
allowableClockSkewMs = Utils.isBlank(allowableClockSkewMsValue) ?
0 : Integer.parseInt(allowableClockSkewMsValue.trim());
} catch (NumberFormatException e) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapper.java
b/clients/src/main/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapper.java
index cae9e19ad2d..a7b16906d5d 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapper.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/ssl/CommonNameLoggingTrustManagerFactoryWrapper.java
@@ -121,7 +121,7 @@ class CommonNameLoggingTrustManagerFactoryWrapper {
this.origTm = originalTrustManager;
this.nrOfRememberedBadCerts = nrOfRememberedBadCerts;
// Restrict maximal size of the LinkedHashMap to avoid security
attacks causing OOM
- this.previouslyRejectedClientCertChains = new LinkedHashMap<ByteBuffer, String>() {
+ this.previouslyRejectedClientCertChains = new LinkedHashMap<>() {
@Override
protected boolean removeEldestEntry(final
Map.Entry<ByteBuffer, String> eldest) {
return size() > nrOfRememberedBadCerts;
@@ -238,7 +238,7 @@ class CommonNameLoggingTrustManagerFactoryWrapper {
principalToCertMap.put(principal, cert);
}
// Thus, expect certificate chain to be broken, e.g. containing
multiple enbd certificates
- HashSet<X509Certificate> endCertificates = new HashSet<>();
+ Set<X509Certificate> endCertificates = new HashSet<>();
for (X509Certificate cert: origChain) {
X500Principal subjectPrincipal =
cert.getSubjectX500Principal();
if
(!issuedbyPrincipalToCertificatesMap.containsKey(subjectPrincipal)) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
index 9e6dbc0c559..cb8c245a613 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
@@ -41,7 +41,7 @@ public class TelemetryMetricNamingConvention {
public static MetricNamingStrategy<MetricName>
getClientTelemetryMetricNamingStrategy(String prefix) {
Objects.requireNonNull(prefix, "prefix cannot be null");
- return new MetricNamingStrategy<MetricName>() {
+ return new MetricNamingStrategy<>() {
@Override
public MetricKey metricKey(MetricName metricName) {
Objects.requireNonNull(metricName, "metric name cannot be
null");
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
b/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
index ce1680b62a7..9a891e08463 100644
---
a/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
+++
b/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
@@ -112,7 +112,7 @@ public class ChildFirstClassLoader extends URLClassLoader {
Enumeration<URL> urls1 = findResources(name);
Enumeration<URL> urls2 = getParent() != null ?
getParent().getResources(name) : null;
- return new Enumeration<URL>() {
+ return new Enumeration<>() {
@Override
public boolean hasMoreElements() {
return (urls1 != null && urls1.hasMoreElements()) || (urls2 !=
null && urls2.hasMoreElements());
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
index 50b06369b47..1709b5d47fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java
@@ -29,7 +29,7 @@ public interface CloseableIterator<T> extends Iterator<T>,
Closeable {
void close();
static <R> CloseableIterator<R> wrap(Iterator<R> inner) {
- return new CloseableIterator<R>() {
+ return new CloseableIterator<>() {
@Override
public void close() {}
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
index 61d29a1a353..5b1b99692c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.common.utils;
-import org.apache.kafka.common.KafkaException;
-
-import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
@@ -50,23 +47,19 @@ public class Sanitizer {
* using URL-encoding.
*/
public static String sanitize(String name) {
- try {
- String encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name());
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < encoded.length(); i++) {
- char c = encoded.charAt(i);
- if (c == '*') { // Metric ObjectName treats * as pattern
- builder.append("%2A");
- } else if (c == '+') { // Space URL-encoded as +, replace with percent encoding
- builder.append("%20");
- } else {
- builder.append(c);
- }
+ String encoded = URLEncoder.encode(name, StandardCharsets.UTF_8);
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < encoded.length(); i++) {
+ char c = encoded.charAt(i);
+ if (c == '*') { // Metric ObjectName treats * as pattern
+ builder.append("%2A");
+ } else if (c == '+') { // Space URL-encoded as +, replace with percent encoding
+ builder.append("%20");
+ } else {
+ builder.append(c);
}
- return builder.toString();
- } catch (UnsupportedEncodingException e) {
- throw new KafkaException(e);
}
+ return builder.toString();
}
/**
@@ -74,11 +67,7 @@ public class Sanitizer {
* is used to obtain the desanitized version of node names in ZooKeeper.
*/
public static String desanitize(String name) {
- try {
- return URLDecoder.decode(name, StandardCharsets.UTF_8.name());
- } catch (UnsupportedEncodingException e) {
- throw new KafkaException(e);
- }
+ return URLDecoder.decode(name, StandardCharsets.UTF_8);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
index c501dce65d3..ca2a76b4716 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
@@ -201,12 +201,12 @@ public interface Authorizer extends Configurable,
Closeable {
resourceTypeFilter, AccessControlEntryFilter.ANY);
EnumMap<PatternType, Set<String>> denyPatterns =
- new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+ new EnumMap<>(PatternType.class) {{
put(PatternType.LITERAL, new HashSet<>());
put(PatternType.PREFIXED, new HashSet<>());
}};
EnumMap<PatternType, Set<String>> allowPatterns =
- new EnumMap<PatternType, Set<String>>(PatternType.class) {{
+ new EnumMap<>(PatternType.class) {{
put(PatternType.LITERAL, new HashSet<>());
put(PatternType.PREFIXED, new HashSet<>());
}};
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a836e982140..d713abbcae7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3206,7 +3206,7 @@ public class KafkaAdminClientTest {
}
@Test
- public void testListGroupsWithTypesOlderBrokerVersion() throws Exception {
+ public void testListGroupsWithTypesOlderBrokerVersion() {
ApiVersion listGroupV4 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
@@ -5296,7 +5296,7 @@ public class KafkaAdminClientTest {
}
@Test
- public void testListShareGroupsWithStatesOlderBrokerVersion() throws Exception {
+ public void testListShareGroupsWithStatesOlderBrokerVersion() {
ApiVersion listGroupV4 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
@@ -6548,7 +6548,7 @@ public class KafkaAdminClientTest {
.noneMatch(p -> p.timestamp() ==
ListOffsetsRequest.MAX_TIMESTAMP),
new ListOffsetsResponse(responseData), node);
- ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+ ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<>() {{
put(tp0, OffsetSpec.maxTimestamp());
put(tp1, OffsetSpec.latest());
}});
@@ -6609,7 +6609,7 @@ public class KafkaAdminClientTest {
new ListOffsetsResponse(responseDataWithError), node);
}
ListOffsetsResult result = env.adminClient().listOffsets(
- new HashMap<TopicPartition, OffsetSpec>() {
+ new HashMap<>() {
{
put(tp0, OffsetSpec.latest());
put(tp1, OffsetSpec.latest());
@@ -6632,7 +6632,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponseFrom(
request -> request instanceof ListOffsetsRequest, new
ListOffsetsResponse(responseData), node);
result = env.adminClient().listOffsets(
- new HashMap<TopicPartition, OffsetSpec>() {
+ new HashMap<>() {
{
put(tp0, OffsetSpec.latest());
put(tp1, OffsetSpec.latest());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
index 9651964e1c6..4cd9d613095 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java
@@ -61,7 +61,7 @@ public class DeleteRecordsHandlerTest {
private final TopicPartition t0p3 = new TopicPartition("t0", 3);
private final Node node1 = new Node(1, "host", 1234);
private final Node node2 = new Node(2, "host", 1235);
- private final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<TopicPartition, RecordsToDelete>() {
+ private final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>() {
{
put(t0p0, RecordsToDelete.beforeOffset(10L));
put(t0p1, RecordsToDelete.beforeOffset(10L));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java
index 5ad92ce1c95..a7156554001 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandlerTest.java
@@ -65,7 +65,7 @@ public final class ListOffsetsHandlerTest {
private final Node node = new Node(1, "host", 1234);
- private final Map<TopicPartition, Long> offsetTimestampsByPartition = new HashMap<TopicPartition, Long>() {
+ private final Map<TopicPartition, Long> offsetTimestampsByPartition = new HashMap<>() {
{
put(t0p0, ListOffsetsRequest.LATEST_TIMESTAMP);
put(t0p1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2deaec2efea..f39d11a5cb5 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1969,11 +1969,11 @@ public class KafkaConsumerTest {
}
try (KafkaConsumer<byte[], byte[]> consumer =
newConsumer(groupProtocol, null)) {
- assertThrows(InvalidGroupIdException.class, () -> consumer.commitAsync());
+ assertThrows(InvalidGroupIdException.class, consumer::commitAsync);
}
try (KafkaConsumer<byte[], byte[]> consumer =
newConsumer(groupProtocol, null)) {
- assertThrows(InvalidGroupIdException.class, () -> consumer.commitSync());
+ assertThrows(InvalidGroupIdException.class, consumer::commitSync);
}
}
@@ -1984,8 +1984,8 @@ public class KafkaConsumerTest {
consumer.assign(singleton(tp0));
assertThrows(InvalidGroupIdException.class, () ->
consumer.committed(Collections.singleton(tp0)).get(tp0));
- assertThrows(InvalidGroupIdException.class, () -> consumer.commitAsync());
- assertThrows(InvalidGroupIdException.class, () -> consumer.commitSync());
+ assertThrows(InvalidGroupIdException.class, consumer::commitAsync);
+ assertThrows(InvalidGroupIdException.class, consumer::commitSync);
}
}
@@ -2668,7 +2668,7 @@ public class KafkaConsumerTest {
@ParameterizedTest
@EnumSource(GroupProtocol.class)
- public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) throws InterruptedException {
+ public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index f82bb011b84..8b5c2d418df 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -156,7 +156,7 @@ public class AbstractCoordinatorTest {
groupInstanceId,
retryBackoffMs,
retryBackoffMaxMs,
- !groupInstanceId.isPresent());
+ groupInstanceId.isEmpty());
this.coordinator = new DummyCoordinator(rebalanceConfig,
consumerClient,
metrics,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
index 5960fd28fbf..68b9ecb528b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -214,7 +214,7 @@ public class FetchCollectorTest {
assignAndSeek(topicAPartition0);
// Create a FetchCollector that fails on CompletedFetch initialization.
- fetchCollector = new FetchCollector<String, String>(logContext,
+ fetchCollector = new FetchCollector<>(logContext,
metadata,
subscriptions,
fetchConfig,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 3e3f70a7443..c2b6ad8b168 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -161,7 +161,7 @@ public class FetchRequestManagerTest {
private final String topicName = "test";
private final String groupId = "test-group";
private final Uuid topicId = Uuid.randomUuid();
- private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+ private final Map<String, Uuid> topicIds = new HashMap<>() {
{
put(topicName, topicId);
}
@@ -1723,8 +1723,7 @@ public class FetchRequestManagerTest {
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
for (int i = 0; i < 2; i++) {
- OffsetOutOfRangeException e =
assertThrows(OffsetOutOfRangeException.class, () ->
- collectFetch());
+ OffsetOutOfRangeException e =
assertThrows(OffsetOutOfRangeException.class, this::collectFetch);
assertEquals(singleton(tp0),
e.offsetOutOfRangePartitions().keySet());
assertEquals(0L,
e.offsetOutOfRangePartitions().get(tp0).longValue());
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6c29d3df82b..c554f1c8e7a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -157,7 +157,7 @@ public class FetcherTest {
private final String topicName = "test";
private final String groupId = "test-group";
private final Uuid topicId = Uuid.randomUuid();
- private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+ private final Map<String, Uuid> topicIds = new HashMap<>() {
{
put(topicName, topicId);
}
@@ -1709,8 +1709,7 @@ public class FetcherTest {
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
for (int i = 0; i < 2; i++) {
- OffsetOutOfRangeException e =
assertThrows(OffsetOutOfRangeException.class, () ->
- collectFetch());
+ OffsetOutOfRangeException e =
assertThrows(OffsetOutOfRangeException.class, this::collectFetch);
assertEquals(singleton(tp0),
e.offsetOutOfRangePartitions().keySet());
assertEquals(0L,
e.offsetOutOfRangePartitions().get(tp0).longValue());
}
@@ -2849,7 +2848,7 @@ public class FetcherTest {
true, // check crcs
CommonClientConfigs.DEFAULT_CLIENT_RACK,
isolationLevel);
- fetcher = new Fetcher<byte[], byte[]>(
+ fetcher = new Fetcher<>(
logContext,
consumerClient,
metadata,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index 4973624b0a0..bcbd0411656 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -99,7 +99,7 @@ public class OffsetFetcherTest {
private final String topicName = "test";
private final Uuid topicId = Uuid.randomUuid();
- private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+ private final Map<String, Uuid> topicIds = new HashMap<>() {
{
put(topicName, topicId);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
index e218f8109fc..2cc4485f460 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
@@ -182,7 +182,7 @@ public class RequestFutureTest {
@Test
public void testComposeSuccessCase() {
RequestFuture<String> future = new RequestFuture<>();
- RequestFuture<Integer> composed = future.compose(new
RequestFutureAdapter<String, Integer>() {
+ RequestFuture<Integer> composed = future.compose(new
RequestFutureAdapter<>() {
@Override
public void onSuccess(String value, RequestFuture<Integer> future)
{
future.complete(value.length());
@@ -199,7 +199,7 @@ public class RequestFutureTest {
@Test
public void testComposeFailureCase() {
RequestFuture<String> future = new RequestFuture<>();
- RequestFuture<Integer> composed = future.compose(new
RequestFutureAdapter<String, Integer>() {
+ RequestFuture<Integer> composed = future.compose(new
RequestFutureAdapter<>() {
@Override
public void onSuccess(String value, RequestFuture<Integer> future)
{
future.complete(value.length());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 8ec60d4ea97..135574d4e1d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -124,13 +124,13 @@ public class ShareConsumeRequestManagerTest {
private final String groupId = "test-group";
private final Uuid topicId = Uuid.randomUuid();
private final Uuid topicId2 = Uuid.randomUuid();
- private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+ private final Map<String, Uuid> topicIds = new HashMap<>() {
{
put(topicName, topicId);
put(topicName2, topicId2);
}
};
- private final Map<String, Integer> topicPartitionCounts = new
HashMap<String, Integer>() {
+ private final Map<String, Integer> topicPartitionCounts = new HashMap<>() {
{
put(topicName, 2);
put(topicName2, 1);
@@ -739,7 +739,7 @@ public class ShareConsumeRequestManagerTest {
}
@Test
- public void testRetryAcknowledgementsWithLeaderChange() throws
InterruptedException {
+ public void testRetryAcknowledgementsWithLeaderChange() {
buildRequestManager();
subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
@@ -1473,7 +1473,7 @@ public class ShareConsumeRequestManagerTest {
}
/**
- * Assert that the {@link ShareFetchCollector#collect(ShareFetchBuffer)}
latest fetch} does not contain any
+ * Assert that the {@link ShareFetchCollector#collect(ShareFetchBuffer)
latest fetch} does not contain any
* {@link ShareFetch#records() user-visible records}, and is {@link
ShareFetch#isEmpty() empty}.
*
* @param reason the reason to include for assertion methods such as
{@link org.junit.jupiter.api.Assertions#assertTrue(boolean, String)}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index fb6a57ac039..893840de4c6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -137,7 +137,7 @@ public class ShareFetchCollectorTest {
subscribeAndAssign(topicAPartition0);
// Create a ShareFetchCollector that fails on ShareCompletedFetch
initialization.
- fetchCollector = new ShareFetchCollector<String, String>(logContext,
+ fetchCollector = new ShareFetchCollector<>(logContext,
metadata,
subscriptions,
fetchConfig,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java
index e5b2833b154..5f34836eaba 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcherTest.java
@@ -59,7 +59,7 @@ public class TopicMetadataFetcherTest {
private final String topicName = "test";
private final Uuid topicId = Uuid.randomUuid();
- private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+ private final Map<String, Uuid> topicIds = new HashMap<>() {
{
put(topicName, topicId);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 3bd0df92b00..e2e8ca7fed2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -748,7 +748,7 @@ public class KafkaProducerTest {
}
};
- return new KafkaProducer<String, String>(
+ return new KafkaProducer<>(
new
ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, new
StringSerializer(), new StringSerializer())),
new StringSerializer(), new StringSerializer(), metadata,
mockClient, null, time) {
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 0045f271c1a..d3964fc87ed 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -393,14 +393,14 @@ public class MockProducerTest {
producer.beginTransaction();
String group1 = "g1";
- Map<TopicPartition, OffsetAndMetadata> group1Commit = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> group1Commit = new HashMap<>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L,
null));
put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L,
null));
}
};
String group2 = "g2";
- Map<TopicPartition, OffsetAndMetadata> group2Commit = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> group2Commit = new HashMap<>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(101L,
null));
put(new TopicPartition(topic, 1), new OffsetAndMetadata(21L,
null));
@@ -435,7 +435,7 @@ public class MockProducerTest {
producer.sendOffsetsToTransaction(Collections.emptyMap(), new
ConsumerGroupMetadata("groupId"));
assertFalse(producer.sentOffsets());
}
-
+
@Test
public void shouldAddOffsetsWhenSendOffsetsToTransactionByGroupMetadata() {
buildMockProducer(true);
@@ -444,7 +444,7 @@ public class MockProducerTest {
assertFalse(producer.sentOffsets());
- Map<TopicPartition, OffsetAndMetadata> groupCommit = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L,
null));
}
@@ -461,7 +461,7 @@ public class MockProducerTest {
assertFalse(producer.sentOffsets());
- Map<TopicPartition, OffsetAndMetadata> groupCommit = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L,
null));
}
@@ -488,13 +488,13 @@ public class MockProducerTest {
producer.beginTransaction();
String group = "g";
- Map<TopicPartition, OffsetAndMetadata> groupCommit1 = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> groupCommit1 = new HashMap<>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L,
null));
put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L,
null));
}
};
- Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new HashMap<>() {
{
put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L,
null));
put(new TopicPartition(topic, 2), new OffsetAndMetadata(21L,
null));
@@ -506,7 +506,7 @@ public class MockProducerTest {
assertTrue(producer.consumerGroupOffsetsHistory().isEmpty());
Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult =
new HashMap<>();
- expectedResult.put(group, new HashMap<TopicPartition,
OffsetAndMetadata>() {
+ expectedResult.put(group, new HashMap<>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L,
null));
put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L,
null));
@@ -525,7 +525,7 @@ public class MockProducerTest {
producer.beginTransaction();
String group = "g";
- Map<TopicPartition, OffsetAndMetadata> groupCommit = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L,
null));
put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L,
null));
@@ -554,7 +554,7 @@ public class MockProducerTest {
producer.beginTransaction();
String group = "g";
- Map<TopicPartition, OffsetAndMetadata> groupCommit = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L,
null));
put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L,
null));
@@ -579,7 +579,7 @@ public class MockProducerTest {
producer.beginTransaction();
String group = "g";
- Map<TopicPartition, OffsetAndMetadata> groupCommit = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L,
null));
put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L,
null));
@@ -591,7 +591,7 @@ public class MockProducerTest {
producer.beginTransaction();
String group2 = "g2";
- Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new
HashMap<TopicPartition, OffsetAndMetadata>() {
+ Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new HashMap<>() {
{
put(new TopicPartition(topic, 2), new OffsetAndMetadata(53L,
null));
put(new TopicPartition(topic, 3), new OffsetAndMetadata(84L,
null));
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java
b/clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java
index 9509d398305..bbd2268e7cb 100644
---
a/clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/config/provider/EnvVarConfigProviderTest.java
@@ -40,7 +40,7 @@ class EnvVarConfigProviderTest {
@BeforeEach
public void setup() {
- Map<String, String> testEnvVars = new HashMap<String, String>() {
+ Map<String, String> testEnvVars = new HashMap<>() {
{
put("test_var1", "value1");
put("secret_var2", "value2");
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index ad94ae1dcd9..dd1adb31815 100644
---
a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -138,9 +138,8 @@ public class SaslChannelBuilderTest {
*/
@Test
public void testClientChannelBuilderWithBrokerConfigs() throws Exception {
- Map<String, Object> configs = new HashMap<>();
CertStores certStores = new CertStores(false, "client", "localhost");
- configs.putAll(certStores.getTrustingConfig(certStores));
+ Map<String, Object> configs = new
HashMap<>(certStores.getTrustingConfig(certStores));
configs.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
configs.putAll(new ConfigDef().withClientSaslSupport().parse(configs));
for (Field field : BrokerSecurityConfigs.class.getFields()) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
index 3e9ea7f4db1..94f4f1fc8c4 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class OAuthBearerSaslClientTest {
- private static final Map<String, String> TEST_PROPERTIES = new
LinkedHashMap<String, String>() {
+ private static final Map<String, String> TEST_PROPERTIES = new
LinkedHashMap<>() {
{
put("One", "1");
put("Two", "2");
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
index d50fd99a10d..581a72a5207 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
@@ -52,14 +52,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class OAuthBearerSaslServerTest {
private static final String USER = "user";
- private static final Map<String, ?> CONFIGS;
- static {
- String jaasConfigText =
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required"
+ private static final String JAAS_CONFIG_TEXT =
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required"
+ " unsecuredLoginStringClaim_sub=\"" + USER + "\";";
- Map<String, Object> tmp = new HashMap<>();
- tmp.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigText));
- CONFIGS = Collections.unmodifiableMap(tmp);
- }
+ private static final Map<String, ?> CONFIGS =
Map.of(SaslConfigs.SASL_JAAS_CONFIG, new Password(JAAS_CONFIG_TEXT));
+
private static final AuthenticateCallbackHandler LOGIN_CALLBACK_HANDLER;
static {
LOGIN_CALLBACK_HANDLER = new
OAuthBearerUnsecuredLoginCallbackHandler();
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
index b3eb3a026b7..8b1c5a37065 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
@@ -27,7 +27,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Random;
@@ -172,19 +171,19 @@ public class HttpAccessTokenRetrieverTest extends
OAuthBearerTest {
}
@Test
- public void testFormatAuthorizationHeader() throws
UnsupportedEncodingException {
+ public void testFormatAuthorizationHeader() {
assertAuthorizationHeader("id", "secret", false, "Basic aWQ6c2VjcmV0");
}
@Test
- public void testFormatAuthorizationHeaderEncoding() throws
UnsupportedEncodingException {
+ public void testFormatAuthorizationHeaderEncoding() {
// according to RFC-7617, we need to use the *non-URL safe* base64
encoder. See KAFKA-14496.
assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234",
"9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E", false, "Basic
U09NRV9SQU5ET01fTE9OR19VU0VSXzAxMjM0OjlRfDBgOGl+dXRlLW45a3NqTFdiXDUwIkFYQFVVRUQ1RQ==");
// according to RFC-6749 clientId & clientSecret must be urlencoded,
see https://tools.ietf.org/html/rfc6749#section-2.3.1
assertAuthorizationHeader("user!@~'", "secret-(*)!", true, "Basic
dXNlciUyMSU0MCU3RSUyNzpzZWNyZXQtJTI4KiUyOSUyMQ==");
}
- private void assertAuthorizationHeader(String clientId, String
clientSecret, boolean urlencode, String expected) throws
UnsupportedEncodingException {
+ private void assertAuthorizationHeader(String clientId, String
clientSecret, boolean urlencode, String expected) {
String actual =
HttpAccessTokenRetriever.formatAuthorizationHeader(clientId, clientSecret,
urlencode);
assertEquals(expected, actual, String.format("Expected the HTTP
Authorization header generated for client ID \"%s\" and client secret \"%s\" to
match", clientId, clientSecret));
}
@@ -203,14 +202,14 @@ public class HttpAccessTokenRetrieverTest extends
OAuthBearerTest {
}
@Test
- public void testFormatRequestBody() throws IOException {
+ public void testFormatRequestBody() {
String expected = "grant_type=client_credentials&scope=scope";
String actual = HttpAccessTokenRetriever.formatRequestBody("scope");
assertEquals(expected, actual);
}
@Test
- public void testFormatRequestBodyWithEscaped() throws IOException {
+ public void testFormatRequestBodyWithEscaped() {
String questionMark = "%3F";
String exclamationMark = "%21";
@@ -224,7 +223,7 @@ public class HttpAccessTokenRetrieverTest extends
OAuthBearerTest {
}
@Test
- public void testFormatRequestBodyMissingValues() throws IOException {
+ public void testFormatRequestBodyMissingValues() {
String expected = "grant_type=client_credentials";
String actual = HttpAccessTokenRetriever.formatRequestBody(null);
assertEquals(expected, actual);
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
index 84e06f2381d..d697dd46ead 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
@@ -28,7 +28,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import javax.security.auth.callback.Callback;
@@ -50,25 +49,16 @@ public class
OAuthBearerUnsecuredValidatorCallbackHandlerTest {
private static final String TOO_EARLY_EXPIRATION_TIME_CLAIM_TEXT =
expClaimText(0);
private static final String ISSUED_AT_CLAIM_TEXT =
claimOrHeaderText("iat", MOCK_TIME.milliseconds() / 1000.0);
private static final String SCOPE_CLAIM_TEXT = claimOrHeaderText("scope",
"scope1");
- private static final Map<String, String>
MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED;
- static {
- Map<String, String> tmp = new HashMap<>();
- tmp.put("unsecuredValidatorPrincipalClaimName", "principal");
- tmp.put("unsecuredValidatorAllowableClockSkewMs", "1");
- MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED =
Collections.unmodifiableMap(tmp);
- }
- private static final Map<String, String>
MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE;
- static {
- Map<String, String> tmp = new HashMap<>();
- tmp.put("unsecuredValidatorRequiredScope", "scope1");
- MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE =
Collections.unmodifiableMap(tmp);
- }
- private static final Map<String, String>
MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE;
- static {
- Map<String, String> tmp = new HashMap<>();
- tmp.put("unsecuredValidatorRequiredScope", "scope1 scope2");
- MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE =
Collections.unmodifiableMap(tmp);
- }
+ private static final Map<String, String>
MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED = Map.of(
+ "unsecuredValidatorPrincipalClaimName", "principal",
+ "unsecuredValidatorAllowableClockSkewMs", "1");
+
+ private static final Map<String, String>
MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE = Map.of(
+ "unsecuredValidatorRequiredScope", "scope1");
+
+ private static final Map<String, String>
MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE = Map.of(
+ "unsecuredValidatorRequiredScope", "scope1 scope2");
+
@Test
public void validToken() {
diff --git
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 70e5d80e354..521a0f19415 100644
---
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -45,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class SerializationTest {
private final String topic = "testTopic";
- private final Map<Class<?>, List<Object>> testData = new HashMap<Class<?>,
List<Object>>() {
+ private final Map<Class<?>, List<Object>> testData = new HashMap<>() {
{
put(String.class, Arrays.asList(null, "my string"));
put(Short.class, Arrays.asList(null, (short) 32767, (short)
-32768));
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 8928159526c..16fc6af154b 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -770,9 +770,7 @@ public class UtilsTest {
when(mockIterator.next()).thenReturn(rootDir.toPath()).thenReturn(subDir.toPath());
when(mockIterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
- assertDoesNotThrow(() -> {
- Utils.delete(spyRootFile);
- });
+ assertDoesNotThrow(() -> Utils.delete(spyRootFile));
assertFalse(Files.exists(rootDir.toPath()));
assertFalse(Files.exists(subDir.toPath()));
}