This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 00e5803cd3a MINOR: Various cleanups in client tests (#13094)
00e5803cd3a is described below
commit 00e5803cd3af89011254e734232308618403309d
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Jan 23 13:07:44 2023 +0100
MINOR: Various cleanups in client tests (#13094)
Reviewers: Luke Chen <[email protected]>, Ismael Juma <[email protected]>,
Divij Vaidya <[email protected]>, Christo Lolov <[email protected]>
---
.../kafka/clients/ClusterConnectionStatesTest.java | 3 +-
.../java/org/apache/kafka/clients/MockClient.java | 11 ++---
.../apache/kafka/clients/NetworkClientTest.java | 2 +-
.../clients/consumer/ConsumerRecordsTest.java | 2 +-
.../internals/ConsumerInterceptorsTest.java | 2 +-
.../consumer/internals/ConsumerMetadataTest.java | 3 +-
.../internals/ConsumerNetworkClientTest.java | 21 ++-------
.../kafka/clients/producer/ProducerRecordTest.java | 10 ++---
.../kafka/clients/producer/RecordSendTest.java | 26 +++++------
.../producer/RoundRobinPartitionerTest.java | 10 ++---
.../producer/UniformStickyPartitionerTest.java | 10 ++---
.../clients/producer/internals/BufferPoolTest.java | 12 +++--
.../producer/internals/DefaultPartitionerTest.java | 2 +-
.../producer/internals/ProducerBatchTest.java | 6 +--
.../producer/internals/RecordAccumulatorTest.java | 51 +++++++++-------------
.../clients/producer/internals/SenderTest.java | 27 ++++++------
.../internals/StickyPartitionCacheTest.java | 2 +-
.../org/apache/kafka/common/KafkaFutureTest.java | 2 +-
.../apache/kafka/common/acl/AclOperationTest.java | 4 +-
.../kafka/common/acl/AclPermissionTypeTest.java | 6 +--
.../apache/kafka/common/cache/LRUCacheTest.java | 10 +++--
.../apache/kafka/common/config/ConfigDefTest.java | 38 ++++++++--------
.../kafka/common/config/ConfigTransformerTest.java | 16 +++----
.../provider/DirectoryConfigProviderTest.java | 2 +-
.../config/provider/FileConfigProviderTest.java | 12 ++---
.../common/header/internals/RecordHeadersTest.java | 4 +-
.../common/internals/PartitionStatesTest.java | 2 +-
.../kafka/common/message/RecordsSerdeTest.java | 8 ++--
.../kafka/common/metrics/KafkaMbeanTest.java | 6 +--
.../apache/kafka/common/metrics/MetricsTest.java | 18 ++++----
.../apache/kafka/common/metrics/SensorTest.java | 22 ++++------
.../common/metrics/stats/FrequenciesTest.java | 2 +-
.../apache/kafka/common/network/EchoServer.java | 49 ++++++++++-----------
.../apache/kafka/common/network/NioEchoServer.java | 6 +--
.../kafka/common/network/PlaintextSender.java | 17 +++-----
.../apache/kafka/common/network/SelectorTest.java | 27 +++++-------
.../kafka/common/network/SslSelectorTest.java | 6 +--
.../org/apache/kafka/common/network/SslSender.java | 5 +--
.../kafka/common/record/DefaultRecordTest.java | 2 +-
.../kafka/common/record/FileRecordsTest.java | 2 +-
.../kafka/common/security/JaasContextTest.java | 2 +-
... OAuthBearerSaslClientCallbackHandlerTest.java} | 2 +-
.../OAuthBearerClientInitialResponseTest.java | 2 +-
.../internals/OAuthBearerSaslServerTest.java | 12 +++--
.../ExpiringCredentialRefreshingLoginTest.java | 2 +-
.../secured/HttpAccessTokenRetrieverTest.java | 6 +--
.../oauthbearer/internals/secured/RetryTest.java | 2 +-
...earerUnsecuredValidatorCallbackHandlerTest.java | 11 ++---
.../plain/internals/PlainSaslServerTest.java | 4 +-
.../scram/internals/ScramSaslServerTest.java | 2 +-
.../kafka/common/security/ssl/SslFactoryTest.java | 2 +-
.../security/ssl/mock/TestTrustManagerFactory.java | 13 +++---
.../common/serialization/SerializationTest.java | 4 +-
.../kafka/common/utils/AbstractIteratorTest.java | 4 +-
.../org/apache/kafka/common/utils/ExitTest.java | 4 +-
.../utils/ImplicitLinkedHashCollectionTest.java | 2 +-
.../apache/kafka/common/utils/MockScheduler.java | 42 ++++++++----------
.../org/apache/kafka/common/utils/UtilsTest.java | 5 ++-
.../org/apache/kafka/test/Microbenchmarks.java | 10 ++---
.../java/org/apache/kafka/test/TestSslUtils.java | 4 +-
60 files changed, 274 insertions(+), 327 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 96fe89ca11e..b16de1a3164 100644
---
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -183,7 +184,7 @@ public class ClusterConnectionStatesTest {
connectionStates.authenticationFailed(nodeId1, time.milliseconds(),
new AuthenticationException("No path to CA for certificate!"));
time.sleep(1000);
assertEquals(connectionStates.connectionState(nodeId1),
ConnectionState.AUTHENTICATION_FAILED);
- assertTrue(connectionStates.authenticationException(nodeId1)
instanceof AuthenticationException);
+ assertNotNull(connectionStates.authenticationException(nodeId1));
assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 28d363bb8ef..c383df9338d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import java.util.ArrayList;
@@ -471,12 +470,10 @@ public class MockClient implements KafkaClient {
}
public void waitForRequests(final int minRequests, long maxWaitMs) throws
InterruptedException {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return requests.size() >= minRequests;
- }
- }, maxWaitMs, "Expected requests have not been sent");
+ TestUtils.waitForCondition(
+ () -> requests.size() >= minRequests,
+ maxWaitMs,
+ "Expected requests have not been sent");
}
public void reset() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 63b44835f63..ab7235e112e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -913,7 +913,7 @@ public class NetworkClientTest {
}
@Test
- public void testCallDisconnect() throws Exception {
+ public void testCallDisconnect() {
awaitReady(client, node);
assertTrue(client.isReady(node, time.milliseconds()),
"Expected NetworkClient to be ready to send to node " +
node.idString());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
index d414450ac0e..ea870c24781 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
@@ -39,7 +39,7 @@ public class ConsumerRecordsTest {
Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records =
new LinkedHashMap<>();
String topic = "topic";
- records.put(new TopicPartition(topic, 0), new
ArrayList<ConsumerRecord<Integer, String>>());
+ records.put(new TopicPartition(topic, 0), new ArrayList<>());
ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic,
1, 0, 0L, TimestampType.CREATE_TIME,
0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic,
1, 1, 0L, TimestampType.CREATE_TIME,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
index 19ac2560929..4331e720541 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -78,7 +78,7 @@ public class ConsumerInterceptorsTest {
if (tp.partition() != filterPartition)
recordMap.put(tp, records.records(tp));
}
- return new ConsumerRecords<K, V>(recordMap);
+ return new ConsumerRecords<>(recordMap);
}
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 02ab81c500e..24b1fa6fb33 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -46,6 +46,7 @@ import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConsumerMetadataTest {
@@ -142,7 +143,7 @@ public class ConsumerMetadataTest {
metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds(1,
topicPartitionCounts, topicIds), false, time.milliseconds());
assertEquals(singleton("foo"), new
HashSet<>(metadata.fetch().topics()));
assertEquals(topicIds.get("foo"), metadata.topicIds().get("foo"));
- assertEquals(topicIds.get("bar"), null);
+ assertNull(topicIds.get("bar"));
}
private void testBasicSubscription(Set<String> expectedTopics, Set<String>
expectedInternalTopics) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index df74f7ede1e..8d0e451ef13 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -221,12 +221,7 @@ public class ConsumerNetworkClientTest {
final RequestFuture<ClientResponse> future = consumerClient.send(node,
heartbeat());
client.enableBlockingUntilWakeup(1);
- Thread t = new Thread() {
- @Override
- public void run() {
- consumerClient.poll(future);
- }
- };
+ Thread t = new Thread(() -> consumerClient.poll(future));
t.start();
consumerClient.disconnectAsync(node);
@@ -284,23 +279,13 @@ public class ConsumerNetworkClientTest {
consumerClient.pollNoWakeup(); // dequeue and send the request
client.enableBlockingUntilWakeup(2);
- Thread t1 = new Thread() {
- @Override
- public void run() {
- consumerClient.pollNoWakeup();
- }
- };
+ Thread t1 = new Thread(() -> consumerClient.pollNoWakeup());
t1.start();
// Sleep a little so that t1 is blocking in poll
Thread.sleep(50);
- Thread t2 = new Thread() {
- @Override
- public void run() {
- consumerClient.poll(future);
- }
- };
+ Thread t2 = new Thread(() -> consumerClient.poll(future));
t2.start();
// Sleep a little so that t2 is awaiting the network client lock
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index 8cb18d67fc8..b5513cd73e4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.producer;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class ProducerRecordTest {
@@ -35,16 +35,16 @@ public class ProducerRecordTest {
assertEquals(producerRecord.hashCode(), equalRecord.hashCode());
ProducerRecord<String, Integer> topicMisMatch = new
ProducerRecord<>("test-1", 1, "key", 1);
- assertFalse(producerRecord.equals(topicMisMatch));
+ assertNotEquals(producerRecord, topicMisMatch);
ProducerRecord<String, Integer> partitionMismatch = new
ProducerRecord<>("test", 2, "key", 1);
- assertFalse(producerRecord.equals(partitionMismatch));
+ assertNotEquals(producerRecord, partitionMismatch);
ProducerRecord<String, Integer> keyMisMatch = new
ProducerRecord<>("test", 1, "key-1", 1);
- assertFalse(producerRecord.equals(keyMisMatch));
+ assertNotEquals(producerRecord, keyMisMatch);
ProducerRecord<String, Integer> valueMisMatch = new
ProducerRecord<>("test", 1, "key", 2);
- assertFalse(producerRecord.equals(valueMisMatch));
+ assertNotEquals(producerRecord, valueMisMatch);
ProducerRecord<String, Integer> nullFieldsRecord = new
ProducerRecord<>("topic", null, null, null, null, null);
assertEquals(nullFieldsRecord, nullFieldsRecord);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index 86632eecdc8..6dd93f7fc64 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -66,7 +66,7 @@ public class RecordSendTest {
* Test that an asynchronous request will eventually throw the right
exception
*/
@Test
- public void testError() throws Exception {
+ public void testError() {
FutureRecordMetadata future = new
FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(),
50L),
relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, Time.SYSTEM);
assertThrows(ExecutionException.class, future::get);
@@ -85,20 +85,18 @@ public class RecordSendTest {
/* create a new request result that will be completed after the given
timeout */
public ProduceRequestResult asyncRequest(final long baseOffset, final
RuntimeException error, final long timeout) {
final ProduceRequestResult request = new
ProduceRequestResult(topicPartition);
- Thread thread = new Thread() {
- public void run() {
- try {
- sleep(timeout);
- if (error == null) {
- request.set(baseOffset, RecordBatch.NO_TIMESTAMP,
null);
- } else {
- request.set(-1L, RecordBatch.NO_TIMESTAMP, index ->
error);
- }
+ Thread thread = new Thread(() -> {
+ try {
+ Thread.sleep(timeout);
+ if (error == null) {
+ request.set(baseOffset, RecordBatch.NO_TIMESTAMP, null);
+ } else {
+ request.set(-1L, RecordBatch.NO_TIMESTAMP, index -> error);
+ }
- request.done();
- } catch (InterruptedException e) { }
- }
- };
+ request.done();
+ } catch (InterruptedException e) { }
+ });
thread.start();
return request;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
index dfb98e9f00b..f65e1766116 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
@@ -51,7 +51,7 @@ public class RoundRobinPartitionerTest {
int countForPart2 = 0;
Partitioner partitioner = new RoundRobinPartitioner();
Cluster cluster = new Cluster("clusterId", asList(NODES[0], NODES[1],
NODES[2]), partitions,
- Collections.<String>emptySet(), Collections.<String>emptySet());
+ Collections.emptySet(), Collections.emptySet());
for (int i = 1; i <= 100; i++) {
int part = partitioner.partition("test", null, null, null, null,
cluster);
assertTrue(part == 0 || part == 2, "We should never choose a
leader-less node in round robin");
@@ -64,7 +64,7 @@ public class RoundRobinPartitionerTest {
}
@Test
- public void testRoundRobinWithKeyBytes() throws InterruptedException {
+ public void testRoundRobinWithKeyBytes() {
final String topicA = "topicA";
final String topicB = "topicB";
@@ -72,7 +72,7 @@ public class RoundRobinPartitionerTest {
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new
PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0],
NODES[1], NODES[2]), allPartitions,
- Collections.<String>emptySet(),
Collections.<String>emptySet());
+ Collections.emptySet(), Collections.emptySet());
final Map<Integer, Integer> partitionCount = new HashMap<>();
@@ -96,7 +96,7 @@ public class RoundRobinPartitionerTest {
}
@Test
- public void testRoundRobinWithNullKeyBytes() throws InterruptedException {
+ public void testRoundRobinWithNullKeyBytes() {
final String topicA = "topicA";
final String topicB = "topicB";
@@ -104,7 +104,7 @@ public class RoundRobinPartitionerTest {
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new
PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0],
NODES[1], NODES[2]), allPartitions,
- Collections.<String>emptySet(),
Collections.<String>emptySet());
+ Collections.emptySet(), Collections.emptySet());
final Map<Integer, Integer> partitionCount = new HashMap<>();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
index f5484071717..0880dccebb2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java
@@ -56,7 +56,7 @@ public class UniformStickyPartitionerTest {
int part = 0;
Partitioner partitioner = new UniformStickyPartitioner();
Cluster cluster = new Cluster("clusterId", asList(NODES[0], NODES[1],
NODES[2]), partitions,
- Collections.<String>emptySet(), Collections.<String>emptySet());
+ Collections.emptySet(), Collections.emptySet());
for (int i = 0; i < 50; i++) {
part = partitioner.partition("test", null, null, null, null,
cluster);
assertTrue(part == 0 || part == 2, "We should never choose a
leader-less node in round robin");
@@ -80,12 +80,12 @@ public class UniformStickyPartitionerTest {
@SuppressWarnings("deprecation")
@Test
- public void testRoundRobinWithKeyBytes() throws InterruptedException {
+ public void testRoundRobinWithKeyBytes() {
List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A,
0, NODES[0], NODES, NODES),
new PartitionInfo(TOPIC_A, 1, NODES[1], NODES, NODES), new
PartitionInfo(TOPIC_A, 2, NODES[1], NODES, NODES),
new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0],
NODES[1], NODES[2]), allPartitions,
- Collections.<String>emptySet(),
Collections.<String>emptySet());
+ Collections.emptySet(), Collections.emptySet());
final Map<Integer, Integer> partitionCount = new HashMap<>();
@@ -145,12 +145,12 @@ public class UniformStickyPartitionerTest {
@SuppressWarnings("deprecation")
@Test
- public void testRoundRobinWithNullKeyBytes() throws InterruptedException {
+ public void testRoundRobinWithNullKeyBytes() {
List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A,
0, NODES[0], NODES, NODES),
new PartitionInfo(TOPIC_A, 1, NODES[1], NODES, NODES), new
PartitionInfo(TOPIC_A, 2, NODES[1], NODES, NODES),
new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0],
NODES[1], NODES[2]), allPartitions,
- Collections.<String>emptySet(),
Collections.<String>emptySet());
+ Collections.emptySet(), Collections.emptySet());
final Map<Integer, Integer> partitionCount = new HashMap<>();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index eaf35f9b8d0..ed372e452f4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -294,7 +294,7 @@ public class BufferPoolTest {
final int poolableSize = 1024;
final long totalMemory = numThreads / 2 * poolableSize;
final BufferPool pool = new BufferPool(totalMemory, poolableSize,
metrics, time, metricGroup);
- List<StressTestThread> threads = new ArrayList<StressTestThread>();
+ List<StressTestThread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++)
threads.add(new StressTestThread(pool, iterations));
for (StressTestThread thread : threads)
@@ -409,12 +409,10 @@ public class BufferPoolTest {
ByteBuffer buffer = pool.allocate(1, Long.MAX_VALUE);
ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
- Callable<Void> work = new Callable<Void>() {
- public Void call() throws Exception {
- assertThrows(KafkaException.class, () -> pool.allocate(1,
Long.MAX_VALUE));
- return null;
- }
- };
+ Callable<Void> work = () -> {
+ assertThrows(KafkaException.class, () -> pool.allocate(1,
Long.MAX_VALUE));
+ return null;
+ };
for (int i = 0; i < numWorkers; ++i) {
executor.submit(work);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index e250748643a..c851408f363 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -45,7 +45,7 @@ public class DefaultPartitionerTest {
@SuppressWarnings("deprecation")
final Partitioner partitioner = new DefaultPartitioner();
final Cluster cluster = new Cluster("clusterId", asList(NODES),
PARTITIONS,
- Collections.<String>emptySet(), Collections.<String>emptySet());
+ Collections.emptySet(), Collections.emptySet());
int partition = partitioner.partition("test", null, KEY_BYTES, null,
null, cluster);
assertEquals(partition, partitioner.partition("test", null, KEY_BYTES,
null, null, cluster), "Same key should yield same partition");
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 6af8289f9ce..03f5e0b6aa8 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -155,9 +155,9 @@ public class ProducerBatchTest {
for (ProducerBatch splitProducerBatch : batches) {
for (RecordBatch splitBatch :
splitProducerBatch.records().batches()) {
for (Record record : splitBatch) {
- assertTrue(record.headers().length == 1, "Header size
should be 1.");
-
assertTrue(record.headers()[0].key().equals("header-key"), "Header key should
be 'header-key'.");
- assertTrue(new
String(record.headers()[0].value()).equals("header-value"), "Header value
should be 'header-value'.");
+ assertEquals(1, record.headers().length, "Header size
should be 1.");
+ assertEquals("header-key", record.headers()[0].key(),
"Header key should be 'header-key'.");
+ assertEquals("header-value", new
String(record.headers()[0].value()), "Header value should be 'header-value'.");
}
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 9408172004e..3605bc11711 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -120,7 +120,7 @@ public class RecordAccumulatorTest {
accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
// drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the
max request size is full after the first batch drained
- Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new
HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches1, tp1, tp3);
// add record for tp1, tp3
@@ -129,11 +129,11 @@ public class RecordAccumulatorTest {
// drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the
max request size is full after the first batch drained
// The drain index should start from next topic partition, that is,
node1 => tp2, node2 => tp4
- Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new
HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches2, tp2, tp4);
// make sure in next run, the drain index will start from the beginning
- Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new
HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches3, tp1, tp3);
// add record for tp2, tp3, tp4 and mute the tp4
@@ -142,7 +142,7 @@ public class RecordAccumulatorTest {
accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
accum.mutePartition(tp4);
// drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4
is muted)
- Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new
HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches4, tp2, tp3);
// add record for tp1, tp2, tp3, and unmute tp4
@@ -151,14 +151,14 @@ public class RecordAccumulatorTest {
accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
accum.unmutePartition(tp4);
// set maxSize as a max value, so that the all partitions in 2 nodes
should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4]
- Map<Integer, List<ProducerBatch>> batches5 = accum.drain(cluster, new
HashSet<Node>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
+ Map<Integer, List<ProducerBatch>> batches5 = accum.drain(cluster, new
HashSet<>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
verifyTopicPartitionInBatches(batches5, tp1, tp2, tp3, tp4);
}
private void verifyTopicPartitionInBatches(Map<Integer,
List<ProducerBatch>> nodeBatches, TopicPartition... tp) {
- int allTpBatchCount =
nodeBatches.values().stream().flatMap(Collection::stream).collect(Collectors.toList()).size();
+ int allTpBatchCount = (int)
nodeBatches.values().stream().flatMap(Collection::stream).count();
assertEquals(tp.length, allTpBatchCount);
- List<TopicPartition> topicPartitionsInBatch = new
ArrayList<TopicPartition>();
+ List<TopicPartition> topicPartitionsInBatch = new ArrayList<>();
for (Map.Entry<Integer, List<ProducerBatch>> entry :
nodeBatches.entrySet()) {
List<ProducerBatch> tpBatchList = entry.getValue();
List<TopicPartition> tpList =
tpBatchList.stream().map(producerBatch ->
producerBatch.topicPartition).collect(Collectors.toList());
@@ -331,17 +331,15 @@ public class RecordAccumulatorTest {
1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
CompressionType.NONE, 0);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
- threads.add(new Thread() {
- public void run() {
- for (int i = 0; i < msgs; i++) {
- try {
- accum.append(topic, i % numParts, 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(),
cluster);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ threads.add(new Thread(() -> {
+ for (int j = 0; j < msgs; j++) {
+ try {
+ accum.append(topic, j % numParts, 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(),
cluster);
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
- });
+ }));
}
for (Thread t : threads)
t.start();
@@ -480,12 +478,10 @@ public class RecordAccumulatorTest {
private void delayedInterrupt(final Thread thread, final long delayMs) {
- Thread t = new Thread() {
- public void run() {
- Time.SYSTEM.sleep(delayMs);
- thread.interrupt();
- }
- };
+ Thread t = new Thread(() -> {
+ Time.SYSTEM.sleep(delayMs);
+ thread.interrupt();
+ });
t.start();
}
@@ -517,7 +513,7 @@ public class RecordAccumulatorTest {
class TestCallback implements RecordAccumulator.AppendCallbacks {
@Override
public void onCompletion(RecordMetadata metadata, Exception
exception) {
- assertTrue(exception.getMessage().equals("Producer is closed
forcefully."));
+ assertEquals("Producer is closed forcefully.",
exception.getMessage());
numExceptionReceivedInCallback.incrementAndGet();
}
@@ -599,7 +595,7 @@ public class RecordAccumulatorTest {
private void doExpireBatchSingle(int deliveryTimeoutMs) throws
InterruptedException {
int lingerMs = 300;
List<Boolean> muteStates = Arrays.asList(false, true);
- Set<Node> readyNodes = null;
+ Set<Node> readyNodes;
List<ProducerBatch> expiredBatches = new ArrayList<>();
// test case assumes that the records do not fill the batch completely
int batchSize = 1025;
@@ -854,12 +850,7 @@ public class RecordAccumulatorTest {
byte[] value = new byte[1024];
final AtomicInteger acked = new AtomicInteger(0);
- Callback cb = new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception
exception) {
- acked.incrementAndGet();
- }
- };
+ Callback cb = (metadata, exception) -> acked.incrementAndGet();
// Append two messages so the batch is too big.
Future<RecordMetadata> future1 = batch.tryAppend(now, null, value,
Record.EMPTY_HEADERS, cb, now);
Future<RecordMetadata> future2 = batch.tryAppend(now, null, value,
Record.EMPTY_HEADERS, cb, now);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 3d972b3eb2c..bdbc1bd92e9 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -280,7 +280,6 @@ public class SenderTest {
/*
* Send multiple requests. Verify that the client side quota metrics have
the right values
*/
- @SuppressWarnings("deprecation")
@Test
public void testQuotaMetrics() {
MockSelector selector = new MockSelector(time);
@@ -640,7 +639,7 @@ public class SenderTest {
* polls are necessary to send requests.
*/
@Test
- public void testInitProducerIdWithMaxInFlightOne() throws Exception {
+ public void testInitProducerIdWithMaxInFlightOne() {
final long producerId = 123456L;
createMockClientWithMaxFlightOneMetadataPending();
@@ -668,7 +667,7 @@ public class SenderTest {
* polls are necessary to send requests.
*/
@Test
- public void testIdempotentInitProducerIdWithMaxInFlightOne() throws
Exception {
+ public void testIdempotentInitProducerIdWithMaxInFlightOne() {
final long producerId = 123456L;
createMockClientWithMaxFlightOneMetadataPending();
@@ -774,7 +773,7 @@ public class SenderTest {
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
- Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+ Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.empty(),
transactionManager.lastAckedSequence(tp0));
@@ -824,7 +823,7 @@ public class SenderTest {
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
- Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+ Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.empty(),
transactionManager.lastAckedSequence(tp0));
@@ -924,7 +923,7 @@ public class SenderTest {
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
- Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+ Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.empty(),
transactionManager.lastAckedSequence(tp0));
@@ -984,7 +983,7 @@ public class SenderTest {
appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
- Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+ Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
// make sure the next sequence number accounts for multi-message
batches.
@@ -1256,7 +1255,7 @@ public class SenderTest {
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
- Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+ Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.empty(),
transactionManager.lastAckedSequence(tp0));
@@ -1338,7 +1337,7 @@ public class SenderTest {
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
- Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+ Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
// Send second ProduceRequest
@@ -1779,7 +1778,7 @@ public class SenderTest {
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
- Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
+ Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.empty(),
transactionManager.lastAckedSequence(tp0));
@@ -2315,7 +2314,7 @@ public class SenderTest {
sender.runOnce(); // connect.
sender.runOnce(); // send.
String id = client.requests().peek().destination();
- Node node = new Node(Integer.valueOf(id), "localhost", 0);
+ Node node = new Node(Integer.parseInt(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.isReady(node, time.milliseconds()), "Client ready
status should be true");
client.disconnect(id);
@@ -2425,7 +2424,7 @@ public class SenderTest {
assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The
next sequence should be 2");
String id = client.requests().peek().destination();
assertEquals(ApiKeys.PRODUCE,
client.requests().peek().requestBuilder().apiKey());
- Node node = new Node(Integer.valueOf(id), "localhost", 0);
+ Node node = new Node(Integer.parseInt(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.isReady(node, time.milliseconds()), "Client
ready status should be true");
@@ -2443,7 +2442,7 @@ public class SenderTest {
assertFalse(f2.isDone(), "The future shouldn't have been done.");
id = client.requests().peek().destination();
assertEquals(ApiKeys.PRODUCE,
client.requests().peek().requestBuilder().apiKey());
- node = new Node(Integer.valueOf(id), "localhost", 0);
+ node = new Node(Integer.parseInt(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.isReady(node, time.milliseconds()), "Client
ready status should be true");
@@ -2460,7 +2459,7 @@ public class SenderTest {
sender.runOnce(); // send the second produce request
id = client.requests().peek().destination();
assertEquals(ApiKeys.PRODUCE,
client.requests().peek().requestBuilder().apiKey());
- node = new Node(Integer.valueOf(id), "localhost", 0);
+ node = new Node(Integer.parseInt(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.isReady(node, time.milliseconds()), "Client
ready status should be true");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/StickyPartitionCacheTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/StickyPartitionCacheTest.java
index b1af9a3ac90..3371223afe6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/StickyPartitionCacheTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/StickyPartitionCacheTest.java
@@ -66,7 +66,7 @@ public class StickyPartitionCacheTest {
int changedPartA3 = stickyPartitionCache.nextPartition(TOPIC_A,
testCluster, partA);
assertEquals(changedPartA3, changedPartA2);
- // Check that the we can still use the partitioner when there is only
one partition
+ // Check that we can still use the partitioner when there is only one
partition
int changedPartB = stickyPartitionCache.nextPartition(TOPIC_B,
testCluster, partB);
assertEquals(changedPartB, stickyPartitionCache.partition(TOPIC_B,
testCluster));
}
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index c9c36926c39..f9dc5e75fc5 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -147,7 +147,7 @@ public class KafkaFutureTest {
}
@Test
- public void testCompleteFuturesExceptionally() throws Exception {
+ public void testCompleteFuturesExceptionally() {
KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
assertTrue(futureFail.completeExceptionally(new RuntimeException("We
require more vespene gas")));
assertIsFailed(futureFail);
diff --git
a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
index b91db6f206c..2e81a6d1eaa 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
@@ -54,7 +54,7 @@ public class AclOperationTest {
};
@Test
- public void testIsUnknown() throws Exception {
+ public void testIsUnknown() {
for (AclOperationTestInfo info : INFOS) {
assertEquals(info.unknown, info.operation.isUnknown(),
info.operation + " was supposed to have unknown == " +
info.unknown);
@@ -62,7 +62,7 @@ public class AclOperationTest {
}
@Test
- public void testCode() throws Exception {
+ public void testCode() {
assertEquals(AclOperation.values().length, INFOS.length);
for (AclOperationTestInfo info : INFOS) {
assertEquals(info.code, info.operation.code(), info.operation + "
was supposed to have code == " + info.code);
diff --git
a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
index 8da6d2ac6c0..1f935579e11 100644
---
a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
@@ -44,14 +44,14 @@ public class AclPermissionTypeTest {
};
@Test
- public void testIsUnknown() throws Exception {
+ public void testIsUnknown() {
for (AclPermissionTypeTestInfo info : INFOS) {
assertEquals(info.unknown, info.ty.isUnknown(), info.ty + " was
supposed to have unknown == " + info.unknown);
}
}
@Test
- public void testCode() throws Exception {
+ public void testCode() {
assertEquals(AclPermissionType.values().length, INFOS.length);
for (AclPermissionTypeTestInfo info : INFOS) {
assertEquals(info.code, info.ty.code(), info.ty + " was supposed
to have code == " + info.code);
@@ -71,7 +71,7 @@ public class AclPermissionTypeTest {
}
@Test
- public void testExhaustive() throws Exception {
+ public void testExhaustive() {
assertEquals(INFOS.length, AclPermissionType.values().length);
for (int i = 0; i < INFOS.length; i++) {
assertEquals(INFOS[i].ty, AclPermissionType.values()[i]);
diff --git
a/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
b/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
index d7cd2ecc2d1..1e0d32b137b 100644
--- a/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.common.cache;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class LRUCacheTest {
@@ -49,20 +51,20 @@ public class LRUCacheTest {
cache.put("e", "f");
assertEquals(3, cache.size());
- assertEquals(true, cache.remove("a"));
+ assertTrue(cache.remove("a"));
assertEquals(2, cache.size());
assertNull(cache.get("a"));
assertEquals("d", cache.get("c"));
assertEquals("f", cache.get("e"));
- assertEquals(false, cache.remove("key-does-not-exist"));
+ assertFalse(cache.remove("key-does-not-exist"));
- assertEquals(true, cache.remove("c"));
+ assertTrue(cache.remove("c"));
assertEquals(1, cache.size());
assertNull(cache.get("c"));
assertEquals("f", cache.get("e"));
- assertEquals(true, cache.remove("e"));
+ assertTrue(cache.remove("e"));
assertEquals(0, cache.size());
assertNull(cache.get("e"));
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 76c20df4edf..d2c572f06f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -131,7 +131,7 @@ public class ConfigDefTest {
private void testBadInputs(Type type, Object... values) {
for (Object value : values) {
- Map<String, Object> m = new HashMap<String, Object>();
+ Map<String, Object> m = new HashMap<>();
m.put("name", value);
ConfigDef def = new ConfigDef().define("name", type,
Importance.HIGH, "docs");
try {
@@ -158,7 +158,7 @@ public class ConfigDefTest {
@Test
public void testNestedClass() {
// getName(), not getSimpleName() or getCanonicalName(), is the
version that should be able to locate the class
- Map<String, Object> props = Collections.<String,
Object>singletonMap("name", NestedClass.class.getName());
+ Map<String, Object> props = Collections.singletonMap("name",
NestedClass.class.getName());
new ConfigDef().define("name", Type.CLASS, Importance.HIGH,
"docs").parse(props);
}
@@ -240,10 +240,10 @@ public class ConfigDefTest {
Map<String, ConfigValue> expected = new HashMap<>();
String errorMessageB = "Missing required configuration \"b\" which has
no default value.";
String errorMessageC = "Missing required configuration \"c\" which has
no default value.";
- ConfigValue configA = new ConfigValue("a", 1,
Collections.<Object>emptyList(), Collections.<String>emptyList());
- ConfigValue configB = new ConfigValue("b", null,
Collections.<Object>emptyList(), Arrays.asList(errorMessageB, errorMessageB));
- ConfigValue configC = new ConfigValue("c", null,
Collections.<Object>emptyList(), Arrays.asList(errorMessageC));
- ConfigValue configD = new ConfigValue("d", 10,
Collections.<Object>emptyList(), Collections.<String>emptyList());
+ ConfigValue configA = new ConfigValue("a", 1, Collections.emptyList(),
Collections.emptyList());
+ ConfigValue configB = new ConfigValue("b", null,
Collections.emptyList(), Arrays.asList(errorMessageB, errorMessageB));
+ ConfigValue configC = new ConfigValue("c", null,
Collections.emptyList(), Arrays.asList(errorMessageC));
+ ConfigValue configD = new ConfigValue("d", 10,
Collections.emptyList(), Collections.emptyList());
expected.put("a", configA);
expected.put("b", configB);
expected.put("c", configC);
@@ -277,10 +277,10 @@ public class ConfigDefTest {
String errorMessageB = "Missing required configuration \"b\" which has
no default value.";
String errorMessageC = "Missing required configuration \"c\" which has
no default value.";
- ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1,
2, 3), Collections.<String>emptyList());
- ConfigValue configB = new ConfigValue("b", null,
Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB, errorMessageB));
- ConfigValue configC = new ConfigValue("c", null,
Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
- ConfigValue configD = new ConfigValue("d", 10,
Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
+ ConfigValue configA = new ConfigValue("a", 1, Arrays.asList(1, 2, 3),
Collections.emptyList());
+ ConfigValue configB = new ConfigValue("b", null, Arrays.asList(4, 5),
Arrays.asList(errorMessageB, errorMessageB));
+ ConfigValue configC = new ConfigValue("c", null, Arrays.asList(4, 5),
Arrays.asList(errorMessageC));
+ ConfigValue configD = new ConfigValue("d", 10, Arrays.asList(1, 2, 3),
Collections.emptyList());
expected.put("a", configA);
expected.put("b", configB);
@@ -312,9 +312,9 @@ public class ConfigDefTest {
String errorMessageC = "Missing required configuration \"c\" which has
no default value.";
String errorMessageD = "d is referred in the dependents, but not
defined.";
- ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1,
2, 3), Collections.<String>emptyList());
- ConfigValue configB = new ConfigValue("b", null,
Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB));
- ConfigValue configC = new ConfigValue("c", null,
Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
+ ConfigValue configA = new ConfigValue("a", 1, Arrays.asList(1, 2, 3),
Collections.emptyList());
+ ConfigValue configB = new ConfigValue("b", null, Arrays.asList(4, 5),
Arrays.asList(errorMessageB));
+ ConfigValue configC = new ConfigValue("c", null, Arrays.asList(4, 5),
Arrays.asList(errorMessageC));
ConfigValue configD = new ConfigValue("d", null,
Collections.emptyList(), Arrays.asList(errorMessageD));
configD.visible(false);
@@ -359,7 +359,7 @@ public class ConfigDefTest {
}
@Test
- public void testCanAddInternalConfig() throws Exception {
+ public void testCanAddInternalConfig() {
final String configName = "internal.config";
final ConfigDef configDef = new ConfigDef().defineInternal(configName,
Type.STRING, "", Importance.LOW);
final HashMap<String, String> properties = new HashMap<>();
@@ -384,7 +384,7 @@ public class ConfigDefTest {
}
@Test
- public void testDynamicUpdateModeInDocs() throws Exception {
+ public void testDynamicUpdateModeInDocs() {
final ConfigDef configDef = new ConfigDef()
.define("my.broker.config", Type.LONG, Importance.HIGH, "docs")
.define("my.cluster.config", Type.LONG, Importance.HIGH,
"docs")
@@ -477,13 +477,13 @@ public class ConfigDefTest {
ConfigDef def = new ConfigDef().define("name", type, defaultVal,
validator, Importance.HIGH, "docs");
for (Object value : okValues) {
- Map<String, Object> m = new HashMap<String, Object>();
+ Map<String, Object> m = new HashMap<>();
m.put("name", value);
def.parse(m);
}
for (Object value : badValues) {
- Map<String, Object> m = new HashMap<String, Object>();
+ Map<String, Object> m = new HashMap<>();
m.put("name", value);
try {
def.parse(m);
@@ -538,11 +538,11 @@ public class ConfigDefTest {
public void toEnrichedRst() {
final ConfigDef def = new ConfigDef()
.define("opt1.of.group1", Type.STRING, "a",
ValidString.in("a", "b", "c"), Importance.HIGH, "Doc doc.",
- "Group One", 0, Width.NONE, "..",
Collections.<String>emptyList())
+ "Group One", 0, Width.NONE, "..",
Collections.emptyList())
.define("opt2.of.group1", Type.INT,
ConfigDef.NO_DEFAULT_VALUE, Importance.MEDIUM, "Doc doc doc.",
"Group One", 1, Width.NONE, "..",
Arrays.asList("some.option1", "some.option2"))
.define("opt2.of.group2", Type.BOOLEAN, false,
Importance.HIGH, "Doc doc doc doc.",
- "Group Two", 1, Width.NONE, "..",
Collections.<String>emptyList())
+ "Group Two", 1, Width.NONE, "..",
Collections.emptyList())
.define("opt1.of.group2", Type.BOOLEAN, false,
Importance.HIGH, "Doc doc doc doc doc.",
"Group Two", 0, Width.NONE, "..",
singletonList("some.option"))
.define("poor.opt", Type.STRING, "foo", Importance.HIGH, "Doc
doc doc doc.");
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
index 93296d9f492..50f8fd647f6 100644
---
a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
@@ -48,7 +48,7 @@ public class ConfigTransformerTest {
}
@Test
- public void testReplaceVariable() throws Exception {
+ public void testReplaceVariable() {
ConfigTransformerResult result =
configTransformer.transform(Collections.singletonMap(MY_KEY,
"${test:testPath:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
@@ -57,7 +57,7 @@ public class ConfigTransformerTest {
}
@Test
- public void testReplaceVariableWithTTL() throws Exception {
+ public void testReplaceVariableWithTTL() {
ConfigTransformerResult result =
configTransformer.transform(Collections.singletonMap(MY_KEY,
"${test:testPath:testKeyWithTTL}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
@@ -66,28 +66,28 @@ public class ConfigTransformerTest {
}
@Test
- public void testReplaceMultipleVariablesInValue() throws Exception {
+ public void testReplaceMultipleVariablesInValue() {
ConfigTransformerResult result =
configTransformer.transform(Collections.singletonMap(MY_KEY, "hello,
${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
Map<String, String> data = result.data();
assertEquals("hello, testResult; goodbye, testResultWithTTL!!!",
data.get(MY_KEY));
}
@Test
- public void testNoReplacement() throws Exception {
+ public void testNoReplacement() {
ConfigTransformerResult result =
configTransformer.transform(Collections.singletonMap(MY_KEY,
"${test:testPath:missingKey}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
}
@Test
- public void testSingleLevelOfIndirection() throws Exception {
+ public void testSingleLevelOfIndirection() {
ConfigTransformerResult result =
configTransformer.transform(Collections.singletonMap(MY_KEY,
"${test:testPath:testIndirection}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
}
@Test
- public void testReplaceVariableNoPath() throws Exception {
+ public void testReplaceVariableNoPath() {
ConfigTransformerResult result =
configTransformer.transform(Collections.singletonMap(MY_KEY,
"${test:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
@@ -96,14 +96,14 @@ public class ConfigTransformerTest {
}
@Test
- public void testReplaceMultipleVariablesWithoutPathInValue() throws
Exception {
+ public void testReplaceMultipleVariablesWithoutPathInValue() {
ConfigTransformerResult result =
configTransformer.transform(Collections.singletonMap(MY_KEY, "first
${test:testKey}; second ${test:testKey}"));
Map<String, String> data = result.data();
assertEquals("first testResultNoPath; second testResultNoPath",
data.get(MY_KEY));
}
@Test
- public void testNullConfigValue() throws Exception {
+ public void testNullConfigValue() {
ConfigTransformerResult result =
configTransformer.transform(Collections.singletonMap(MY_KEY, null));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
index 7cf5422bf7f..d162e8a6ecf 100644
---
a/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
@@ -82,7 +82,7 @@ public class DirectoryConfigProviderTest {
}
@Test
- public void testGetAllKeysAtPath() throws IOException {
+ public void testGetAllKeysAtPath() {
ConfigData configData = provider.get(dir.getAbsolutePath());
assertEquals(toSet(asList(foo.getName(), bar.getName())),
configData.data().keySet());
assertEquals("FOO", configData.data().get(foo.getName()));
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java
b/clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java
index 431f382a16a..5bbb60c155d 100644
---
a/clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/config/provider/FileConfigProviderTest.java
@@ -43,7 +43,7 @@ public class FileConfigProviderTest {
}
@Test
- public void testGetAllKeysAtPath() throws Exception {
+ public void testGetAllKeysAtPath() {
ConfigData configData = configProvider.get("dummy");
Map<String, String> result = new HashMap<>();
result.put("testKey", "testResult");
@@ -53,7 +53,7 @@ public class FileConfigProviderTest {
}
@Test
- public void testGetOneKeyAtPath() throws Exception {
+ public void testGetOneKeyAtPath() {
ConfigData configData = configProvider.get("dummy",
Collections.singleton("testKey"));
Map<String, String> result = new HashMap<>();
result.put("testKey", "testResult");
@@ -62,28 +62,28 @@ public class FileConfigProviderTest {
}
@Test
- public void testEmptyPath() throws Exception {
+ public void testEmptyPath() {
ConfigData configData = configProvider.get("",
Collections.singleton("testKey"));
assertTrue(configData.data().isEmpty());
assertNull(configData.ttl());
}
@Test
- public void testEmptyPathWithKey() throws Exception {
+ public void testEmptyPathWithKey() {
ConfigData configData = configProvider.get("");
assertTrue(configData.data().isEmpty());
assertNull(configData.ttl());
}
@Test
- public void testNullPath() throws Exception {
+ public void testNullPath() {
ConfigData configData = configProvider.get(null);
assertTrue(configData.data().isEmpty());
assertNull(configData.ttl());
}
@Test
- public void testNullPathWithKey() throws Exception {
+ public void testNullPathWithKey() {
ConfigData configData = configProvider.get(null,
Collections.singleton("testKey"));
assertTrue(configData.data().isEmpty());
assertNull(configData.ttl());
diff --git
a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
index f4813fd1486..e6c66fcdba4 100644
---
a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
@@ -128,7 +128,7 @@ public class RecordHeadersTest {
}
@Test
- public void testReadOnly() throws IOException {
+ public void testReadOnly() {
RecordHeaders headers = new RecordHeaders();
headers.add(new RecordHeader("key", "value".getBytes()));
Iterator<Header> headerIteratorBeforeClose = headers.iterator();
@@ -190,7 +190,7 @@ public class RecordHeadersTest {
}
@Test
- public void testNew() throws IOException {
+ public void testNew() {
RecordHeaders headers = new RecordHeaders();
headers.add(new RecordHeader("key", "value".getBytes()));
headers.setReadOnly();
diff --git
a/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
b/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
index 52f017514f5..11c58ab734f 100644
---
a/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/internals/PartitionStatesTest.java
@@ -174,7 +174,7 @@ public class PartitionStatesTest {
LinkedHashMap<TopicPartition, String> map = createMap();
states.set(map);
states.clear();
- checkState(states, new LinkedHashMap<TopicPartition, String>());
+ checkState(states, new LinkedHashMap<>());
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
b/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
index 739bed9cc46..bbbe1627ae3 100644
---
a/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/message/RecordsSerdeTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
public class RecordsSerdeTest {
@Test
- public void testSerdeRecords() throws Exception {
+ public void testSerdeRecords() {
MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord("foo".getBytes()),
new SimpleRecord("bar".getBytes()));
@@ -44,7 +44,7 @@ public class RecordsSerdeTest {
}
@Test
- public void testSerdeNullRecords() throws Exception {
+ public void testSerdeNullRecords() {
SimpleRecordsMessageData message = new SimpleRecordsMessageData()
.setTopic("foo");
assertNull(message.recordSet());
@@ -53,14 +53,14 @@ public class RecordsSerdeTest {
}
@Test
- public void testSerdeEmptyRecords() throws Exception {
+ public void testSerdeEmptyRecords() {
SimpleRecordsMessageData message = new SimpleRecordsMessageData()
.setTopic("foo")
.setRecordSet(MemoryRecords.EMPTY);
testAllRoundTrips(message);
}
- private void testAllRoundTrips(SimpleRecordsMessageData message) throws
Exception {
+ private void testAllRoundTrips(SimpleRecordsMessageData message) {
for (short version = SimpleRecordsMessageData.LOWEST_SUPPORTED_VERSION;
version <= SimpleRecordsMessageData.HIGHEST_SUPPORTED_VERSION;
version++) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
index 5df66dbb22a..c0cd5bab721 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
@@ -116,21 +116,21 @@ public class KafkaMbeanTest {
}
@Test
- public void testInvoke() throws Exception {
+ public void testInvoke() {
RuntimeMBeanException e = assertThrows(RuntimeMBeanException.class,
() -> mBeanServer.invoke(objectName(countMetricName), "something",
null, null));
assertEquals(UnsupportedOperationException.class,
e.getCause().getClass());
}
@Test
- public void testSetAttribute() throws Exception {
+ public void testSetAttribute() {
RuntimeMBeanException e = assertThrows(RuntimeMBeanException.class,
() -> mBeanServer.setAttribute(objectName(countMetricName), new
Attribute("anything", 1)));
assertEquals(UnsupportedOperationException.class,
e.getCause().getClass());
}
@Test
- public void testSetAttributes() throws Exception {
+ public void testSetAttributes() {
RuntimeMBeanException e = assertThrows(RuntimeMBeanException.class,
() -> mBeanServer.setAttributes(objectName(countMetricName), new
AttributeList(1)));
assertEquals(UnsupportedOperationException.class,
e.getCause().getClass());
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index bc1fc5d9e56..b80fafe0dfc 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -20,10 +20,10 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
@@ -92,7 +92,7 @@ public class MetricsTest {
@Test
public void testMetricName() {
MetricName n1 = metrics.metricName("name", "group", "description",
"key1", "value1", "key2", "value2");
- Map<String, String> tags = new HashMap<String, String>();
+ Map<String, String> tags = new HashMap<>();
tags.put("key1", "value1");
tags.put("key2", "value2");
MetricName n2 = metrics.metricName("name", "group", "description",
tags);
@@ -107,7 +107,7 @@ public class MetricsTest {
}
@Test
- public void testSimpleStats() throws Exception {
+ public void testSimpleStats() {
verifyStats(m -> (double) m.metricValue());
}
@@ -451,11 +451,11 @@ public class MetricsTest {
final Quota quota1 = Quota.upperBound(10.5);
final Quota quota2 = Quota.lowerBound(10.5);
- assertFalse(quota1.equals(quota2), "Quota with different upper values
shouldn't be equal");
+ assertNotEquals(quota1, quota2, "Quota with different upper values
shouldn't be equal");
final Quota quota3 = Quota.lowerBound(10.5);
- assertTrue(quota2.equals(quota3), "Quota with same upper and bound
values should be equal");
+ assertEquals(quota2, quota3, "Quota with same upper and bound values
should be equal");
}
@Test
@@ -582,7 +582,7 @@ public class MetricsTest {
}
@Test
- public void testRateWindowing() throws Exception {
+ public void testRateWindowing() {
// Use the default time window. Set 3 samples
MetricConfig cfg = new MetricConfig().samples(3);
Sensor s = metrics.sensor("test.sensor", cfg);
@@ -700,7 +700,7 @@ public class MetricsTest {
@Test
public void testMetricInstances() {
MetricName n1 = metrics.metricInstance(SampleMetrics.METRIC1, "key1",
"value1", "key2", "value2");
- Map<String, String> tags = new HashMap<String, String>();
+ Map<String, String> tags = new HashMap<>();
tags.put("key1", "value1");
tags.put("key2", "value2");
MetricName n2 = metrics.metricInstance(SampleMetrics.METRIC2, tags);
@@ -752,7 +752,7 @@ public class MetricsTest {
* in errors or deadlock.
*/
@Test
- public void testConcurrentReadUpdate() throws Exception {
+ public void testConcurrentReadUpdate() {
final Random random = new Random();
final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
metrics = new Metrics(new MockTime(10));
@@ -784,7 +784,7 @@ public class MetricsTest {
* that synchronizes on every reporter method doesn't result in errors or
deadlock.
*/
@Test
- public void testConcurrentReadUpdateReport() throws Exception {
+ public void testConcurrentReadUpdateReport() {
class LockingReporter implements MetricsReporter {
Map<MetricName, KafkaMetric> activeMetrics = new HashMap<>();
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index 77176e1f5c5..9254616528f 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -33,7 +33,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -197,19 +196,16 @@ public class SensorTest {
try {
for (int i = 0; i != threadCount; ++i) {
final int index = i;
- workers.add(service.submit(new Callable<Throwable>() {
- @Override
- public Throwable call() {
- try {
- assertTrue(latch.await(5, TimeUnit.SECONDS));
- for (int j = 0; j != 20; ++j) {
- sensor.record(j * index,
System.currentTimeMillis() + j, false);
- sensor.checkQuotas();
- }
- return null;
- } catch (Throwable e) {
- return e;
+ workers.add(service.submit(() -> {
+ try {
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ for (int j = 0; j != 20; ++j) {
+ sensor.record(j * index,
System.currentTimeMillis() + j, false);
+ sensor.checkQuotas();
}
+ return null;
+ } catch (Throwable e) {
+ return e;
}
}));
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
index b44306b4b83..344ade22a66 100644
---
a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java
@@ -149,7 +149,7 @@ public class FrequenciesTest {
}
protected MetricName name(String metricName) {
- return new MetricName(metricName, "group-id", "desc",
Collections.<String, String>emptyMap());
+ return new MetricName(metricName, "group-id", "desc",
Collections.emptyMap());
}
protected Frequency freq(String name, double value) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
index d0cc05942e3..f72964f12cc 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -62,8 +62,8 @@ class EchoServer extends Thread {
throw new IllegalArgumentException("Unsupported
securityProtocol " + securityProtocol);
}
this.port = this.serverSocket.getLocalPort();
- this.threads = Collections.synchronizedList(new ArrayList<Thread>());
- this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
+ this.threads = Collections.synchronizedList(new ArrayList<>());
+ this.sockets = Collections.synchronizedList(new ArrayList<>());
}
public void renegotiate() {
@@ -80,35 +80,32 @@ class EchoServer extends Thread {
break;
}
sockets.add(socket);
- Thread thread = new Thread() {
- @Override
- public void run() {
- try {
- DataInputStream input = new
DataInputStream(socket.getInputStream());
- DataOutputStream output = new
DataOutputStream(socket.getOutputStream());
- while (socket.isConnected() &&
!socket.isClosed()) {
- int size = input.readInt();
- if (renegotiate.get()) {
- renegotiate.set(false);
- ((SSLSocket) socket).startHandshake();
- }
- byte[] bytes = new byte[size];
- input.readFully(bytes);
- output.writeInt(size);
- output.write(bytes);
- output.flush();
+ Thread thread = new Thread(() -> {
+ try {
+ DataInputStream input = new
DataInputStream(socket.getInputStream());
+ DataOutputStream output = new
DataOutputStream(socket.getOutputStream());
+ while (socket.isConnected() && !socket.isClosed())
{
+ int size = input.readInt();
+ if (renegotiate.get()) {
+ renegotiate.set(false);
+ ((SSLSocket) socket).startHandshake();
}
+ byte[] bytes = new byte[size];
+ input.readFully(bytes);
+ output.writeInt(size);
+ output.write(bytes);
+ output.flush();
+ }
+ } catch (IOException e) {
+ // ignore
+ } finally {
+ try {
+ socket.close();
} catch (IOException e) {
// ignore
- } finally {
- try {
- socket.close();
- } catch (IOException e) {
- // ignore
- }
}
}
- };
+ });
thread.start();
threads.add(thread);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 15bfa559dce..0cd563355e5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -111,8 +111,8 @@ public class NioEchoServer extends Thread {
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(serverHost,
0));
this.port = serverSocketChannel.socket().getLocalPort();
- this.socketChannels = Collections.synchronizedList(new
ArrayList<SocketChannel>());
- this.newChannels = Collections.synchronizedList(new
ArrayList<SocketChannel>());
+ this.socketChannels = Collections.synchronizedList(new ArrayList<>());
+ this.newChannels = Collections.synchronizedList(new ArrayList<>());
this.credentialCache = credentialCache;
this.tokenCache = tokenCache;
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT ||
securityProtocol == SecurityProtocol.SASL_SSL) {
@@ -191,7 +191,7 @@ public class NioEchoServer extends Thread {
long thisMaxWaitMs = maxAggregateWaitMs - currentElapsedMs;
String metricName = namePrefix + metricType.metricNameSuffix();
if (expectedValue == 0.0) {
- Double expected = expectedValue;
+ double expected = expectedValue;
if (metricType == MetricType.MAX || metricType ==
MetricType.AVG)
expected = Double.NaN;
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
index 3338d039a07..88333e02a26 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
@@ -26,16 +26,13 @@ import java.net.Socket;
public class PlaintextSender extends Thread {
public PlaintextSender(final InetSocketAddress serverAddress, final byte[]
payload) {
- super(new Runnable() {
- @Override
- public void run() {
- try (Socket connection = new
Socket(serverAddress.getAddress(), serverAddress.getPort());
- OutputStream os = connection.getOutputStream()) {
- os.write(payload);
- os.flush();
- } catch (Exception e) {
- e.printStackTrace(System.err);
- }
+ super(() -> {
+ try (Socket connection = new Socket(serverAddress.getAddress(),
serverAddress.getPort());
+ OutputStream os = connection.getOutputStream()) {
+ os.write(payload);
+ os.flush();
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
}
});
setDaemon(true);
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 09f14531def..618e86e1dae 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -59,6 +58,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -129,15 +129,12 @@ public class SelectorTest {
// disconnect
this.server.closeConnections();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- try {
- selector.poll(1000L);
- return selector.disconnected().containsKey(node);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ waitForCondition(() -> {
+ try {
+ selector.poll(1000L);
+ return selector.disconnected().containsKey(node);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}, 5000, "Failed to observe disconnected node in disconnected set");
@@ -259,7 +256,7 @@ public class SelectorTest {
if (channelBuilder instanceof PlaintextChannelBuilder) {
assertEquals(0, cipherMetrics(metrics).size());
} else {
- TestUtils.waitForCondition(() -> cipherMetrics(metrics).size() ==
1,
+ waitForCondition(() -> cipherMetrics(metrics).size() == 1,
"Waiting for cipher metrics to be created.");
assertEquals(Integer.valueOf(5),
cipherMetrics(metrics).get(0).metricValue());
}
@@ -300,7 +297,7 @@ public class SelectorTest {
KafkaMetric outgoingByteTotal =
findUntaggedMetricByName("outgoing-byte-total");
KafkaMetric incomingByteTotal =
findUntaggedMetricByName("incoming-byte-total");
- TestUtils.waitForCondition(() -> {
+ waitForCondition(() -> {
long bytesSent = send.size() - send.remaining();
assertEquals(bytesSent, ((Double)
outgoingByteTotal.metricValue()).longValue());
@@ -631,7 +628,7 @@ public class SelectorTest {
int receiveCount = 0;
KafkaChannel channel = createConnectionWithPendingReceives(i);
// Poll until one or more receives complete and then close the
server-side connection
- TestUtils.waitForCondition(() -> {
+ waitForCondition(() -> {
selector.poll(1000);
return selector.completedReceives().size() > 0;
}, 5000, "Receive not completed");
@@ -662,7 +659,7 @@ public class SelectorTest {
selector.poll(1000); // Wait until some data arrives, but not a
completed receive
assertEquals(0, selector.completedReceives().size());
server.closeConnections();
- TestUtils.waitForCondition(() -> {
+ waitForCondition(() -> {
try {
selector.poll(100);
return !selector.disconnected().isEmpty();
@@ -681,7 +678,7 @@ public class SelectorTest {
selector.close();
MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
selector = new Selector(NetworkReceive.UNLIMITED,
CONNECTION_MAX_IDLE_MS, metrics, time, "MetricGroup",
- new HashMap<String, String>(), true, false, channelBuilder, pool,
new LogContext());
+ new HashMap<>(), true, false, channelBuilder, pool, new
LogContext());
try (ServerSocketChannel ss = ServerSocketChannel.open()) {
ss.bind(new InetSocketAddress(0));
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 41f0096b120..4f617961a73 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -257,7 +257,7 @@ public abstract class SslSelectorTest extends SelectorTest {
channelBuilder = new SslChannelBuilder(Mode.SERVER, null, false, new
LogContext());
channelBuilder.configure(sslServerConfigs);
selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time,
"MetricGroup",
- new HashMap<String, String>(), true, false, channelBuilder,
pool, new LogContext());
+ new HashMap<>(), true, false, channelBuilder, pool, new
LogContext());
try (ServerSocketChannel ss = ServerSocketChannel.open()) {
ss.bind(new InetSocketAddress(0));
@@ -346,7 +346,7 @@ public abstract class SslSelectorTest extends SelectorTest {
@Override
protected SslTransportLayer buildTransportLayer(SslFactory sslFactory,
String id, SelectionKey key,
-
ChannelMetadataRegistry metadataRegistry) throws IOException {
+
ChannelMetadataRegistry metadataRegistry) {
SocketChannel socketChannel = (SocketChannel) key.channel();
SSLEngine sslEngine =
sslFactory.createSslEngine(socketChannel.socket());
TestSslTransportLayer transportLayer = new
TestSslTransportLayer(id, key, sslEngine, metadataRegistry);
@@ -380,7 +380,7 @@ public abstract class SslSelectorTest extends SelectorTest {
// Leave one byte in network read buffer so that some buffered
bytes are present,
// but not enough to make progress on a read.
- void truncateReadBuffer() throws Exception {
+ void truncateReadBuffer() {
netReadBuffer().position(1);
appReadBuffer().position(0);
muteSocket = true;
diff --git
a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
index 22196dde13a..928fe0d26d3 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
@@ -22,7 +22,6 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.OutputStream;
import java.net.InetSocketAddress;
-import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -68,12 +67,12 @@ public class SslSender extends Thread {
*/
private static class NaiveTrustManager implements X509TrustManager {
@Override
- public void checkClientTrusted(X509Certificate[] x509Certificates,
String s) throws CertificateException {
+ public void checkClientTrusted(X509Certificate[] x509Certificates,
String s) {
//nop
}
@Override
- public void checkServerTrusted(X509Certificate[] x509Certificates,
String s) throws CertificateException {
+ public void checkServerTrusted(X509Certificate[] x509Certificates,
String s) {
//nop
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 67212165fc3..f3868407177 100644
---
a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -205,7 +205,7 @@ public class DefaultRecordTest {
}
@Test
- public void testInvalidValueSizePartial() throws IOException {
+ public void testInvalidValueSizePartial() {
byte attributes = 0;
long timestampDelta = 2;
int offsetDelta = 1;
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 2e9ff330bba..2fa978e10fa 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -431,7 +431,7 @@ public class FileRecordsTest {
}
@Test
- public void testFormatConversionWithNoMessages() throws IOException {
+ public void testFormatConversionWithNoMessages() {
TopicPartition tp = new TopicPartition("topic-1", 0);
LazyDownConversionRecords lazyRecords = new
LazyDownConversionRecords(tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0,
0, Time.SYSTEM);
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
index 05c1bb84a20..e4f70222d13 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
@@ -65,7 +65,7 @@ public class JaasContextTest {
@Test
public void testConfigNoOptions() throws Exception {
- checkConfiguration("test.testConfigNoOptions",
LoginModuleControlFlag.REQUIRED, new HashMap<String, Object>());
+ checkConfiguration("test.testConfigNoOptions",
LoginModuleControlFlag.REQUIRED, new HashMap<>());
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClienCallbackHandlerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
similarity index 98%
rename from
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClienCallbackHandlerTest.java
rename to
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
index b13c0f823c8..ae3ef55f21a 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClienCallbackHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
@@ -32,7 +32,7 @@ import javax.security.auth.callback.Callback;
import
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler;
import org.junit.jupiter.api.Test;
-public class OAuthBearerSaslClienCallbackHandlerTest {
+public class OAuthBearerSaslClientCallbackHandlerTest {
private static OAuthBearerToken createTokenWithLifetimeMillis(final long
lifetimeMillis) {
return new OAuthBearerToken() {
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
index 2bf3f841b11..e7204df660a 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerClientInitialResponseTest.java
@@ -57,7 +57,7 @@ public class OAuthBearerClientInitialResponseTest {
}
@Test
- public void testThrowsSaslExceptionOnInvalidExtensionKey() throws
Exception {
+ public void testThrowsSaslExceptionOnInvalidExtensionKey() {
Map<String, String> extensions = new HashMap<>();
extensions.put("19", "42"); // keys can only be a-z
assertThrows(SaslException.class, () -> new
OAuthBearerClientInitialResponse("123.345.567", new
SaslExtensions(extensions)));
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
index 089c9084975..39bd7b89f15 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
@@ -19,7 +19,6 @@ package
org.apache.kafka.common.security.oauthbearer.internals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNull;
import java.io.IOException;
@@ -30,7 +29,6 @@ import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.LoginException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
@@ -103,7 +101,7 @@ public class OAuthBearerSaslServerTest {
byte[] nextChallenge = saslServer
.evaluateResponse(clientInitialResponse(null));
// also asserts that no authentication error is thrown if
OAuthBearerExtensionsValidatorCallback is not supported
- assertTrue(nextChallenge.length == 0, "Next challenge is not empty");
+ assertEquals(0, nextChallenge.length, "Next challenge is not empty");
}
@Test
@@ -127,7 +125,7 @@ public class OAuthBearerSaslServerTest {
byte[] nextChallenge = saslServer
.evaluateResponse(clientInitialResponse(null, false,
customExtensions));
- assertTrue(nextChallenge.length == 0, "Next challenge is not empty");
+ assertEquals(0, nextChallenge.length, "Next challenge is not empty");
assertEquals("value1", saslServer.getNegotiatedProperty("firstKey"));
assertEquals("value2", saslServer.getNegotiatedProperty("secondKey"));
}
@@ -147,7 +145,7 @@ public class OAuthBearerSaslServerTest {
byte[] nextChallenge = saslServer
.evaluateResponse(clientInitialResponse(null, false,
customExtensions));
- assertTrue(nextChallenge.length == 0, "Next challenge is not empty");
+ assertEquals(0, nextChallenge.length, "Next challenge is not empty");
assertNull(saslServer.getNegotiatedProperty("thirdKey"), "Extensions
not recognized by the server must be ignored");
}
@@ -186,7 +184,7 @@ public class OAuthBearerSaslServerTest {
public void authorizatonIdEqualsAuthenticationId() throws Exception {
byte[] nextChallenge = saslServer
.evaluateResponse(clientInitialResponse(USER));
- assertTrue(nextChallenge.length == 0, "Next challenge is not empty");
+ assertEquals(0, nextChallenge.length, "Next challenge is not empty");
}
@Test
@@ -203,7 +201,7 @@ public class OAuthBearerSaslServerTest {
}
private byte[] clientInitialResponse(String authorizationId)
- throws OAuthBearerConfigException, IOException,
UnsupportedCallbackException, LoginException {
+ throws OAuthBearerConfigException, IOException,
UnsupportedCallbackException {
return clientInitialResponse(authorizationId, false);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
index 85f6622f090..cd946fa0eb0 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
@@ -753,7 +753,7 @@ public class ExpiringCredentialRefreshingLoginTest {
int numWaiters) {
List<KafkaFutureImpl<Long>> retvalWaiters = new
ArrayList<>(numWaiters);
for (int i = 1; i <= numWaiters; ++i) {
- KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<Long>();
+ KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
mockScheduler.addWaiter(i * refreshEveryMillis, waiter);
retvalWaiters.add(waiter);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
index 5086a1dab29..59f4319e106 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
@@ -172,17 +172,17 @@ public class HttpAccessTokenRetrieverTest extends
OAuthBearerTest {
}
@Test
- public void testFormatAuthorizationHeader() throws IOException {
+ public void testFormatAuthorizationHeader() {
assertAuthorizationHeader("id", "secret");
}
@Test
- public void testFormatAuthorizationHeaderEncoding() throws IOException {
+ public void testFormatAuthorizationHeaderEncoding() {
// See KAFKA-14496
assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234",
"9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E");
}
- private void assertAuthorizationHeader(String clientId, String
clientSecret) throws IOException {
+ private void assertAuthorizationHeader(String clientId, String
clientSecret) {
String expected = "Basic " +
Base64.getEncoder().encodeToString(Utils.utf8(clientId + ":" + clientSecret));
String actual =
HttpAccessTokenRetriever.formatAuthorizationHeader(clientId, clientSecret);
assertEquals(expected, actual, String.format("Expected the HTTP
Authorization header generated for client ID \"%s\" and client secret \"%s\" to
match", clientId, clientSecret));
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RetryTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RetryTest.java
index 803cf10741c..b8d2b019e1f 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RetryTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RetryTest.java
@@ -115,7 +115,7 @@ public class RetryTest extends OAuthBearerTest {
}
@Test
- public void testUseMaxTimeout() throws IOException {
+ public void testUseMaxTimeout() {
Exception[] attempts = new Exception[] {
new IOException("pretend connect error"),
new IOException("pretend timeout error"),
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
index bc1b6600bc1..0153abcc838 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
@@ -20,7 +20,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
@@ -30,7 +29,6 @@ import java.util.HashMap;
import java.util.Map;
import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.UnsupportedCallbackException;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
@@ -85,7 +83,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest
{
}
@Test
- public void badOrMissingPrincipal() throws IOException,
UnsupportedCallbackException {
+ public void badOrMissingPrincipal() {
for (boolean exists : new boolean[] {true, false}) {
String claimsJson = "{" + EXPIRATION_TIME_CLAIM_TEXT + (exists ?
comma(BAD_PRINCIPAL_CLAIM_TEXT) : "")
+ "}";
@@ -94,7 +92,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest
{
}
@Test
- public void tooEarlyExpirationTime() throws IOException,
UnsupportedCallbackException {
+ public void tooEarlyExpirationTime() {
String claimsJson = "{" + PRINCIPAL_CLAIM_TEXT +
comma(ISSUED_AT_CLAIM_TEXT)
+ comma(TOO_EARLY_EXPIRATION_TIME_CLAIM_TEXT) + "}";
confirmFailsValidation(UNSECURED_JWT_HEADER_JSON, claimsJson,
MODULE_OPTIONS_MAP_NO_SCOPE_REQUIRED);
@@ -110,15 +108,14 @@ public class
OAuthBearerUnsecuredValidatorCallbackHandlerTest {
}
@Test
- public void missingRequiredScope() throws IOException,
UnsupportedCallbackException {
+ public void missingRequiredScope() {
String claimsJson = "{" + SUB_CLAIM_TEXT +
comma(EXPIRATION_TIME_CLAIM_TEXT) + comma(SCOPE_CLAIM_TEXT) + "}";
confirmFailsValidation(UNSECURED_JWT_HEADER_JSON, claimsJson,
MODULE_OPTIONS_MAP_REQUIRE_ADDITIONAL_SCOPE,
"[scope1, scope2]");
}
private static void confirmFailsValidation(String headerJson, String
claimsJson,
- Map<String, String> moduleOptionsMap) throws
OAuthBearerConfigException, OAuthBearerIllegalTokenException,
- IOException, UnsupportedCallbackException {
+ Map<String, String> moduleOptionsMap) throws
OAuthBearerConfigException, OAuthBearerIllegalTokenException {
confirmFailsValidation(headerJson, claimsJson, moduleOptionsMap, null);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
index 77882b6f3fb..6c71f51b40e 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
@@ -54,13 +54,13 @@ public class PlainSaslServerTest {
}
@Test
- public void noAuthorizationIdSpecified() throws Exception {
+ public void noAuthorizationIdSpecified() {
byte[] nextChallenge = saslServer.evaluateResponse(saslMessage("",
USER_A, PASSWORD_A));
assertEquals(0, nextChallenge.length);
}
@Test
- public void authorizatonIdEqualsAuthenticationId() throws Exception {
+ public void authorizatonIdEqualsAuthenticationId() {
byte[] nextChallenge = saslServer.evaluateResponse(saslMessage(USER_A,
USER_A, PASSWORD_A));
assertEquals(0, nextChallenge.length);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerTest.java
index 121ac5951d7..f291e2d7d9c 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerTest.java
@@ -48,7 +48,7 @@ public class ScramSaslServerTest {
credentialCache.put(USER_A, formatter.generateCredential("passwordA",
4096));
credentialCache.put(USER_B, formatter.generateCredential("passwordB",
4096));
ScramServerCallbackHandler callbackHandler = new
ScramServerCallbackHandler(credentialCache, new
DelegationTokenCache(ScramMechanism.mechanismNames()));
- saslServer = new ScramSaslServer(mechanism, new HashMap<String,
Object>(), callbackHandler);
+ saslServer = new ScramSaslServer(mechanism, new HashMap<>(),
callbackHandler);
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 742631b0231..21dcd6e4b0f 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -77,7 +77,7 @@ public abstract class SslFactoryTest {
SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
assertNotNull(engine);
assertEquals(Utils.mkSet(tlsProtocol),
Utils.mkSet(engine.getEnabledProtocols()));
- assertEquals(false, engine.getUseClientMode());
+ assertFalse(engine.getUseClientMode());
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
index 4115a5f8cbd..8e5e584f97c 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
@@ -23,7 +23,6 @@ import javax.net.ssl.TrustManagerFactorySpi;
import javax.net.ssl.X509ExtendedTrustManager;
import java.net.Socket;
import java.security.KeyStore;
-import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
public class TestTrustManagerFactory extends TrustManagerFactorySpi {
@@ -49,12 +48,12 @@ public class TestTrustManagerFactory extends
TrustManagerFactorySpi {
public static final String ALIAS = "TestAlias";
@Override
- public void checkClientTrusted(X509Certificate[] x509Certificates,
String s) throws CertificateException {
+ public void checkClientTrusted(X509Certificate[] x509Certificates,
String s) {
}
@Override
- public void checkServerTrusted(X509Certificate[] x509Certificates,
String s) throws CertificateException {
+ public void checkServerTrusted(X509Certificate[] x509Certificates,
String s) {
}
@@ -64,22 +63,22 @@ public class TestTrustManagerFactory extends
TrustManagerFactorySpi {
}
@Override
- public void checkClientTrusted(X509Certificate[] x509Certificates,
String s, Socket socket) throws CertificateException {
+ public void checkClientTrusted(X509Certificate[] x509Certificates,
String s, Socket socket) {
}
@Override
- public void checkServerTrusted(X509Certificate[] x509Certificates,
String s, Socket socket) throws CertificateException {
+ public void checkServerTrusted(X509Certificate[] x509Certificates,
String s, Socket socket) {
}
@Override
- public void checkClientTrusted(X509Certificate[] x509Certificates,
String s, SSLEngine sslEngine) throws CertificateException {
+ public void checkClientTrusted(X509Certificate[] x509Certificates,
String s, SSLEngine sslEngine) {
}
@Override
- public void checkServerTrusted(X509Certificate[] x509Certificates,
String s, SSLEngine sslEngine) throws CertificateException {
+ public void checkServerTrusted(X509Certificate[] x509Certificates,
String s, SSLEngine sslEngine) {
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index a0b67a03d42..844ce500667 100644
---
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -360,12 +360,12 @@ public class SerializationTest {
}
private Serde<String> getStringSerde(String encoder) {
- Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+ Map<String, Object> serializerConfigs = new HashMap<>();
serializerConfigs.put("key.serializer.encoding", encoder);
Serializer<String> serializer = Serdes.String().serializer();
serializer.configure(serializerConfigs, true);
- Map<String, Object> deserializerConfigs = new HashMap<String,
Object>();
+ Map<String, Object> deserializerConfigs = new HashMap<>();
deserializerConfigs.put("key.deserializer.encoding", encoder);
Deserializer<String> deserializer = Serdes.String().deserializer();
deserializer.configure(deserializerConfigs, true);
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
index e100d24ba42..939b2f6daa2 100644
---
a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
@@ -34,10 +34,10 @@ public class AbstractIteratorTest {
@Test
public void testIterator() {
int max = 10;
- List<Integer> l = new ArrayList<Integer>();
+ List<Integer> l = new ArrayList<>();
for (int i = 0; i < max; i++)
l.add(i);
- ListIterator<Integer> iter = new ListIterator<Integer>(l);
+ ListIterator<Integer> iter = new ListIterator<>(l);
for (int i = 0; i < max; i++) {
Integer value = i;
assertEquals(value, iter.peek());
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
index 8adcca3084f..b345c5c6fbe 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
@@ -34,7 +34,7 @@ public class ExitTest {
});
try {
int statusCode = 0;
- String message = "mesaage";
+ String message = "message";
Exit.halt(statusCode);
Exit.halt(statusCode, message);
assertEquals(Arrays.asList(statusCode, null, statusCode, message),
list);
@@ -52,7 +52,7 @@ public class ExitTest {
});
try {
int statusCode = 0;
- String message = "mesaage";
+ String message = "message";
Exit.exit(statusCode);
Exit.exit(statusCode, message);
assertEquals(Arrays.asList(statusCode, null, statusCode, message),
list);
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
index 3c12c989f4d..d62595a2ae4 100644
---
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
@@ -576,7 +576,7 @@ public class ImplicitLinkedHashCollectionTest {
assertFalse(coll.add(new TestElement(1, 2)));
TestElement element2 = new TestElement(1, 2);
TestElement element1 = coll.find(element2);
- assertFalse(element2.equals(element1));
+ assertNotEquals(element2, element1);
assertTrue(element2.elementKeysAreEqual(element1));
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
index 78a9060c404..d6edd914e18 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -87,32 +87,26 @@ public class MockScheduler implements Scheduler,
MockTime.Listener {
final Callable<T> callable, long delayMs) {
final KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
- waiter.thenApply(new KafkaFuture.BaseFunction<Long, Void>() {
- @Override
- public Void apply(final Long now) {
- executor.submit(new Callable<Void>() {
- @Override
- public Void call() {
- // Note: it is possible that we'll execute
Callable#call right after
- // the future is cancelled. This is a valid sequence
of events
- // that the author of the Callable needs to be able to
handle.
- //
- // Note 2: If the future is cancelled, we will not
remove the waiter
- // from this MockTime object. This small bit of
inefficiency is acceptable
- // in testing code (at least we aren't polling!)
- if (!future.isCancelled()) {
- try {
- log.trace("Invoking {} at {}", callable, now);
- future.complete(callable.call());
- } catch (Throwable throwable) {
- future.completeExceptionally(throwable);
- }
- }
- return null;
+ waiter.thenApply((KafkaFuture.BaseFunction<Long, Void>) now -> {
+ executor.submit((Callable<Void>) () -> {
+ // Note: it is possible that we'll execute Callable#call right
after
+ // the future is cancelled. This is a valid sequence of events
+ // that the author of the Callable needs to be able to handle.
+ //
+ // Note 2: If the future is cancelled, we will not remove the
waiter
+ // from this MockTime object. This small bit of inefficiency
is acceptable
+ // in testing code (at least we aren't polling!)
+ if (!future.isCancelled()) {
+ try {
+ log.trace("Invoking {} at {}", callable, now);
+ future.complete(callable.call());
+ } catch (Throwable throwable) {
+ future.completeExceptionally(throwable);
}
- });
+ }
return null;
- }
+ });
+ return null;
});
log.trace("Scheduling {} for {} ms from now.", callable, delayMs);
addWaiter(delayMs, waiter);
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 50e6f179f61..2366da33662 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -71,6 +71,7 @@ import static
org.apache.kafka.common.utils.Utils.validHostPattern;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -895,7 +896,7 @@ public class UtilsTest {
if (a == null) {
assertNotNull(b);
} else {
- assertFalse(a.equals(b));
+ assertNotEquals(a, b);
}
assertFalse(Utils.isEqualConstantTime(first, second));
assertFalse(Utils.isEqualConstantTime(second, first));
@@ -907,7 +908,7 @@ public class UtilsTest {
if (a == null) {
assertNull(b);
} else {
- assertTrue(a.equals(b));
+ assertEquals(a, b);
}
assertTrue(Utils.isEqualConstantTime(first, second));
assertTrue(Utils.isEqualConstantTime(second, first));
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index cfb5f6ca196..1d2b1c54e4e 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -149,20 +149,20 @@ public class Microbenchmarks {
t3.join();
t4.join();
- Map<String, Integer> values = new HashMap<String, Integer>();
+ Map<String, Integer> values = new HashMap<>();
for (int i = 0; i < 100; i++)
values.put(Integer.toString(i), i);
System.out.println("HashMap:");
benchMap(2, 1000000, values);
System.out.println("ConcurentHashMap:");
- benchMap(2, 1000000, new ConcurrentHashMap<String, Integer>(values));
+ benchMap(2, 1000000, new ConcurrentHashMap<>(values));
System.out.println("CopyOnWriteMap:");
- benchMap(2, 1000000, new CopyOnWriteMap<String, Integer>(values));
+ benchMap(2, 1000000, new CopyOnWriteMap<>(values));
}
private static void benchMap(int numThreads, final int iters, final
Map<String, Integer> map) throws Exception {
- final List<String> keys = new ArrayList<String>(map.keySet());
- final List<Thread> threads = new ArrayList<Thread>();
+ final List<String> keys = new ArrayList<>(map.keySet());
+ final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
threads.add(new Thread() {
public void run() {
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 62f53d7cdee..6d006ea9c9e 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -330,7 +330,7 @@ public class TestSslUtils {
static String pem(Certificate cert) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
- try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out,
StandardCharsets.UTF_8.name()))) {
+ try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out,
StandardCharsets.UTF_8))) {
pemWriter.writeObject(new JcaMiscPEMGenerator(cert));
}
return new String(out.toByteArray(), StandardCharsets.UTF_8);
@@ -338,7 +338,7 @@ public class TestSslUtils {
static String pem(PrivateKey privateKey, Password password) throws
IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
- try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out,
StandardCharsets.UTF_8.name()))) {
+ try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out,
StandardCharsets.UTF_8))) {
if (password == null) {
pemWriter.writeObject(new JcaPKCS8Generator(privateKey, null));
} else {