This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 00e5803cd3a MINOR: Various cleanups in client tests (#13094)
00e5803cd3a is described below

commit 00e5803cd3af89011254e734232308618403309d
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Jan 23 13:07:44 2023 +0100

    MINOR: Various cleanups in client tests (#13094)
    
    
    Reviewers: Luke Chen <[email protected]>, Ismael Juma <[email protected]>, 
Divij Vaidya <[email protected]>, Christo Lolov <[email protected]>
---
 .../kafka/clients/ClusterConnectionStatesTest.java |  3 +-
 .../java/org/apache/kafka/clients/MockClient.java  | 11 ++---
 .../apache/kafka/clients/NetworkClientTest.java    |  2 +-
 .../clients/consumer/ConsumerRecordsTest.java      |  2 +-
 .../internals/ConsumerInterceptorsTest.java        |  2 +-
 .../consumer/internals/ConsumerMetadataTest.java   |  3 +-
 .../internals/ConsumerNetworkClientTest.java       | 21 ++-------
 .../kafka/clients/producer/ProducerRecordTest.java | 10 ++---
 .../kafka/clients/producer/RecordSendTest.java     | 26 +++++------
 .../producer/RoundRobinPartitionerTest.java        | 10 ++---
 .../producer/UniformStickyPartitionerTest.java     | 10 ++---
 .../clients/producer/internals/BufferPoolTest.java | 12 +++--
 .../producer/internals/DefaultPartitionerTest.java |  2 +-
 .../producer/internals/ProducerBatchTest.java      |  6 +--
 .../producer/internals/RecordAccumulatorTest.java  | 51 +++++++++-------------
 .../clients/producer/internals/SenderTest.java     | 27 ++++++------
 .../internals/StickyPartitionCacheTest.java        |  2 +-
 .../org/apache/kafka/common/KafkaFutureTest.java   |  2 +-
 .../apache/kafka/common/acl/AclOperationTest.java  |  4 +-
 .../kafka/common/acl/AclPermissionTypeTest.java    |  6 +--
 .../apache/kafka/common/cache/LRUCacheTest.java    | 10 +++--
 .../apache/kafka/common/config/ConfigDefTest.java  | 38 ++++++++--------
 .../kafka/common/config/ConfigTransformerTest.java | 16 +++----
 .../provider/DirectoryConfigProviderTest.java      |  2 +-
 .../config/provider/FileConfigProviderTest.java    | 12 ++---
 .../common/header/internals/RecordHeadersTest.java |  4 +-
 .../common/internals/PartitionStatesTest.java      |  2 +-
 .../kafka/common/message/RecordsSerdeTest.java     |  8 ++--
 .../kafka/common/metrics/KafkaMbeanTest.java       |  6 +--
 .../apache/kafka/common/metrics/MetricsTest.java   | 18 ++++----
 .../apache/kafka/common/metrics/SensorTest.java    | 22 ++++------
 .../common/metrics/stats/FrequenciesTest.java      |  2 +-
 .../apache/kafka/common/network/EchoServer.java    | 49 ++++++++++-----------
 .../apache/kafka/common/network/NioEchoServer.java |  6 +--
 .../kafka/common/network/PlaintextSender.java      | 17 +++-----
 .../apache/kafka/common/network/SelectorTest.java  | 27 +++++-------
 .../kafka/common/network/SslSelectorTest.java      |  6 +--
 .../org/apache/kafka/common/network/SslSender.java |  5 +--
 .../kafka/common/record/DefaultRecordTest.java     |  2 +-
 .../kafka/common/record/FileRecordsTest.java       |  2 +-
 .../kafka/common/security/JaasContextTest.java     |  2 +-
 ... OAuthBearerSaslClientCallbackHandlerTest.java} |  2 +-
 .../OAuthBearerClientInitialResponseTest.java      |  2 +-
 .../internals/OAuthBearerSaslServerTest.java       | 12 +++--
 .../ExpiringCredentialRefreshingLoginTest.java     |  2 +-
 .../secured/HttpAccessTokenRetrieverTest.java      |  6 +--
 .../oauthbearer/internals/secured/RetryTest.java   |  2 +-
 ...earerUnsecuredValidatorCallbackHandlerTest.java | 11 ++---
 .../plain/internals/PlainSaslServerTest.java       |  4 +-
 .../scram/internals/ScramSaslServerTest.java       |  2 +-
 .../kafka/common/security/ssl/SslFactoryTest.java  |  2 +-
 .../security/ssl/mock/TestTrustManagerFactory.java | 13 +++---
 .../common/serialization/SerializationTest.java    |  4 +-
 .../kafka/common/utils/AbstractIteratorTest.java   |  4 +-
 .../org/apache/kafka/common/utils/ExitTest.java    |  4 +-
 .../utils/ImplicitLinkedHashCollectionTest.java    |  2 +-
 .../apache/kafka/common/utils/MockScheduler.java   | 42 ++++++++----------
 .../org/apache/kafka/common/utils/UtilsTest.java   |  5 ++-
 .../org/apache/kafka/test/Microbenchmarks.java     | 10 ++---
 .../java/org/apache/kafka/test/TestSslUtils.java   |  4 +-
 60 files changed, 274 insertions(+), 327 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 96fe89ca11e..b16de1a3164 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -183,7 +184,7 @@ public class ClusterConnectionStatesTest {
         connectionStates.authenticationFailed(nodeId1, time.milliseconds(), 
new AuthenticationException("No path to CA for certificate!"));
         time.sleep(1000);
         assertEquals(connectionStates.connectionState(nodeId1), 
ConnectionState.AUTHENTICATION_FAILED);
-        assertTrue(connectionStates.authenticationException(nodeId1) 
instanceof AuthenticationException);
+        assertNotNull(connectionStates.authenticationException(nodeId1));
         assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
         assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds()));
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 28d363bb8ef..c383df9338d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 
 import java.util.ArrayList;
@@ -471,12 +470,10 @@ public class MockClient implements KafkaClient {
     }
 
     public void waitForRequests(final int minRequests, long maxWaitMs) throws 
InterruptedException {
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return requests.size() >= minRequests;
-            }
-        }, maxWaitMs, "Expected requests have not been sent");
+        TestUtils.waitForCondition(
+                () -> requests.size() >= minRequests,
+                maxWaitMs,
+                "Expected requests have not been sent");
     }
 
     public void reset() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 63b44835f63..ab7235e112e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -913,7 +913,7 @@ public class NetworkClientTest {
     }
 
     @Test
-    public void testCallDisconnect() throws Exception {
+    public void testCallDisconnect() {
         awaitReady(client, node);
         assertTrue(client.isReady(node, time.milliseconds()),
             "Expected NetworkClient to be ready to send to node " + 
node.idString());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
index d414450ac0e..ea870c24781 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
@@ -39,7 +39,7 @@ public class ConsumerRecordsTest {
         Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = 
new LinkedHashMap<>();
 
         String topic = "topic";
-        records.put(new TopicPartition(topic, 0), new 
ArrayList<ConsumerRecord<Integer, String>>());
+        records.put(new TopicPartition(topic, 0), new ArrayList<>());
         ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
             0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
         ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
index 19ac2560929..4331e720541 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -78,7 +78,7 @@ public class ConsumerInterceptorsTest {
                 if (tp.partition() != filterPartition)
                     recordMap.put(tp, records.records(tp));
             }
-            return new ConsumerRecords<K, V>(recordMap);
+            return new ConsumerRecords<>(recordMap);
         }
 
         @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 02ab81c500e..24b1fa6fb33 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -46,6 +46,7 @@ import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ConsumerMetadataTest {
@@ -142,7 +143,7 @@ public class ConsumerMetadataTest {
         
metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds(1,
 topicPartitionCounts, topicIds), false, time.milliseconds());
         assertEquals(singleton("foo"), new 
HashSet<>(metadata.fetch().topics()));
         assertEquals(topicIds.get("foo"), metadata.topicIds().get("foo"));
-        assertEquals(topicIds.get("bar"), null);
+        assertNull(topicIds.get("bar"));
     }
 
     private void testBasicSubscription(Set<String> expectedTopics, Set<String> 
expectedInternalTopics) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index df74f7ede1e..8d0e451ef13 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -221,12 +221,7 @@ public class ConsumerNetworkClientTest {
         final RequestFuture<ClientResponse> future = consumerClient.send(node, 
heartbeat());
 
         client.enableBlockingUntilWakeup(1);
-        Thread t = new Thread() {
-            @Override
-            public void run() {
-                consumerClient.poll(future);
-            }
-        };
+        Thread t = new Thread(() -> consumerClient.poll(future));
         t.start();
 
         consumerClient.disconnectAsync(node);
@@ -284,23 +279,13 @@ public class ConsumerNetworkClientTest {
         consumerClient.pollNoWakeup(); // dequeue and send the request
 
         client.enableBlockingUntilWakeup(2);
-        Thread t1 = new Thread() {
-            @Override
-            public void run() {
-                consumerClient.pollNoWakeup();
-            }
-        };
+        Thread t1 = new Thread(() -> consumerClient.pollNoWakeup());
         t1.start();
 
         // Sleep a little so that t1 is blocking in poll
         Thread.sleep(50);
 
-        Thread t2 = new Thread() {
-            @Override
-            public void run() {
-                consumerClient.poll(future);
-            }
-        };
+        Thread t2 = new Thread(() -> consumerClient.poll(future));
         t2.start();
 
         // Sleep a little so that t2 is awaiting the network client lock
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index 8cb18d67fc8..b5513cd73e4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.producer;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class ProducerRecordTest {
@@ -35,16 +35,16 @@ public class ProducerRecordTest {
         assertEquals(producerRecord.hashCode(), equalRecord.hashCode());
 
         ProducerRecord<String, Integer> topicMisMatch = new 
ProducerRecord<>("test-1", 1, "key", 1);
-        assertFalse(producerRecord.equals(topicMisMatch));
+        assertNotEquals(producerRecord, topicMisMatch);
 
         ProducerRecord<String, Integer> partitionMismatch = new 
ProducerRecord<>("test", 2, "key", 1);
-        assertFalse(producerRecord.equals(partitionMismatch));
+        assertNotEquals(producerRecord, partitionMismatch);
 
         ProducerRecord<String, Integer> keyMisMatch = new 
ProducerRecord<>("test", 1, "key-1", 1);
-        assertFalse(producerRecord.equals(keyMisMatch));
+        assertNotEquals(producerRecord, keyMisMatch);
 
         ProducerRecord<String, Integer> valueMisMatch = new 
ProducerRecord<>("test", 1, "key", 2);
-        assertFalse(producerRecord.equals(valueMisMatch));
+        assertNotEquals(producerRecord, valueMisMatch);
 
         ProducerRecord<String, Integer> nullFieldsRecord = new 
ProducerRecord<>("topic", null, null, null, null, null);
         assertEquals(nullFieldsRecord, nullFieldsRecord);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index 86632eecdc8..6dd93f7fc64 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -66,7 +66,7 @@ public class RecordSendTest {
      * Test that an asynchronous request will eventually throw the right 
exception
      */
     @Test
-    public void testError() throws Exception {
+    public void testError() {
         FutureRecordMetadata future = new 
FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 
50L),
                 relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, Time.SYSTEM);
         assertThrows(ExecutionException.class, future::get);
@@ -85,20 +85,18 @@ public class RecordSendTest {
     /* create a new request result that will be completed after the given 
timeout */
     public ProduceRequestResult asyncRequest(final long baseOffset, final 
RuntimeException error, final long timeout) {
         final ProduceRequestResult request = new 
ProduceRequestResult(topicPartition);
-        Thread thread = new Thread() {
-            public void run() {
-                try {
-                    sleep(timeout);
-                    if (error == null) {
-                        request.set(baseOffset, RecordBatch.NO_TIMESTAMP, 
null);
-                    } else {
-                        request.set(-1L, RecordBatch.NO_TIMESTAMP, index -> 
error);
-                    }
+        Thread thread = new Thread(() -> {
+            try {
+                Thread.sleep(timeout);
+                if (error == null) {
+                    request.set(baseOffset, RecordBatch.NO_TIMESTAMP, null);
+                } else {
+                    request.set(-1L, RecordBatch.NO_TIMESTAMP, index -> error);
+                }
 
-                    request.done();
-                } catch (InterruptedException e) { }
-            }
-        };
+                request.done();
+            } catch (InterruptedException e) { }
+        });
         thread.start();
         return request;
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
index dfb98e9f00b..f65e1766116 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
@@ -51,7 +51,7 @@ public class RoundRobinPartitionerTest {
         int countForPart2 = 0;
         Partitioner partitioner = new RoundRobinPartitioner();
         Cluster cluster = new Cluster("clusterId", asList(NODES[0], NODES[1], 
NODES[2]), partitions,
-            Collections.<String>emptySet(), Collections.<String>emptySet());
+            Collections.emptySet(), Collections.emptySet());
         for (int i = 1; i <= 100; i++) {
             int part = partitioner.partition("test", null, null, null, null, 
cluster);
             assertTrue(part == 0 || part == 2, "We should never choose a 
leader-less node in round robin");
@@ -64,7 +64,7 @@ public class RoundRobinPartitionerTest {
     }
 
     @Test
-    public void testRoundRobinWithKeyBytes() throws InterruptedException {
+    public void testRoundRobinWithKeyBytes() {
         final String topicA = "topicA";
         final String topicB = "topicB";
 
@@ -72,7 +72,7 @@ public class RoundRobinPartitionerTest {
                 new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new 
PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
                 new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
         Cluster testCluster = new Cluster("clusterId", asList(NODES[0], 
NODES[1], NODES[2]), allPartitions,
-                Collections.<String>emptySet(), 
Collections.<String>emptySet());
+                Collections.emptySet(), Collections.emptySet());
 
         final Map<Integer, Integer> partitionCount = new HashMap<>();
 
@@ -96,7 +96,7 @@ public class RoundRobinPartitionerTest {
     }
     
     @Test
-    public void testRoundRobinWithNullKeyBytes() throws InterruptedException {
+    public void testRoundRobinWithNullKeyBytes() {
         final String topicA = "topicA";
         final String topicB = "topicB";
 
@@ -104,7 +104,7 @@ public class RoundRobinPartitionerTest {
                 new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new 
PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
                 new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
         Cluster testCluster = new Cluster("clusterId", asList(NODES[0], 
NODES[1], NODES[2]), allPartitions,
-                Collections.<String>emptySet(), 
Collections.<String>emptySet());
+                Collections.emptySet(), Collections.emptySet());
 
         final Map<Integer, Integer> partitionCount = new HashMap<>();
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
index f5484071717..0880dccebb2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
@@ -56,7 +56,7 @@ public class UniformStickyPartitionerTest {
         int part = 0;
         Partitioner partitioner = new UniformStickyPartitioner();
         Cluster cluster = new Cluster("clusterId", asList(NODES[0], NODES[1], 
NODES[2]), partitions,
-            Collections.<String>emptySet(), Collections.<String>emptySet());
+            Collections.emptySet(), Collections.emptySet());
         for (int i = 0; i < 50; i++) {
             part = partitioner.partition("test", null, null, null, null, 
cluster);
             assertTrue(part == 0 || part == 2, "We should never choose a 
leader-less node in round robin");
@@ -80,12 +80,12 @@ public class UniformStickyPartitionerTest {
 
     @SuppressWarnings("deprecation")
     @Test
-    public void testRoundRobinWithKeyBytes() throws InterruptedException {
+    public void testRoundRobinWithKeyBytes() {
         List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A, 
0, NODES[0], NODES, NODES),
                 new PartitionInfo(TOPIC_A, 1, NODES[1], NODES, NODES), new 
PartitionInfo(TOPIC_A, 2, NODES[1], NODES, NODES),
                 new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES));
         Cluster testCluster = new Cluster("clusterId", asList(NODES[0], 
NODES[1], NODES[2]), allPartitions,
-                Collections.<String>emptySet(), 
Collections.<String>emptySet());
+                Collections.emptySet(), Collections.emptySet());
 
         final Map<Integer, Integer> partitionCount = new HashMap<>();
 
@@ -145,12 +145,12 @@ public class UniformStickyPartitionerTest {
 
     @SuppressWarnings("deprecation")
     @Test
-    public void testRoundRobinWithNullKeyBytes() throws InterruptedException {
+    public void testRoundRobinWithNullKeyBytes() {
         List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A, 
0, NODES[0], NODES, NODES),
                 new PartitionInfo(TOPIC_A, 1, NODES[1], NODES, NODES), new 
PartitionInfo(TOPIC_A, 2, NODES[1], NODES, NODES),
                 new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES));
         Cluster testCluster = new Cluster("clusterId", asList(NODES[0], 
NODES[1], NODES[2]), allPartitions,
-                Collections.<String>emptySet(), 
Collections.<String>emptySet());
+                Collections.emptySet(), Collections.emptySet());
 
         final Map<Integer, Integer> partitionCount = new HashMap<>();
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index eaf35f9b8d0..ed372e452f4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -294,7 +294,7 @@ public class BufferPoolTest {
         final int poolableSize = 1024;
         final long totalMemory = numThreads / 2 * poolableSize;
         final BufferPool pool = new BufferPool(totalMemory, poolableSize, 
metrics, time, metricGroup);
-        List<StressTestThread> threads = new ArrayList<StressTestThread>();
+        List<StressTestThread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++)
             threads.add(new StressTestThread(pool, iterations));
         for (StressTestThread thread : threads)
@@ -409,12 +409,10 @@ public class BufferPoolTest {
         ByteBuffer buffer = pool.allocate(1, Long.MAX_VALUE);
 
         ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
-        Callable<Void> work = new Callable<Void>() {
-                public Void call() throws Exception {
-                    assertThrows(KafkaException.class, () -> pool.allocate(1, 
Long.MAX_VALUE));
-                    return null;
-                }
-            };
+        Callable<Void> work = () -> {
+            assertThrows(KafkaException.class, () -> pool.allocate(1, 
Long.MAX_VALUE));
+            return null;
+        };
         for (int i = 0; i < numWorkers; ++i) {
             executor.submit(work);
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index e250748643a..c851408f363 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -45,7 +45,7 @@ public class DefaultPartitionerTest {
         @SuppressWarnings("deprecation")
         final Partitioner partitioner = new DefaultPartitioner();
         final Cluster cluster = new Cluster("clusterId", asList(NODES), 
PARTITIONS,
-            Collections.<String>emptySet(), Collections.<String>emptySet());
+            Collections.emptySet(), Collections.emptySet());
         int partition = partitioner.partition("test",  null, KEY_BYTES, null, 
null, cluster);
         assertEquals(partition, partitioner.partition("test", null, KEY_BYTES, 
null, null, cluster), "Same key should yield same partition");
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 6af8289f9ce..03f5e0b6aa8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -155,9 +155,9 @@ public class ProducerBatchTest {
             for (ProducerBatch splitProducerBatch : batches) {
                 for (RecordBatch splitBatch : 
splitProducerBatch.records().batches()) {
                     for (Record record : splitBatch) {
-                        assertTrue(record.headers().length == 1, "Header size 
should be 1.");
-                        
assertTrue(record.headers()[0].key().equals("header-key"), "Header key should 
be 'header-key'.");
-                        assertTrue(new 
String(record.headers()[0].value()).equals("header-value"), "Header value 
should be 'header-value'.");
+                        assertEquals(1, record.headers().length, "Header size 
should be 1.");
+                        assertEquals("header-key", record.headers()[0].key(), 
"Header key should be 'header-key'.");
+                        assertEquals("header-value", new 
String(record.headers()[0].value()), "Header value should be 'header-value'.");
                     }
                 }
             }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 9408172004e..3605bc11711 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -120,7 +120,7 @@ public class RecordAccumulatorTest {
         accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, 
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
 
         // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the 
max request size is full after the first batch drained
-        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new 
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new 
HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
         verifyTopicPartitionInBatches(batches1, tp1, tp3);
 
         // add record for tp1, tp3
@@ -129,11 +129,11 @@ public class RecordAccumulatorTest {
 
         // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the 
max request size is full after the first batch drained
         // The drain index should start from next topic partition, that is, 
node1 => tp2, node2 => tp4
-        Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new 
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new 
HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
         verifyTopicPartitionInBatches(batches2, tp2, tp4);
 
         // make sure in next run, the drain index will start from the beginning
-        Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new 
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new 
HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
         verifyTopicPartitionInBatches(batches3, tp1, tp3);
 
         // add record for tp2, tp3, tp4 and mute the tp4
@@ -142,7 +142,7 @@ public class RecordAccumulatorTest {
         accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, 
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         accum.mutePartition(tp4);
         // drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 
is muted)
-        Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new 
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new 
HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
         verifyTopicPartitionInBatches(batches4, tp2, tp3);
 
         // add record for tp1, tp2, tp3, and unmute tp4
@@ -151,14 +151,14 @@ public class RecordAccumulatorTest {
         accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, 
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
         accum.unmutePartition(tp4);
         // set maxSize as a max value, so that the all partitions in 2 nodes 
should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4]
-        Map<Integer, List<ProducerBatch>> batches5 = accum.drain(cluster, new 
HashSet<Node>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
+        Map<Integer, List<ProducerBatch>> batches5 = accum.drain(cluster, new 
HashSet<>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
         verifyTopicPartitionInBatches(batches5, tp1, tp2, tp3, tp4);
     }
 
     private void verifyTopicPartitionInBatches(Map<Integer, 
List<ProducerBatch>> nodeBatches, TopicPartition... tp) {
-        int allTpBatchCount = 
nodeBatches.values().stream().flatMap(Collection::stream).collect(Collectors.toList()).size();
+        int allTpBatchCount = (int) 
nodeBatches.values().stream().flatMap(Collection::stream).count();
         assertEquals(tp.length, allTpBatchCount);
-        List<TopicPartition> topicPartitionsInBatch = new 
ArrayList<TopicPartition>();
+        List<TopicPartition> topicPartitionsInBatch = new ArrayList<>();
         for (Map.Entry<Integer, List<ProducerBatch>> entry : 
nodeBatches.entrySet()) {
             List<ProducerBatch> tpBatchList = entry.getValue();
             List<TopicPartition> tpList = 
tpBatchList.stream().map(producerBatch -> 
producerBatch.topicPartition).collect(Collectors.toList());
@@ -331,17 +331,15 @@ public class RecordAccumulatorTest {
             1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, 
CompressionType.NONE, 0);
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
-            threads.add(new Thread() {
-                public void run() {
-                    for (int i = 0; i < msgs; i++) {
-                        try {
-                            accum.append(topic, i % numParts, 0L, key, value, 
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), 
cluster);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
+            threads.add(new Thread(() -> {
+                for (int j = 0; j < msgs; j++) {
+                    try {
+                        accum.append(topic, j % numParts, 0L, key, value, 
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), 
cluster);
+                    } catch (Exception e) {
+                        e.printStackTrace();
                     }
                 }
-            });
+            }));
         }
         for (Thread t : threads)
             t.start();
@@ -480,12 +478,10 @@ public class RecordAccumulatorTest {
 
 
     private void delayedInterrupt(final Thread thread, final long delayMs) {
-        Thread t = new Thread() {
-            public void run() {
-                Time.SYSTEM.sleep(delayMs);
-                thread.interrupt();
-            }
-        };
+        Thread t = new Thread(() -> {
+            Time.SYSTEM.sleep(delayMs);
+            thread.interrupt();
+        });
         t.start();
     }
 
@@ -517,7 +513,7 @@ public class RecordAccumulatorTest {
         class TestCallback implements RecordAccumulator.AppendCallbacks {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception 
exception) {
-                assertTrue(exception.getMessage().equals("Producer is closed 
forcefully."));
+                assertEquals("Producer is closed forcefully.", 
exception.getMessage());
                 numExceptionReceivedInCallback.incrementAndGet();
             }
 
@@ -599,7 +595,7 @@ public class RecordAccumulatorTest {
     private void doExpireBatchSingle(int deliveryTimeoutMs) throws 
InterruptedException {
         int lingerMs = 300;
         List<Boolean> muteStates = Arrays.asList(false, true);
-        Set<Node> readyNodes = null;
+        Set<Node> readyNodes;
         List<ProducerBatch> expiredBatches = new ArrayList<>();
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
@@ -854,12 +850,7 @@ public class RecordAccumulatorTest {
 
         byte[] value = new byte[1024];
         final AtomicInteger acked = new AtomicInteger(0);
-        Callback cb = new Callback() {
-            @Override
-            public void onCompletion(RecordMetadata metadata, Exception 
exception) {
-                acked.incrementAndGet();
-            }
-        };
+        Callback cb = (metadata, exception) -> acked.incrementAndGet();
         // Append two messages so the batch is too big.
         Future<RecordMetadata> future1 = batch.tryAppend(now, null, value, 
Record.EMPTY_HEADERS, cb, now);
         Future<RecordMetadata> future2 = batch.tryAppend(now, null, value, 
Record.EMPTY_HEADERS, cb, now);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 3d972b3eb2c..bdbc1bd92e9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -280,7 +280,6 @@ public class SenderTest {
     /*
      * Send multiple requests. Verify that the client side quota metrics have 
the right values
      */
-    @SuppressWarnings("deprecation")
     @Test
     public void testQuotaMetrics() {
         MockSelector selector = new MockSelector(time);
@@ -640,7 +639,7 @@ public class SenderTest {
      * polls are necessary to send requests.
      */
     @Test
-    public void testInitProducerIdWithMaxInFlightOne() throws Exception {
+    public void testInitProducerIdWithMaxInFlightOne() {
         final long producerId = 123456L;
         createMockClientWithMaxFlightOneMetadataPending();
 
@@ -668,7 +667,7 @@ public class SenderTest {
      * polls are necessary to send requests.
      */
     @Test
-    public void testIdempotentInitProducerIdWithMaxInFlightOne() throws 
Exception {
+    public void testIdempotentInitProducerIdWithMaxInFlightOne() {
         final long producerId = 123456L;
         createMockClientWithMaxFlightOneMetadataPending();
 
@@ -774,7 +773,7 @@ public class SenderTest {
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
         String nodeId = client.requests().peek().destination();
-        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
@@ -824,7 +823,7 @@ public class SenderTest {
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
         String nodeId = client.requests().peek().destination();
-        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
@@ -924,7 +923,7 @@ public class SenderTest {
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
         String nodeId = client.requests().peek().destination();
-        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
@@ -984,7 +983,7 @@ public class SenderTest {
         appendToAccumulator(tp0);
         sender.runOnce();
         String nodeId = client.requests().peek().destination();
-        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
 
         // make sure the next sequence number accounts for multi-message 
batches.
@@ -1256,7 +1255,7 @@ public class SenderTest {
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
         String nodeId = client.requests().peek().destination();
-        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
@@ -1338,7 +1337,7 @@ public class SenderTest {
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
         String nodeId = client.requests().peek().destination();
-        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
 
         // Send second ProduceRequest
@@ -1779,7 +1778,7 @@ public class SenderTest {
         Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
         String nodeId = client.requests().peek().destination();
-        Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(OptionalInt.empty(), 
transactionManager.lastAckedSequence(tp0));
@@ -2315,7 +2314,7 @@ public class SenderTest {
         sender.runOnce();  // connect.
         sender.runOnce();  // send.
         String id = client.requests().peek().destination();
-        Node node = new Node(Integer.valueOf(id), "localhost", 0);
+        Node node = new Node(Integer.parseInt(id), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
         assertTrue(client.isReady(node, time.milliseconds()), "Client ready 
status should be true");
         client.disconnect(id);
@@ -2425,7 +2424,7 @@ public class SenderTest {
             assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The 
next sequence should be 2");
             String id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, 
client.requests().peek().requestBuilder().apiKey());
-            Node node = new Node(Integer.valueOf(id), "localhost", 0);
+            Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.isReady(node, time.milliseconds()), "Client 
ready status should be true");
 
@@ -2443,7 +2442,7 @@ public class SenderTest {
             assertFalse(f2.isDone(), "The future shouldn't have been done.");
             id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, 
client.requests().peek().requestBuilder().apiKey());
-            node = new Node(Integer.valueOf(id), "localhost", 0);
+            node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.isReady(node, time.milliseconds()), "Client 
ready status should be true");
 
@@ -2460,7 +2459,7 @@ public class SenderTest {
             sender.runOnce(); // send the seconcd produce request
             id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, 
client.requests().peek().requestBuilder().apiKey());
-            node = new Node(Integer.valueOf(id), "localhost", 0);
+            node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.isReady(node, time.milliseconds()), "Client 
ready status should be true");
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/StickyPartitionCacheTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/StickyPartitionCacheTest.java
index b1af9a3ac90..3371223afe6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/StickyPartitionCacheTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/StickyPartitionCacheTest.java
@@ -66,7 +66,7 @@ public class StickyPartitionCacheTest {
         int changedPartA3 = stickyPartitionCache.nextPartition(TOPIC_A, 
testCluster, partA);
         assertEquals(changedPartA3, changedPartA2);
 
-        // Check that the we can still use the partitioner when there is only 
one partition
+        // Check that we can still use the partitioner when there is only one 
partition
         int changedPartB = stickyPartitionCache.nextPartition(TOPIC_B, 
testCluster, partB);
         assertEquals(changedPartB, stickyPartitionCache.partition(TOPIC_B, 
testCluster));
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java 
b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index c9c36926c39..f9dc5e75fc5 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -147,7 +147,7 @@ public class KafkaFutureTest {
     }
 
     @Test
-    public void testCompleteFuturesExceptionally() throws Exception {
+    public void testCompleteFuturesExceptionally() {
         KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
         assertTrue(futureFail.completeExceptionally(new RuntimeException("We 
require more vespene gas")));
         assertIsFailed(futureFail);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java 
b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
index b91db6f206c..2e81a6d1eaa 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
@@ -54,7 +54,7 @@ public class AclOperationTest {
     };
 
     @Test
-    public void testIsUnknown() throws Exception {
+    public void testIsUnknown() {
         for (AclOperationTestInfo info : INFOS) {
             assertEquals(info.unknown, info.operation.isUnknown(),
                 info.operation + " was supposed to have unknown == " + 
info.unknown);
@@ -62,7 +62,7 @@ public class AclOperationTest {
     }
 
     @Test
-    public void testCode() throws Exception {
+    public void testCode() {
         assertEquals(AclOperation.values().length, INFOS.length);
         for (AclOperationTestInfo info : INFOS) {
             assertEquals(info.code, info.operation.code(), info.operation + " 
was supposed to have code == " + info.code);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java 
b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
index 8da6d2ac6c0..1f935579e11 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
@@ -44,14 +44,14 @@ public class AclPermissionTypeTest {
     };
 
     @Test
-    public void testIsUnknown() throws Exception {
+    public void testIsUnknown() {
         for (AclPermissionTypeTestInfo info : INFOS) {
             assertEquals(info.unknown, info.ty.isUnknown(), info.ty + " was 
supposed to have unknown == " + info.unknown);
         }
     }
 
     @Test
-    public void testCode() throws Exception {
+    public void testCode() {
         assertEquals(AclPermissionType.values().length, INFOS.length);
         for (AclPermissionTypeTestInfo info : INFOS) {
             assertEquals(info.code, info.ty.code(), info.ty + " was supposed 
to have code == " + info.code);
@@ -71,7 +71,7 @@ public class AclPermissionTypeTest {
     }
 
     @Test
-    public void testExhaustive() throws Exception {
+    public void testExhaustive() {
         assertEquals(INFOS.length, AclPermissionType.values().length);
         for (int i = 0; i < INFOS.length; i++) {
             assertEquals(INFOS[i].ty, AclPermissionType.values()[i]);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java 
b/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
index d7cd2ecc2d1..1e0d32b137b 100644
--- a/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.common.cache;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class LRUCacheTest {
 
@@ -49,20 +51,20 @@ public class LRUCacheTest {
         cache.put("e", "f");
         assertEquals(3, cache.size());
 
-        assertEquals(true, cache.remove("a"));
+        assertTrue(cache.remove("a"));
         assertEquals(2, cache.size());
         assertNull(cache.get("a"));
         assertEquals("d", cache.get("c"));
         assertEquals("f", cache.get("e"));
 
-        assertEquals(false, cache.remove("key-does-not-exist"));
+        assertFalse(cache.remove("key-does-not-exist"));
 
-        assertEquals(true, cache.remove("c"));
+        assertTrue(cache.remove("c"));
         assertEquals(1, cache.size());
         assertNull(cache.get("c"));
         assertEquals("f", cache.get("e"));
 
-        assertEquals(true, cache.remove("e"));
+        assertTrue(cache.remove("e"));
         assertEquals(0, cache.size());
         assertNull(cache.get("e"));
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 
b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 76c20df4edf..d2c572f06f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -131,7 +131,7 @@ public class ConfigDefTest {
 
     private void testBadInputs(Type type, Object... values) {
         for (Object value : values) {
-            Map<String, Object> m = new HashMap<String, Object>();
+            Map<String, Object> m = new HashMap<>();
             m.put("name", value);
             ConfigDef def = new ConfigDef().define("name", type, 
Importance.HIGH, "docs");
             try {
@@ -158,7 +158,7 @@ public class ConfigDefTest {
     @Test
     public void testNestedClass() {
         // getName(), not getSimpleName() or getCanonicalName(), is the 
version that should be able to locate the class
-        Map<String, Object> props = Collections.<String, 
Object>singletonMap("name", NestedClass.class.getName());
+        Map<String, Object> props = Collections.singletonMap("name", 
NestedClass.class.getName());
         new ConfigDef().define("name", Type.CLASS, Importance.HIGH, 
"docs").parse(props);
     }
 
@@ -240,10 +240,10 @@ public class ConfigDefTest {
         Map<String, ConfigValue> expected = new HashMap<>();
         String errorMessageB = "Missing required configuration \"b\" which has 
no default value.";
         String errorMessageC = "Missing required configuration \"c\" which has 
no default value.";
-        ConfigValue configA = new ConfigValue("a", 1, 
Collections.<Object>emptyList(), Collections.<String>emptyList());
-        ConfigValue configB = new ConfigValue("b", null, 
Collections.<Object>emptyList(), Arrays.asList(errorMessageB, errorMessageB));
-        ConfigValue configC = new ConfigValue("c", null, 
Collections.<Object>emptyList(), Arrays.asList(errorMessageC));
-        ConfigValue configD = new ConfigValue("d", 10, 
Collections.<Object>emptyList(), Collections.<String>emptyList());
+        ConfigValue configA = new ConfigValue("a", 1, Collections.emptyList(), 
Collections.emptyList());
+        ConfigValue configB = new ConfigValue("b", null, 
Collections.emptyList(), Arrays.asList(errorMessageB, errorMessageB));
+        ConfigValue configC = new ConfigValue("c", null, 
Collections.emptyList(), Arrays.asList(errorMessageC));
+        ConfigValue configD = new ConfigValue("d", 10, 
Collections.emptyList(), Collections.emptyList());
         expected.put("a", configA);
         expected.put("b", configB);
         expected.put("c", configC);
@@ -277,10 +277,10 @@ public class ConfigDefTest {
         String errorMessageB = "Missing required configuration \"b\" which has 
no default value.";
         String errorMessageC = "Missing required configuration \"c\" which has 
no default value.";
 
-        ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 
2, 3), Collections.<String>emptyList());
-        ConfigValue configB = new ConfigValue("b", null, 
Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB, errorMessageB));
-        ConfigValue configC = new ConfigValue("c", null, 
Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
-        ConfigValue configD = new ConfigValue("d", 10, 
Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
+        ConfigValue configA = new ConfigValue("a", 1, Arrays.asList(1, 2, 3), 
Collections.emptyList());
+        ConfigValue configB = new ConfigValue("b", null, Arrays.asList(4, 5), 
Arrays.asList(errorMessageB, errorMessageB));
+        ConfigValue configC = new ConfigValue("c", null, Arrays.asList(4, 5), 
Arrays.asList(errorMessageC));
+        ConfigValue configD = new ConfigValue("d", 10, Arrays.asList(1, 2, 3), 
Collections.emptyList());
 
         expected.put("a", configA);
         expected.put("b", configB);
@@ -312,9 +312,9 @@ public class ConfigDefTest {
         String errorMessageC = "Missing required configuration \"c\" which has 
no default value.";
         String errorMessageD = "d is referred in the dependents, but not 
defined.";
 
-        ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 
2, 3), Collections.<String>emptyList());
-        ConfigValue configB = new ConfigValue("b", null, 
Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB));
-        ConfigValue configC = new ConfigValue("c", null, 
Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
+        ConfigValue configA = new ConfigValue("a", 1, Arrays.asList(1, 2, 3), 
Collections.emptyList());
+        ConfigValue configB = new ConfigValue("b", null, Arrays.asList(4, 5), 
Arrays.asList(errorMessageB));
+        ConfigValue configC = new ConfigValue("c", null, Arrays.asList(4, 5), 
Arrays.asList(errorMessageC));
         ConfigValue configD = new ConfigValue("d", null, 
Collections.emptyList(), Arrays.asList(errorMessageD));
         configD.visible(false);
 
@@ -359,7 +359,7 @@ public class ConfigDefTest {
     }
 
     @Test
-    public void testCanAddInternalConfig() throws Exception {
+    public void testCanAddInternalConfig() {
         final String configName = "internal.config";
         final ConfigDef configDef = new ConfigDef().defineInternal(configName, 
Type.STRING, "", Importance.LOW);
         final HashMap<String, String> properties = new HashMap<>();
@@ -384,7 +384,7 @@ public class ConfigDefTest {
     }
 
     @Test
-    public void testDynamicUpdateModeInDocs() throws Exception {
+    public void testDynamicUpdateModeInDocs() {
         final ConfigDef configDef = new ConfigDef()
                 .define("my.broker.config", Type.LONG, Importance.HIGH, "docs")
                 .define("my.cluster.config", Type.LONG, Importance.HIGH, 
"docs")
@@ -477,13 +477,13 @@ public class ConfigDefTest {
         ConfigDef def = new ConfigDef().define("name", type, defaultVal, 
validator, Importance.HIGH, "docs");
 
         for (Object value : okValues) {
-            Map<String, Object> m = new HashMap<String, Object>();
+            Map<String, Object> m = new HashMap<>();
             m.put("name", value);
             def.parse(m);
         }
 
         for (Object value : badValues) {
-            Map<String, Object> m = new HashMap<String, Object>();
+            Map<String, Object> m = new HashMap<>();
             m.put("name", value);
             try {
                 def.parse(m);
@@ -538,11 +538,11 @@ public class ConfigDefTest {
     public void toEnrichedRst() {
         final ConfigDef def = new ConfigDef()
                 .define("opt1.of.group1", Type.STRING, "a", 
ValidString.in("a", "b", "c"), Importance.HIGH, "Doc doc.",
-                        "Group One", 0, Width.NONE, "..", 
Collections.<String>emptyList())
+                        "Group One", 0, Width.NONE, "..", 
Collections.emptyList())
                 .define("opt2.of.group1", Type.INT, 
ConfigDef.NO_DEFAULT_VALUE, Importance.MEDIUM, "Doc doc doc.",
                         "Group One", 1, Width.NONE, "..", 
Arrays.asList("some.option1", "some.option2"))
                 .define("opt2.of.group2", Type.BOOLEAN, false, 
Importance.HIGH, "Doc doc doc doc.",
-                        "Group Two", 1, Width.NONE, "..", 
Collections.<String>emptyList())
+                        "Group Two", 1, Width.NONE, "..", 
Collections.emptyList())
                 .define("opt1.of.group2", Type.BOOLEAN, false, 
Importance.HIGH, "Doc doc doc doc doc.",
                         "Group Two", 0, Width.NONE, "..", 
singletonList("some.option"))
                 .define("poor.opt", Type.STRING, "foo", Importance.HIGH, "Doc 
doc doc doc.");
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
index 93296d9f492..50f8fd647f6 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
@@ -48,7 +48,7 @@ public class ConfigTransformerTest {
     }
 
     @Test
-    public void testReplaceVariable() throws Exception {
+    public void testReplaceVariable() {
         ConfigTransformerResult result = 
configTransformer.transform(Collections.singletonMap(MY_KEY, 
"${test:testPath:testKey}"));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
@@ -57,7 +57,7 @@ public class ConfigTransformerTest {
     }
 
     @Test
-    public void testReplaceVariableWithTTL() throws Exception {
+    public void testReplaceVariableWithTTL() {
         ConfigTransformerResult result = 
configTransformer.transform(Collections.singletonMap(MY_KEY, 
"${test:testPath:testKeyWithTTL}"));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
@@ -66,28 +66,28 @@ public class ConfigTransformerTest {
     }
 
     @Test
-    public void testReplaceMultipleVariablesInValue() throws Exception {
+    public void testReplaceMultipleVariablesInValue() {
         ConfigTransformerResult result = 
configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, 
${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
         Map<String, String> data = result.data();
         assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", 
data.get(MY_KEY));
     }
 
     @Test
-    public void testNoReplacement() throws Exception {
+    public void testNoReplacement() {
         ConfigTransformerResult result = 
configTransformer.transform(Collections.singletonMap(MY_KEY, 
"${test:testPath:missingKey}"));
         Map<String, String> data = result.data();
         assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
     }
 
     @Test
-    public void testSingleLevelOfIndirection() throws Exception {
+    public void testSingleLevelOfIndirection() {
         ConfigTransformerResult result = 
configTransformer.transform(Collections.singletonMap(MY_KEY, 
"${test:testPath:testIndirection}"));
         Map<String, String> data = result.data();
         assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
     }
 
     @Test
-    public void testReplaceVariableNoPath() throws Exception {
+    public void testReplaceVariableNoPath() {
         ConfigTransformerResult result = 
configTransformer.transform(Collections.singletonMap(MY_KEY, 
"${test:testKey}"));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
@@ -96,14 +96,14 @@ public class ConfigTransformerTest {
     }
 
     @Test
-    public void testReplaceMultipleVariablesWithoutPathInValue() throws 
Exception {
+    public void testReplaceMultipleVariablesWithoutPathInValue() {
         ConfigTransformerResult result = 
configTransformer.transform(Collections.singletonMap(MY_KEY, "first 
${test:testKey}; second ${test:testKey}"));
         Map<String, String> data = result.data();
         assertEquals("first testResultNoPath; second testResultNoPath", 
data.get(MY_KEY));
     }
 
     @Test
-    public void testNullConfigValue() throws Exception {
+    public void testNullConfigValue() {
         ConfigTransformerResult result = 
configTransformer.transform(Collections.singletonMap(MY_KEY, null));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
index 7cf5422bf7f..d162e8a6ecf 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
@@ -82,7 +82,7 @@ public class DirectoryConfigProviderTest {
     }
 
     @Test
-    public void testGetAllKeysAtPath() throws IOException {
+    public void testGetAllKeysAtPath() {
         ConfigData configData = provider.get(dir.getAbsolutePath());
         assertEquals(toSet(asList(foo.getName(), bar.getName())), 
configData.data().keySet());
         assertEquals("FOO", configData.data().get(foo.getName()));
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java
index 431f382a16a..5bbb60c155d 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java
@@ -43,7 +43,7 @@ public class FileConfigProviderTest {
     }
 
     @Test
-    public void testGetAllKeysAtPath() throws Exception {
+    public void testGetAllKeysAtPath() {
         ConfigData configData = configProvider.get("dummy");
         Map<String, String> result = new HashMap<>();
         result.put("testKey", "testResult");
@@ -53,7 +53,7 @@ public class FileConfigProviderTest {
     }
 
     @Test
-    public void testGetOneKeyAtPath() throws Exception {
+    public void testGetOneKeyAtPath() {
         ConfigData configData = configProvider.get("dummy", 
Collections.singleton("testKey"));
         Map<String, String> result = new HashMap<>();
         result.put("testKey", "testResult");
@@ -62,28 +62,28 @@ public class FileConfigProviderTest {
     }
 
     @Test
-    public void testEmptyPath() throws Exception {
+    public void testEmptyPath() {
         ConfigData configData = configProvider.get("", 
Collections.singleton("testKey"));
         assertTrue(configData.data().isEmpty());
         assertNull(configData.ttl());
     }
 
     @Test
-    public void testEmptyPathWithKey() throws Exception {
+    public void testEmptyPathWithKey() {
         ConfigData configData = configProvider.get("");
         assertTrue(configData.data().isEmpty());
         assertNull(configData.ttl());
     }
 
     @Test
-    public void testNullPath() throws Exception {
+    public void testNullPath() {
         ConfigData configData = configProvider.get(null);
         assertTrue(configData.data().isEmpty());
         assertNull(configData.ttl());
     }
 
     @Test
-    public void testNullPathWithKey() throws Exception {
+    public void testNullPathWithKey() {
         ConfigData configData = configProvider.get(null, 
Collections.singleton("testKey"));
         assertTrue(configData.data().isEmpty());
         assertNull(configData.ttl());
diff --git 
a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
 
b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
index f4813fd1486..e6c66fcdba4 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
@@ -128,7 +128,7 @@ public class RecordHeadersTest {
     }
 
     @Test
-    public void testReadOnly() throws IOException {
+    public void testReadOnly() {
         RecordHeaders headers = new RecordHeaders();
         headers.add(new RecordHeader("key", "value".getBytes()));
         Iterator<Header> headerIteratorBeforeClose = headers.iterator();
@@ -190,7 +190,7 @@ public class RecordHeadersTest {
     }
 
     @Test
-    public void testNew() throws IOException {
+    public void testNew() {
         RecordHeaders headers = new RecordHeaders();
         headers.add(new RecordHeader("key", "value".getBytes()));
         headers.setReadOnly();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
 
b/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
index 52f017514f5..11c58ab734f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
@@ -174,7 +174,7 @@ public class PartitionStatesTest {
         LinkedHashMap<TopicPartition, String> map = createMap();
         states.set(map);
         states.clear();
-        checkState(states, new LinkedHashMap<TopicPartition, String>());
+        checkState(states, new LinkedHashMap<>());
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java 
b/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
index 739bed9cc46..bbbe1627ae3 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 public class RecordsSerdeTest {
 
     @Test
-    public void testSerdeRecords() throws Exception {
+    public void testSerdeRecords() {
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
             new SimpleRecord("foo".getBytes()),
             new SimpleRecord("bar".getBytes()));
@@ -44,7 +44,7 @@ public class RecordsSerdeTest {
     }
 
     @Test
-    public void testSerdeNullRecords() throws Exception {
+    public void testSerdeNullRecords() {
         SimpleRecordsMessageData message = new SimpleRecordsMessageData()
             .setTopic("foo");
         assertNull(message.recordSet());
@@ -53,14 +53,14 @@ public class RecordsSerdeTest {
     }
 
     @Test
-    public void testSerdeEmptyRecords() throws Exception {
+    public void testSerdeEmptyRecords() {
         SimpleRecordsMessageData message = new SimpleRecordsMessageData()
             .setTopic("foo")
             .setRecordSet(MemoryRecords.EMPTY);
         testAllRoundTrips(message);
     }
 
-    private void testAllRoundTrips(SimpleRecordsMessageData message) throws 
Exception {
+    private void testAllRoundTrips(SimpleRecordsMessageData message) {
         for (short version = SimpleRecordsMessageData.LOWEST_SUPPORTED_VERSION;
              version <= SimpleRecordsMessageData.HIGHEST_SUPPORTED_VERSION;
              version++) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
index 5df66dbb22a..c0cd5bab721 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
@@ -116,21 +116,21 @@ public class KafkaMbeanTest {
     }
 
     @Test
-    public void testInvoke() throws Exception {
+    public void testInvoke() {
         RuntimeMBeanException e = assertThrows(RuntimeMBeanException.class,
             () -> mBeanServer.invoke(objectName(countMetricName), "something", 
null, null));
         assertEquals(UnsupportedOperationException.class, 
e.getCause().getClass());
     }
 
     @Test
-    public void testSetAttribute() throws Exception {
+    public void testSetAttribute() {
         RuntimeMBeanException e = assertThrows(RuntimeMBeanException.class,
             () -> mBeanServer.setAttribute(objectName(countMetricName), new 
Attribute("anything", 1)));
         assertEquals(UnsupportedOperationException.class, 
e.getCause().getClass());
     }
 
     @Test
-    public void testSetAttributes() throws Exception {
+    public void testSetAttributes() {
         RuntimeMBeanException e = assertThrows(RuntimeMBeanException.class,
             () -> mBeanServer.setAttributes(objectName(countMetricName), new 
AttributeList(1)));
         assertEquals(UnsupportedOperationException.class, 
e.getCause().getClass());
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index bc1fc5d9e56..b80fafe0dfc 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -20,10 +20,10 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.ArrayList;
@@ -92,7 +92,7 @@ public class MetricsTest {
     @Test
     public void testMetricName() {
         MetricName n1 = metrics.metricName("name", "group", "description", 
"key1", "value1", "key2", "value2");
-        Map<String, String> tags = new HashMap<String, String>();
+        Map<String, String> tags = new HashMap<>();
         tags.put("key1", "value1");
         tags.put("key2", "value2");
         MetricName n2 = metrics.metricName("name", "group", "description", 
tags);
@@ -107,7 +107,7 @@ public class MetricsTest {
     }
 
     @Test
-    public void testSimpleStats() throws Exception {
+    public void testSimpleStats() {
         verifyStats(m -> (double) m.metricValue());
     }
 
@@ -451,11 +451,11 @@ public class MetricsTest {
         final Quota quota1 = Quota.upperBound(10.5);
         final Quota quota2 = Quota.lowerBound(10.5);
 
-        assertFalse(quota1.equals(quota2), "Quota with different upper values 
shouldn't be equal");
+        assertNotEquals(quota1, quota2, "Quota with different upper values 
shouldn't be equal");
 
         final Quota quota3 = Quota.lowerBound(10.5);
 
-        assertTrue(quota2.equals(quota3), "Quota with same upper and bound 
values should be equal");
+        assertEquals(quota2, quota3, "Quota with same upper and bound values 
should be equal");
     }
 
     @Test
@@ -582,7 +582,7 @@ public class MetricsTest {
     }
 
     @Test
-    public void testRateWindowing() throws Exception {
+    public void testRateWindowing() {
         // Use the default time window. Set 3 samples
         MetricConfig cfg = new MetricConfig().samples(3);
         Sensor s = metrics.sensor("test.sensor", cfg);
@@ -700,7 +700,7 @@ public class MetricsTest {
     @Test
     public void testMetricInstances() {
         MetricName n1 = metrics.metricInstance(SampleMetrics.METRIC1, "key1", 
"value1", "key2", "value2");
-        Map<String, String> tags = new HashMap<String, String>();
+        Map<String, String> tags = new HashMap<>();
         tags.put("key1", "value1");
         tags.put("key2", "value2");
         MetricName n2 = metrics.metricInstance(SampleMetrics.METRIC2, tags);
@@ -752,7 +752,7 @@ public class MetricsTest {
      * in errors or deadlock.
      */
     @Test
-    public void testConcurrentReadUpdate() throws Exception {
+    public void testConcurrentReadUpdate() {
         final Random random = new Random();
         final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
         metrics = new Metrics(new MockTime(10));
@@ -784,7 +784,7 @@ public class MetricsTest {
      * that synchronizes on every reporter method doesn't result in errors or 
deadlock.
      */
     @Test
-    public void testConcurrentReadUpdateReport() throws Exception {
+    public void testConcurrentReadUpdateReport() {
 
         class LockingReporter implements MetricsReporter {
             Map<MetricName, KafkaMetric> activeMetrics = new HashMap<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index 77176e1f5c5..9254616528f 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -33,7 +33,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -197,19 +196,16 @@ public class SensorTest {
         try {
             for (int i = 0; i != threadCount; ++i) {
                 final int index = i;
-                workers.add(service.submit(new Callable<Throwable>() {
-                    @Override
-                    public Throwable call() {
-                        try {
-                            assertTrue(latch.await(5, TimeUnit.SECONDS));
-                            for (int j = 0; j != 20; ++j) {
-                                sensor.record(j * index, 
System.currentTimeMillis() + j, false);
-                                sensor.checkQuotas();
-                            }
-                            return null;
-                        } catch (Throwable e) {
-                            return e;
+                workers.add(service.submit(() -> {
+                    try {
+                        assertTrue(latch.await(5, TimeUnit.SECONDS));
+                        for (int j = 0; j != 20; ++j) {
+                            sensor.record(j * index, 
System.currentTimeMillis() + j, false);
+                            sensor.checkQuotas();
                         }
+                        return null;
+                    } catch (Throwable e) {
+                        return e;
                     }
                 }));
             }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
 
b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index b44306b4b83..344ade22a66 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -149,7 +149,7 @@ public class FrequenciesTest {
     }
 
     protected MetricName name(String metricName) {
-        return new MetricName(metricName, "group-id", "desc", 
Collections.<String, String>emptyMap());
+        return new MetricName(metricName, "group-id", "desc", 
Collections.emptyMap());
     }
 
     protected Frequency freq(String name, double value) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
index d0cc05942e3..f72964f12cc 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -62,8 +62,8 @@ class EchoServer extends Thread {
                 throw new IllegalArgumentException("Unsupported 
securityProtocol " + securityProtocol);
         }
         this.port = this.serverSocket.getLocalPort();
-        this.threads = Collections.synchronizedList(new ArrayList<Thread>());
-        this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
+        this.threads = Collections.synchronizedList(new ArrayList<>());
+        this.sockets = Collections.synchronizedList(new ArrayList<>());
     }
 
     public void renegotiate() {
@@ -80,35 +80,32 @@ class EchoServer extends Thread {
                         break;
                     }
                     sockets.add(socket);
-                    Thread thread = new Thread() {
-                        @Override
-                        public void run() {
-                            try {
-                                DataInputStream input = new 
DataInputStream(socket.getInputStream());
-                                DataOutputStream output = new 
DataOutputStream(socket.getOutputStream());
-                                while (socket.isConnected() && 
!socket.isClosed()) {
-                                    int size = input.readInt();
-                                    if (renegotiate.get()) {
-                                        renegotiate.set(false);
-                                        ((SSLSocket) socket).startHandshake();
-                                    }
-                                    byte[] bytes = new byte[size];
-                                    input.readFully(bytes);
-                                    output.writeInt(size);
-                                    output.write(bytes);
-                                    output.flush();
+                    Thread thread = new Thread(() -> {
+                        try {
+                            DataInputStream input = new 
DataInputStream(socket.getInputStream());
+                            DataOutputStream output = new 
DataOutputStream(socket.getOutputStream());
+                            while (socket.isConnected() && !socket.isClosed()) 
{
+                                int size = input.readInt();
+                                if (renegotiate.get()) {
+                                    renegotiate.set(false);
+                                    ((SSLSocket) socket).startHandshake();
                                 }
+                                byte[] bytes = new byte[size];
+                                input.readFully(bytes);
+                                output.writeInt(size);
+                                output.write(bytes);
+                                output.flush();
+                            }
+                        } catch (IOException e) {
+                            // ignore
+                        } finally {
+                            try {
+                                socket.close();
                             } catch (IOException e) {
                                 // ignore
-                            } finally {
-                                try {
-                                    socket.close();
-                                } catch (IOException e) {
-                                    // ignore
-                                }
                             }
                         }
-                    };
+                    });
                     thread.start();
                     threads.add(thread);
                 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 15bfa559dce..0cd563355e5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -111,8 +111,8 @@ public class NioEchoServer extends Thread {
         serverSocketChannel.configureBlocking(false);
         serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 
0));
         this.port = serverSocketChannel.socket().getLocalPort();
-        this.socketChannels = Collections.synchronizedList(new 
ArrayList<SocketChannel>());
-        this.newChannels = Collections.synchronizedList(new 
ArrayList<SocketChannel>());
+        this.socketChannels = Collections.synchronizedList(new ArrayList<>());
+        this.newChannels = Collections.synchronizedList(new ArrayList<>());
         this.credentialCache = credentialCache;
         this.tokenCache = tokenCache;
         if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || 
securityProtocol == SecurityProtocol.SASL_SSL) {
@@ -191,7 +191,7 @@ public class NioEchoServer extends Thread {
             long thisMaxWaitMs = maxAggregateWaitMs - currentElapsedMs;
             String metricName = namePrefix + metricType.metricNameSuffix();
             if (expectedValue == 0.0) {
-                Double expected = expectedValue;
+                double expected = expectedValue;
                 if (metricType == MetricType.MAX || metricType == 
MetricType.AVG)
                     expected = Double.NaN;
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java 
b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
index 3338d039a07..88333e02a26 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
@@ -26,16 +26,13 @@ import java.net.Socket;
 public class PlaintextSender extends Thread {
 
     public PlaintextSender(final InetSocketAddress serverAddress, final byte[] 
payload) {
-        super(new Runnable() {
-            @Override
-            public void run() {
-                try (Socket connection = new 
Socket(serverAddress.getAddress(), serverAddress.getPort());
-                     OutputStream os = connection.getOutputStream()) {
-                    os.write(payload);
-                    os.flush();
-                } catch (Exception e) {
-                    e.printStackTrace(System.err);
-                }
+        super(() -> {
+            try (Socket connection = new Socket(serverAddress.getAddress(), 
serverAddress.getPort());
+                 OutputStream os = connection.getOutputStream()) {
+                os.write(payload);
+                os.flush();
+            } catch (Exception e) {
+                e.printStackTrace(System.err);
             }
         });
         setDaemon(true);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 09f14531def..618e86e1dae 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -59,6 +58,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -129,15 +129,12 @@ public class SelectorTest {
 
         // disconnect
         this.server.closeConnections();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                try {
-                    selector.poll(1000L);
-                    return selector.disconnected().containsKey(node);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
+        waitForCondition(() -> {
+            try {
+                selector.poll(1000L);
+                return selector.disconnected().containsKey(node);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
             }
         }, 5000, "Failed to observe disconnected node in disconnected set");
 
@@ -259,7 +256,7 @@ public class SelectorTest {
         if (channelBuilder instanceof PlaintextChannelBuilder) {
             assertEquals(0, cipherMetrics(metrics).size());
         } else {
-            TestUtils.waitForCondition(() -> cipherMetrics(metrics).size() == 
1,
+            waitForCondition(() -> cipherMetrics(metrics).size() == 1,
                 "Waiting for cipher metrics to be created.");
             assertEquals(Integer.valueOf(5), 
cipherMetrics(metrics).get(0).metricValue());
         }
@@ -300,7 +297,7 @@ public class SelectorTest {
         KafkaMetric outgoingByteTotal = 
findUntaggedMetricByName("outgoing-byte-total");
         KafkaMetric incomingByteTotal = 
findUntaggedMetricByName("incoming-byte-total");
 
-        TestUtils.waitForCondition(() -> {
+        waitForCondition(() -> {
             long bytesSent = send.size() - send.remaining();
             assertEquals(bytesSent, ((Double) 
outgoingByteTotal.metricValue()).longValue());
 
@@ -631,7 +628,7 @@ public class SelectorTest {
             int receiveCount = 0;
             KafkaChannel channel = createConnectionWithPendingReceives(i);
             // Poll until one or more receives complete and then close the 
server-side connection
-            TestUtils.waitForCondition(() -> {
+            waitForCondition(() -> {
                 selector.poll(1000);
                 return selector.completedReceives().size() > 0;
             }, 5000, "Receive not completed");
@@ -662,7 +659,7 @@ public class SelectorTest {
         selector.poll(1000); // Wait until some data arrives, but not a 
completed receive
         assertEquals(0, selector.completedReceives().size());
         server.closeConnections();
-        TestUtils.waitForCondition(() -> {
+        waitForCondition(() -> {
             try {
                 selector.poll(100);
                 return !selector.disconnected().isEmpty();
@@ -681,7 +678,7 @@ public class SelectorTest {
         selector.close();
         MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
         selector = new Selector(NetworkReceive.UNLIMITED, 
CONNECTION_MAX_IDLE_MS, metrics, time, "MetricGroup",
-            new HashMap<String, String>(), true, false, channelBuilder, pool, 
new LogContext());
+            new HashMap<>(), true, false, channelBuilder, pool, new 
LogContext());
 
         try (ServerSocketChannel ss = ServerSocketChannel.open()) {
             ss.bind(new InetSocketAddress(0));
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 41f0096b120..4f617961a73 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -257,7 +257,7 @@ public abstract class SslSelectorTest extends SelectorTest {
         channelBuilder = new SslChannelBuilder(Mode.SERVER, null, false, new 
LogContext());
         channelBuilder.configure(sslServerConfigs);
         selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, 
"MetricGroup",
-                new HashMap<String, String>(), true, false, channelBuilder, 
pool, new LogContext());
+                new HashMap<>(), true, false, channelBuilder, pool, new 
LogContext());
 
         try (ServerSocketChannel ss = ServerSocketChannel.open()) {
             ss.bind(new InetSocketAddress(0));
@@ -346,7 +346,7 @@ public abstract class SslSelectorTest extends SelectorTest {
 
         @Override
         protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, 
String id, SelectionKey key,
-                                                        
ChannelMetadataRegistry metadataRegistry) throws IOException {
+                                                        
ChannelMetadataRegistry metadataRegistry) {
             SocketChannel socketChannel = (SocketChannel) key.channel();
             SSLEngine sslEngine = 
sslFactory.createSslEngine(socketChannel.socket());
             TestSslTransportLayer transportLayer = new 
TestSslTransportLayer(id, key, sslEngine, metadataRegistry);
@@ -380,7 +380,7 @@ public abstract class SslSelectorTest extends SelectorTest {
 
             // Leave one byte in network read buffer so that some buffered 
bytes are present,
             // but not enough to make progress on a read.
-            void truncateReadBuffer() throws Exception {
+            void truncateReadBuffer() {
                 netReadBuffer().position(1);
                 appReadBuffer().position(0);
                 muteSocket = true;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
index 22196dde13a..928fe0d26d3 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
@@ -22,7 +22,6 @@ import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -68,12 +67,12 @@ public class SslSender extends Thread {
      */
     private static class NaiveTrustManager implements X509TrustManager {
         @Override
-        public void checkClientTrusted(X509Certificate[] x509Certificates, 
String s) throws CertificateException {
+        public void checkClientTrusted(X509Certificate[] x509Certificates, 
String s) {
             //nop
         }
 
         @Override
-        public void checkServerTrusted(X509Certificate[] x509Certificates, 
String s) throws CertificateException {
+        public void checkServerTrusted(X509Certificate[] x509Certificates, 
String s) {
             //nop
         }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 67212165fc3..f3868407177 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -205,7 +205,7 @@ public class DefaultRecordTest {
     }
 
     @Test
-    public void testInvalidValueSizePartial() throws IOException {
+    public void testInvalidValueSizePartial() {
         byte attributes = 0;
         long timestampDelta = 2;
         int offsetDelta = 1;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 2e9ff330bba..2fa978e10fa 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -431,7 +431,7 @@ public class FileRecordsTest {
     }
 
     @Test
-    public void testFormatConversionWithNoMessages() throws IOException {
+    public void testFormatConversionWithNoMessages() {
         TopicPartition tp = new TopicPartition("topic-1", 0);
         LazyDownConversionRecords lazyRecords = new 
LazyDownConversionRecords(tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0,
             0, Time.SYSTEM);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java 
b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
index 05c1bb84a20..e4f70222d13 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
@@ -65,7 +65,7 @@ public class JaasContextTest {
 
     @Test
     public void testConfigNoOptions() throws Exception {
-        checkConfiguration("test.testConfigNoOptions", 
LoginModuleControlFlag.REQUIRED, new HashMap<String, Object>());
+        checkConfiguration("test.testConfigNoOptions", 
LoginModuleControlFlag.REQUIRED, new HashMap<>());
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClienCallbackHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
similarity index 98%
rename from 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClienCallbackHandlerTest.java
rename to 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
index b13c0f823c8..ae3ef55f21a 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClienCallbackHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
@@ -32,7 +32,7 @@ import javax.security.auth.callback.Callback;
 import 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler;
 import org.junit.jupiter.api.Test;
 
-public class OAuthBearerSaslClienCallbackHandlerTest {
+public class OAuthBearerSaslClientCallbackHandlerTest {
     private static OAuthBearerToken createTokenWithLifetimeMillis(final long 
lifetimeMillis) {
         return new OAuthBearerToken() {
             @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
index 2bf3f841b11..e7204df660a 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
@@ -57,7 +57,7 @@ public class OAuthBearerClientInitialResponseTest {
     }
 
     @Test
-    public void testThrowsSaslExceptionOnInvalidExtensionKey() throws 
Exception {
+    public void testThrowsSaslExceptionOnInvalidExtensionKey() {
         Map<String, String> extensions = new HashMap<>();
         extensions.put("19", "42"); // keys can only be a-z
         assertThrows(SaslException.class, () -> new 
OAuthBearerClientInitialResponse("123.345.567", new 
SaslExtensions(extensions)));
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
index 089c9084975..39bd7b89f15 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
@@ -19,7 +19,6 @@ package 
org.apache.kafka.common.security.oauthbearer.internals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 import java.io.IOException;
@@ -30,7 +29,6 @@ import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.LoginException;
 
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.types.Password;
@@ -103,7 +101,7 @@ public class OAuthBearerSaslServerTest {
         byte[] nextChallenge = saslServer
                 .evaluateResponse(clientInitialResponse(null));
         // also asserts that no authentication error is thrown if 
OAuthBearerExtensionsValidatorCallback is not supported
-        assertTrue(nextChallenge.length == 0, "Next challenge is not empty");
+        assertEquals(0, nextChallenge.length, "Next challenge is not empty");
     }
 
     @Test
@@ -127,7 +125,7 @@ public class OAuthBearerSaslServerTest {
         byte[] nextChallenge = saslServer
                 .evaluateResponse(clientInitialResponse(null, false, 
customExtensions));
 
-        assertTrue(nextChallenge.length == 0, "Next challenge is not empty");
+        assertEquals(0, nextChallenge.length, "Next challenge is not empty");
         assertEquals("value1", saslServer.getNegotiatedProperty("firstKey"));
         assertEquals("value2", saslServer.getNegotiatedProperty("secondKey"));
     }
@@ -147,7 +145,7 @@ public class OAuthBearerSaslServerTest {
         byte[] nextChallenge = saslServer
                 .evaluateResponse(clientInitialResponse(null, false, 
customExtensions));
 
-        assertTrue(nextChallenge.length == 0, "Next challenge is not empty");
+        assertEquals(0, nextChallenge.length, "Next challenge is not empty");
         assertNull(saslServer.getNegotiatedProperty("thirdKey"), "Extensions 
not recognized by the server must be ignored");
     }
 
@@ -186,7 +184,7 @@ public class OAuthBearerSaslServerTest {
     public void authorizatonIdEqualsAuthenticationId() throws Exception {
         byte[] nextChallenge = saslServer
                 .evaluateResponse(clientInitialResponse(USER));
-        assertTrue(nextChallenge.length == 0, "Next challenge is not empty");
+        assertEquals(0, nextChallenge.length, "Next challenge is not empty");
     }
 
     @Test
@@ -203,7 +201,7 @@ public class OAuthBearerSaslServerTest {
     }
 
     private byte[] clientInitialResponse(String authorizationId)
-            throws OAuthBearerConfigException, IOException, 
UnsupportedCallbackException, LoginException {
+            throws OAuthBearerConfigException, IOException, 
UnsupportedCallbackException {
         return clientInitialResponse(authorizationId, false);
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
index 85f6622f090..cd946fa0eb0 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
@@ -753,7 +753,7 @@ public class ExpiringCredentialRefreshingLoginTest {
             int numWaiters) {
         List<KafkaFutureImpl<Long>> retvalWaiters = new 
ArrayList<>(numWaiters);
         for (int i = 1; i <= numWaiters; ++i) {
-            KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<Long>();
+            KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
             mockScheduler.addWaiter(i * refreshEveryMillis, waiter);
             retvalWaiters.add(waiter);
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
index 5086a1dab29..59f4319e106 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
@@ -172,17 +172,17 @@ public class HttpAccessTokenRetrieverTest extends 
OAuthBearerTest {
     }
 
     @Test
-    public void testFormatAuthorizationHeader() throws IOException {
+    public void testFormatAuthorizationHeader() {
         assertAuthorizationHeader("id", "secret");
     }
 
     @Test
-    public void testFormatAuthorizationHeaderEncoding() throws IOException {
+    public void testFormatAuthorizationHeaderEncoding() {
         // See KAFKA-14496
         assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", 
"9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E");
     }
 
-    private void assertAuthorizationHeader(String clientId, String 
clientSecret) throws IOException {
+    private void assertAuthorizationHeader(String clientId, String 
clientSecret) {
         String expected = "Basic " + 
Base64.getEncoder().encodeToString(Utils.utf8(clientId + ":" + clientSecret));
         String actual = 
HttpAccessTokenRetriever.formatAuthorizationHeader(clientId, clientSecret);
         assertEquals(expected, actual, String.format("Expected the HTTP 
Authorization header generated for client ID \"%s\" and client secret \"%s\" to 
match", clientId, clientSecret));
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RetryTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RetryTest.java
index 803cf10741c..b8d2b019e1f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RetryTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RetryTest.java
@@ -115,7 +115,7 @@ public class RetryTest extends OAuthBearerTest {
     }
 
     @Test
-    public void testUseMaxTimeout() throws IOException {
+    public void testUseMaxTimeout() {
         Exception[] attempts = new Exception[] {
             new IOException("pretend connect error"),
             new IOException("pretend timeout error"),
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
index bc1b6600bc1..0153abcc838 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
@@ -20,7 +20,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Base64;
@@ -30,7 +29,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.UnsupportedCallbackException;
 
 import org.apache.kafka.common.security.authenticator.TestJaasConfig;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
@@ -85,7 +83,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest 
{
     }
 
     @Test
-    public void badOrMissingPrincipal() throws IOException, 
UnsupportedCallbackException {
+    public void badOrMissingPrincipal() {
         for (boolean exists : new boolean[] {true, false}) {
             String claimsJson = "{" + EXPIRATION_TIME_CLAIM_TEXT + (exists ? 
comma(BAD_PRINCIPAL_CLAIM_TEXT) : "")
                     + "}";
@@ -94,7 +92,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest 
{
     }
 
     @Test
-    public void tooEarlyExpirationTime() throws IOException, 
UnsupportedCallbackException {
+    public void tooEarlyExpirationTime() {
         String claimsJson = "{" + PRINCIPAL_CLAIM_TEXT + 
comma(ISSUED_AT_CLAIM_TEXT)
                 + comma(TOO_EARLY_EXPIRATION_TIME_CLAIM_TEXT) + "}";
         confirmFailsValidation(UNSECURED_JWT_HEADER_JSON, claimsJson, 
MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED);
@@ -110,15 +108,14 @@ public class 
OAuthBearerUnsecuredValidatorCallbackHandlerTest {
     }
 
     @Test
-    public void missingRequiredScope() throws IOException, 
UnsupportedCallbackException {
+    public void missingRequiredScope() {
         String claimsJson = "{" + SUB_CLAIM_TEXT + 
comma(EXPIRATION_TIME_CLAIM_TEXT) + comma(SCOPE_CLAIM_TEXT) + "}";
         confirmFailsValidation(UNSECURED_JWT_HEADER_JSON, claimsJson, 
MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE,
                 "[scope1, scope2]");
     }
 
     private static void confirmFailsValidation(String headerJson, String 
claimsJson,
-            Map<String, String> moduleOptionsMap) throws 
OAuthBearerConfigException, OAuthBearerIllegalTokenException,
-            IOException, UnsupportedCallbackException {
+            Map<String, String> moduleOptionsMap) throws 
OAuthBearerConfigException, OAuthBearerIllegalTokenException {
         confirmFailsValidation(headerJson, claimsJson, moduleOptionsMap, null);
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
index 77882b6f3fb..6c71f51b40e 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
@@ -54,13 +54,13 @@ public class PlainSaslServerTest {
     }
 
     @Test
-    public void noAuthorizationIdSpecified() throws Exception {
+    public void noAuthorizationIdSpecified() {
         byte[] nextChallenge = saslServer.evaluateResponse(saslMessage("", 
USER_A, PASSWORD_A));
         assertEquals(0, nextChallenge.length);
     }
 
     @Test
-    public void authorizatonIdEqualsAuthenticationId() throws Exception {
+    public void authorizatonIdEqualsAuthenticationId() {
         byte[] nextChallenge = saslServer.evaluateResponse(saslMessage(USER_A, 
USER_A, PASSWORD_A));
         assertEquals(0, nextChallenge.length);
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerTest.java
index 121ac5951d7..f291e2d7d9c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerTest.java
@@ -48,7 +48,7 @@ public class ScramSaslServerTest {
         credentialCache.put(USER_A, formatter.generateCredential("passwordA", 
4096));
         credentialCache.put(USER_B, formatter.generateCredential("passwordB", 
4096));
         ScramServerCallbackHandler callbackHandler = new 
ScramServerCallbackHandler(credentialCache, new 
DelegationTokenCache(ScramMechanism.mechanismNames()));
-        saslServer = new ScramSaslServer(mechanism, new HashMap<String, 
Object>(), callbackHandler);
+        saslServer = new ScramSaslServer(mechanism, new HashMap<>(), 
callbackHandler);
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 742631b0231..21dcd6e4b0f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -77,7 +77,7 @@ public abstract class SslFactoryTest {
         SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
         assertNotNull(engine);
         assertEquals(Utils.mkSet(tlsProtocol), 
Utils.mkSet(engine.getEnabledProtocols()));
-        assertEquals(false, engine.getUseClientMode());
+        assertFalse(engine.getUseClientMode());
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
index 4115a5f8cbd..8e5e584f97c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
@@ -23,7 +23,6 @@ import javax.net.ssl.TrustManagerFactorySpi;
 import javax.net.ssl.X509ExtendedTrustManager;
 import java.net.Socket;
 import java.security.KeyStore;
-import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 
 public class TestTrustManagerFactory extends TrustManagerFactorySpi {
@@ -49,12 +48,12 @@ public class TestTrustManagerFactory extends 
TrustManagerFactorySpi {
         public static final String ALIAS = "TestAlias";
 
         @Override
-        public void checkClientTrusted(X509Certificate[] x509Certificates, 
String s) throws CertificateException {
+        public void checkClientTrusted(X509Certificate[] x509Certificates, 
String s) {
 
         }
 
         @Override
-        public void checkServerTrusted(X509Certificate[] x509Certificates, 
String s) throws CertificateException {
+        public void checkServerTrusted(X509Certificate[] x509Certificates, 
String s) {
 
         }
 
@@ -64,22 +63,22 @@ public class TestTrustManagerFactory extends 
TrustManagerFactorySpi {
         }
 
         @Override
-        public void checkClientTrusted(X509Certificate[] x509Certificates, 
String s, Socket socket) throws CertificateException {
+        public void checkClientTrusted(X509Certificate[] x509Certificates, 
String s, Socket socket) {
 
         }
 
         @Override
-        public void checkServerTrusted(X509Certificate[] x509Certificates, 
String s, Socket socket) throws CertificateException {
+        public void checkServerTrusted(X509Certificate[] x509Certificates, 
String s, Socket socket) {
 
         }
 
         @Override
-        public void checkClientTrusted(X509Certificate[] x509Certificates, 
String s, SSLEngine sslEngine) throws CertificateException {
+        public void checkClientTrusted(X509Certificate[] x509Certificates, 
String s, SSLEngine sslEngine) {
 
         }
 
         @Override
-        public void checkServerTrusted(X509Certificate[] x509Certificates, 
String s, SSLEngine sslEngine) throws CertificateException {
+        public void checkServerTrusted(X509Certificate[] x509Certificates, 
String s, SSLEngine sslEngine) {
 
         }
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index a0b67a03d42..844ce500667 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -360,12 +360,12 @@ public class SerializationTest {
     }
 
     private Serde<String> getStringSerde(String encoder) {
-        Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+        Map<String, Object> serializerConfigs = new HashMap<>();
         serializerConfigs.put("key.serializer.encoding", encoder);
         Serializer<String> serializer = Serdes.String().serializer();
         serializer.configure(serializerConfigs, true);
 
-        Map<String, Object> deserializerConfigs = new HashMap<String, 
Object>();
+        Map<String, Object> deserializerConfigs = new HashMap<>();
         deserializerConfigs.put("key.deserializer.encoding", encoder);
         Deserializer<String> deserializer = Serdes.String().deserializer();
         deserializer.configure(deserializerConfigs, true);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java 
b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
index e100d24ba42..939b2f6daa2 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
@@ -34,10 +34,10 @@ public class AbstractIteratorTest {
     @Test
     public void testIterator() {
         int max = 10;
-        List<Integer> l = new ArrayList<Integer>();
+        List<Integer> l = new ArrayList<>();
         for (int i = 0; i < max; i++)
             l.add(i);
-        ListIterator<Integer> iter = new ListIterator<Integer>(l);
+        ListIterator<Integer> iter = new ListIterator<>(l);
         for (int i = 0; i < max; i++) {
             Integer value = i;
             assertEquals(value, iter.peek());
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java 
b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
index 8adcca3084f..b345c5c6fbe 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
@@ -34,7 +34,7 @@ public class ExitTest {
         });
         try {
             int statusCode = 0;
-            String message = "mesaage";
+            String message = "message";
             Exit.halt(statusCode);
             Exit.halt(statusCode, message);
             assertEquals(Arrays.asList(statusCode, null, statusCode, message), 
list);
@@ -52,7 +52,7 @@ public class ExitTest {
         });
         try {
             int statusCode = 0;
-            String message = "mesaage";
+            String message = "message";
             Exit.exit(statusCode);
             Exit.exit(statusCode, message);
             assertEquals(Arrays.asList(statusCode, null, statusCode, message), 
list);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
index 3c12c989f4d..d62595a2ae4 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
@@ -576,7 +576,7 @@ public class ImplicitLinkedHashCollectionTest {
         assertFalse(coll.add(new TestElement(1, 2)));
         TestElement element2 = new TestElement(1, 2);
         TestElement element1 = coll.find(element2);
-        assertFalse(element2.equals(element1));
+        assertNotEquals(element2, element1);
         assertTrue(element2.elementKeysAreEqual(element1));
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java 
b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
index 78a9060c404..d6edd914e18 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -87,32 +87,26 @@ public class MockScheduler implements Scheduler, 
MockTime.Listener {
                                   final Callable<T> callable, long delayMs) {
         final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
         KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
-        waiter.thenApply(new KafkaFuture.BaseFunction<Long, Void>() {
-            @Override
-            public Void apply(final Long now) {
-                executor.submit(new Callable<Void>() {
-                    @Override
-                    public Void call() {
-                        // Note: it is possible that we'll execute 
Callable#call right after
-                        // the future is cancelled.  This is a valid sequence 
of events
-                        // that the author of the Callable needs to be able to 
handle.
-                        //
-                        // Note 2: If the future is cancelled, we will not 
remove the waiter
-                        // from this MockTime object.  This small bit of 
inefficiency is acceptable
-                        // in testing code (at least we aren't polling!)
-                        if (!future.isCancelled()) {
-                            try {
-                                log.trace("Invoking {} at {}", callable, now);
-                                future.complete(callable.call());
-                            } catch (Throwable throwable) {
-                                future.completeExceptionally(throwable);
-                            }
-                        }
-                        return null;
+        waiter.thenApply((KafkaFuture.BaseFunction<Long, Void>) now -> {
+            executor.submit((Callable<Void>) () -> {
+                // Note: it is possible that we'll execute Callable#call right 
after
+                // the future is cancelled.  This is a valid sequence of events
+                // that the author of the Callable needs to be able to handle.
+                //
+                // Note 2: If the future is cancelled, we will not remove the 
waiter
+                // from this MockTime object.  This small bit of inefficiency 
is acceptable
+                // in testing code (at least we aren't polling!)
+                if (!future.isCancelled()) {
+                    try {
+                        log.trace("Invoking {} at {}", callable, now);
+                        future.complete(callable.call());
+                    } catch (Throwable throwable) {
+                        future.completeExceptionally(throwable);
                     }
-                });
+                }
                 return null;
-            }
+            });
+            return null;
         });
         log.trace("Scheduling {} for {} ms from now.", callable, delayMs);
         addWaiter(delayMs, waiter);
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 50e6f179f61..2366da33662 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -71,6 +71,7 @@ import static 
org.apache.kafka.common.utils.Utils.validHostPattern;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -895,7 +896,7 @@ public class UtilsTest {
         if (a == null) {
             assertNotNull(b);
         } else {
-            assertFalse(a.equals(b));
+            assertNotEquals(a, b);
         }
         assertFalse(Utils.isEqualConstantTime(first, second));
         assertFalse(Utils.isEqualConstantTime(second, first));
@@ -907,7 +908,7 @@ public class UtilsTest {
         if (a == null) {
             assertNull(b);
         } else {
-            assertTrue(a.equals(b));
+            assertEquals(a, b);
         }
         assertTrue(Utils.isEqualConstantTime(first, second));
         assertTrue(Utils.isEqualConstantTime(second, first));
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java 
b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index cfb5f6ca196..1d2b1c54e4e 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -149,20 +149,20 @@ public class Microbenchmarks {
         t3.join();
         t4.join();
 
-        Map<String, Integer> values = new HashMap<String, Integer>();
+        Map<String, Integer> values = new HashMap<>();
         for (int i = 0; i < 100; i++)
             values.put(Integer.toString(i), i);
         System.out.println("HashMap:");
         benchMap(2, 1000000, values);
         System.out.println("ConcurentHashMap:");
-        benchMap(2, 1000000, new ConcurrentHashMap<String, Integer>(values));
+        benchMap(2, 1000000, new ConcurrentHashMap<>(values));
         System.out.println("CopyOnWriteMap:");
-        benchMap(2, 1000000, new CopyOnWriteMap<String, Integer>(values));
+        benchMap(2, 1000000, new CopyOnWriteMap<>(values));
     }
 
     private static void benchMap(int numThreads, final int iters, final 
Map<String, Integer> map) throws Exception {
-        final List<String> keys = new ArrayList<String>(map.keySet());
-        final List<Thread> threads = new ArrayList<Thread>();
+        final List<String> keys = new ArrayList<>(map.keySet());
+        final List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
                 public void run() {
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 62f53d7cdee..6d006ea9c9e 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -330,7 +330,7 @@ public class TestSslUtils {
 
     static String pem(Certificate cert) throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
-        try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out, 
StandardCharsets.UTF_8.name()))) {
+        try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out, 
StandardCharsets.UTF_8))) {
             pemWriter.writeObject(new JcaMiscPEMGenerator(cert));
         }
         return new String(out.toByteArray(), StandardCharsets.UTF_8);
@@ -338,7 +338,7 @@ public class TestSslUtils {
 
     static String pem(PrivateKey privateKey, Password password) throws 
IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
-        try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out, 
StandardCharsets.UTF_8.name()))) {
+        try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out, 
StandardCharsets.UTF_8))) {
             if (password == null) {
                 pemWriter.writeObject(new JcaPKCS8Generator(privateKey, null));
             } else {

Reply via email to