This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 624cd4f7d0b MINOR: Various cleanups in connect:runtime tests (#17827)
624cd4f7d0b is described below

commit 624cd4f7d0bd17218b2c70c87ba89dc4312ad900
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Nov 19 08:55:54 2024 +0100

    MINOR: Various cleanups in connect:runtime tests (#17827)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../ConnectorClientPolicyIntegrationTest.java      | 13 ++---
 .../ConnectorRestartApiIntegrationTest.java        |  8 +--
 .../integration/ErrorHandlingIntegrationTest.java  |  1 -
 .../connect/integration/StartAndStopCounter.java   | 13 -----
 .../integration/TransformationIntegrationTest.java |  4 --
 .../kafka/connect/runtime/AbstractHerderTest.java  |  2 +-
 .../connect/runtime/SourceConnectorConfigTest.java |  2 +-
 .../connect/runtime/SubmittedRecordsTest.java      |  4 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |  2 +-
 .../runtime/WorkerSinkTaskThreadedTest.java        |  5 +-
 .../IncrementalCooperativeAssignorTest.java        |  3 +-
 .../errors/RetryWithToleranceOperatorTest.java     |  2 +-
 .../connect/runtime/isolation/PluginUtilsTest.java |  1 -
 .../connect/runtime/isolation/PluginsTest.java     |  3 +-
 .../runtime/isolation/SynchronizationTest.java     |  1 -
 .../runtime/rest/ConnectRestServerTest.java        |  4 +-
 .../kafka/connect/runtime/rest/RestClientTest.java | 24 ++++-----
 .../resources/ConnectorPluginsResourceTest.java    |  8 +--
 .../rest/resources/ConnectorsResourceTest.java     |  2 +-
 .../runtime/standalone/StandaloneConfigTest.java   |  4 +-
 .../storage/KafkaConfigBackingStoreTest.java       | 36 ++++++-------
 .../kafka/connect/util/KafkaBasedLogTest.java      |  2 +-
 .../kafka/connect/util/SharedTopicAdminTest.java   |  2 +-
 .../util/TestBackgroundThreadExceptionHandler.java | 36 -------------
 .../org/apache/kafka/connect/util/TestFuture.java  | 61 ----------------------
 .../apache/kafka/connect/util/TopicAdminTest.java  |  9 +---
 .../connect/util/clusters/EmbeddedConnect.java     |  2 +-
 .../clusters/EmbeddedConnectClusterAssertions.java | 29 ----------
 .../kafka/connect/util/clusters/WorkerHandle.java  |  2 +-
 29 files changed, 59 insertions(+), 226 deletions(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
index a127f85b12d..ef55e0b3258 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
@@ -25,7 +25,6 @@ import 
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
@@ -49,19 +48,15 @@ public class ConnectorClientPolicyIntegrationTest {
     private static final int NUM_WORKERS = 1;
     private static final String CONNECTOR_NAME = "simple-conn";
 
-    @AfterEach
-    public void close() {
-    }
-
     @Test
-    public void testCreateWithOverridesForNonePolicy() throws Exception {
+    public void testCreateWithOverridesForNonePolicy() {
         Map<String, String> props = basicConnectorConfig();
         props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + 
SaslConfigs.SASL_JAAS_CONFIG, "sasl");
         assertFailCreateConnector("None", props);
     }
 
     @Test
-    public void testCreateWithNotAllowedOverridesForPrincipalPolicy() throws 
Exception {
+    public void testCreateWithNotAllowedOverridesForPrincipalPolicy() {
         Map<String, String> props = basicConnectorConfig();
         props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + 
SaslConfigs.SASL_JAAS_CONFIG, "sasl");
         props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + 
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
@@ -98,7 +93,7 @@ public class ConnectorClientPolicyIntegrationTest {
         assertPassCreateConnector(null, props);
     }
 
-    private EmbeddedConnectCluster connectClusterWithPolicy(String policy) 
throws InterruptedException {
+    private EmbeddedConnectCluster connectClusterWithPolicy(String policy) {
         // setup Connect worker properties
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(5_000));
@@ -125,7 +120,7 @@ public class ConnectorClientPolicyIntegrationTest {
         return connect;
     }
 
-    private void assertFailCreateConnector(String policy, Map<String, String> 
props) throws InterruptedException {
+    private void assertFailCreateConnector(String policy, Map<String, String> 
props) {
         EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
         try {
             connect.configureConnector(CONNECTOR_NAME, props);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
index 1c398a22396..a95352dbdbf 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
@@ -86,7 +86,7 @@ public class ConnectorRestartApiIntegrationTest {
         connectorHandle = RuntimeHandles.get().connectorHandle(connectorName);
     }
 
-    private void startOrReuseConnectWithNumWorkers(int numWorkers) throws 
Exception {
+    private void startOrReuseConnectWithNumWorkers(int numWorkers) {
         connect = CONNECT_CLUSTERS.computeIfAbsent(numWorkers, n -> {
             // setup Connect worker properties
             Map<String, String> workerProps = new HashMap<>();
@@ -123,7 +123,7 @@ public class ConnectorRestartApiIntegrationTest {
     }
 
     @Test
-    public void testRestartUnknownConnectorNoParams() throws Exception {
+    public void testRestartUnknownConnectorNoParams() {
         String connectorName = "Unknown";
 
         // build a Connect cluster backed by a Kafka KRaft cluster
@@ -137,14 +137,14 @@ public class ConnectorRestartApiIntegrationTest {
     }
 
     @Test
-    public void testRestartUnknownConnector() throws Exception {
+    public void testRestartUnknownConnector() {
         restartUnknownConnector(false, false);
         restartUnknownConnector(false, true);
         restartUnknownConnector(true, false);
         restartUnknownConnector(true, true);
     }
 
-    private void restartUnknownConnector(boolean onlyFailed, boolean 
includeTasks) throws Exception {
+    private void restartUnknownConnector(boolean onlyFailed, boolean 
includeTasks) {
         String connectorName = "Unknown";
 
         // build a Connect cluster backed by a Kafka KRaft cluster
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 256629f4b11..6f386267e21 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -74,7 +74,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class ErrorHandlingIntegrationTest {
     
     private static final Logger log = 
LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
-    private static final int NUM_WORKERS = 1;
     private static final String DLQ_TOPIC = "my-connector-errors";
     private static final String CONNECTOR_NAME = "error-conn";
     private static final String TASK_ID = "error-conn-0";
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
index 25911896430..42b660c0182 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
@@ -115,19 +115,6 @@ public class StartAndStopCounter {
         return expectedRestarts(expectedRestarts, expectedRestarts);
     }
 
-    /**
-     * Obtain a {@link StartAndStopLatch} that can be used to wait until the 
expected number of restarts
-     * has been completed.
-     *
-     * @param expectedRestarts the expected number of restarts
-     * @param dependents       any dependent latches that must also complete 
in order for the
-     *                         resulting latch to complete
-     * @return the latch; never null
-     */
-    public StartAndStopLatch expectedRestarts(int expectedRestarts, 
List<StartAndStopLatch> dependents) {
-        return expectedRestarts(expectedRestarts, expectedRestarts, 
dependents);
-    }
-
     /**
      * Obtain a {@link StartAndStopLatch} that can be used to wait until the 
expected number of starts
      * has been completed.
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
index 105d238d56f..50c7d829a4a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
@@ -28,8 +28,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -58,8 +56,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 @Tag("integration")
 public class TransformationIntegrationTest {
 
-    private static final Logger log = 
LoggerFactory.getLogger(TransformationIntegrationTest.class);
-
     private static final int NUM_RECORDS_PRODUCED = 2000;
     private static final int NUM_TOPIC_PARTITIONS = 3;
     private static final long RECORD_TRANSFER_DURATION_MS = 
TimeUnit.SECONDS.toMillis(30);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 420cf5e745c..01f847d94f6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -1232,7 +1232,7 @@ public class AbstractHerderTest {
 
     private void testConfigProviderRegex(String rawConnConfig, boolean 
expected) {
         Set<String> keys = 
keysWithVariableValues(Collections.singletonMap("key", rawConnConfig), 
ConfigTransformer.DEFAULT_PATTERN);
-        boolean actual = keys != null && !keys.isEmpty() && 
keys.contains("key");
+        boolean actual = !keys.isEmpty() && keys.contains("key");
         assertEquals(expected, actual, String.format("%s should have matched 
regex", rawConnConfig));
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
index e8e8fbcbdd8..cb39ac42ebc 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
@@ -73,7 +73,7 @@ public class SourceConnectorConfigTest {
             TOPIC_CREATION_GROUP_1, TOPIC_CREATION_GROUP_2));
         props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, 
"1");
         props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, "1");
-        SourceConnectorConfig config = new SourceConnectorConfig(MOCK_PLUGINS, 
props, true);
+        new SourceConnectorConfig(MOCK_PLUGINS, props, true);
     }
 
     @Test
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
index 8feeee0588a..6b8368e002c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
@@ -342,7 +342,7 @@ public class SubmittedRecordsTest {
     }
 
     @SafeVarargs
-    private final void assertRemovedDeques(Map<String, ?>... partitions) {
+    private void assertRemovedDeques(Map<String, ?>... partitions) {
         for (Map<String, ?> partition : partitions) {
             assertFalse(submittedRecords.records.containsKey(partition), 
"Deque for partition " + partition + " should have been cleaned up from 
internal records map");
         }
@@ -365,7 +365,7 @@ public class SubmittedRecordsTest {
 
     @SafeVarargs
     @SuppressWarnings("varargs")
-    private final void assertMetadata(
+    private void assertMetadata(
             CommittableOffsets committableOffsets,
             int committableMessages,
             int uncommittableMessages,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index b3d197acc06..edad9a250f0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -1760,7 +1760,7 @@ public class WorkerSinkTaskTest {
                 transformationChain, mockConsumer, pluginLoader, time,
                 RetryWithToleranceOperatorTest.noopOperator(), null, 
statusBackingStore, Collections::emptyList);
         mockConsumer.updateBeginningOffsets(
-                new HashMap<TopicPartition, Long>() {{
+                new HashMap<>() {{
                     put(TOPIC_PARTITION, 0L);
                     put(TOPIC_PARTITION2, 0L);
                 }}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index a41ce37c356..67978760e7b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -641,7 +641,7 @@ public class WorkerSinkTaskThreadedTest {
     }
 
     private void expectPreCommit(ExpectOffsetCommitCommand... commands) {
-        doAnswer(new Answer<Object>() {
+        doAnswer(new Answer<>() {
             int index = 0;
 
             @Override
@@ -662,7 +662,7 @@ public class WorkerSinkTaskThreadedTest {
     }
     
     private void expectOffsetCommit(ExpectOffsetCommitCommand... commands) {
-        doAnswer(new Answer<Object>() {
+        doAnswer(new Answer<>() {
             int index = 0;
 
             @Override
@@ -718,7 +718,6 @@ public class WorkerSinkTaskThreadedTest {
     private abstract static class TestSinkTask extends SinkTask {
     }
 
-    @SuppressWarnings("NewClassNamingConvention")
     private static class ExpectOffsetCommitCommand {
         final long expectedMessages;
         final RuntimeException error;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index b59c1863a91..86bc897fafe 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -923,8 +923,7 @@ public class IncrementalCooperativeAssignorTest {
         assignor.handleLostAssignments(lostAssignments, new 
ConnectorsAndTasks.Builder(),
                 new ArrayList<>(configuredAssignment.values()));
 
-        Set<String> expectedWorkers = new HashSet<>();
-        expectedWorkers.addAll(Arrays.asList(newWorker, flakyWorker));
+        Set<String> expectedWorkers = new HashSet<>(Arrays.asList(newWorker, 
flakyWorker));
         assertEquals(expectedWorkers,
             assignor.candidateWorkersForReassignment,
             "Wrong set of workers for reassignments");
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 68931a8a993..dfa4aa353fe 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -87,7 +87,7 @@ import static org.mockito.Mockito.when;
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 public class RetryWithToleranceOperatorTest {
 
-    private static final Map<String, String> PROPERTIES = new HashMap<String, 
String>() {{
+    private static final Map<String, String> PROPERTIES = new HashMap<>() {{
             put(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG, 
Objects.toString(2));
             put(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG, 
Objects.toString(3000));
             put(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG, 
Sensor.RecordingLevel.INFO.toString());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index 30babb1bb65..23041f9c319 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -518,7 +518,6 @@ public class PluginUtilsTest {
                 Collections.emptySortedSet(),
                 Collections.emptySortedSet()
         );
-        Map<String, String> aliases = PluginUtils.computeAliases(result);
         Map<String, String> actualAliases = PluginUtils.computeAliases(result);
         Map<String, String> expectedAliases = new HashMap<>();
         expectedAliases.put("MockSinkConnector", 
MockSinkConnector.class.getName());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index d1c723852bc..55a3445a331 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -186,8 +186,7 @@ public class PluginsTest {
         assertNotNull(connectRestExtensions);
         assertEquals(1, connectRestExtensions.size(), "One Rest Extension 
expected");
         assertNotNull(connectRestExtensions.get(0));
-        assertTrue(connectRestExtensions.get(0) instanceof 
TestConnectRestExtension,
-            "Should be instance of TestConnectRestExtension");
+        assertInstanceOf(TestConnectRestExtension.class, 
connectRestExtensions.get(0), "Should be instance of TestConnectRestExtension");
         assertNotNull(((TestConnectRestExtension) 
connectRestExtensions.get(0)).configs);
         assertEquals(config.originals(),
                      ((TestConnectRestExtension) 
connectRestExtensions.get(0)).configs);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index 70b875b21b8..1f3d42fd3dc 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -456,7 +456,6 @@ public class SynchronizationTest {
         }
     }
 
-    @SuppressWarnings("removal")
     private static ThreadFactory threadFactoryWithNamedThreads(String 
threadPrefix) {
         AtomicInteger threadNumber = new AtomicInteger(1);
         return r -> {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
index 8b69b1c37b2..9a686aa81fd 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
@@ -184,7 +184,7 @@ public class ConnectRestServerTest {
         response.getEntity().writeTo(baos);
         assertArrayEquals(
             request.getAllowedMethods(response).toArray(),
-            new String(baos.toByteArray(), StandardCharsets.UTF_8).split(", ")
+            baos.toString(StandardCharsets.UTF_8).split(", ")
         );
     }
 
@@ -281,7 +281,7 @@ public class ConnectRestServerTest {
         expectedLogger.put("level", loggingLevel);
         expectedLogger.put("last_modified", lastModified);
         Map<String, Map<String, Object>> expectedLoggers = 
Collections.singletonMap(logger, expectedLogger);
-        Map<String, Map<String, Object>> actualLoggers = 
mapper.readValue(responseStr, new TypeReference<Map<String, Map<String, 
Object>>>() { });
+        Map<String, Map<String, Object>> actualLoggers = 
mapper.readValue(responseStr, new TypeReference<>() { });
         assertEquals(expectedLoggers, actualLoggers);
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
index 5250fc28f0d..75e224cfa32 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
@@ -69,7 +69,7 @@ public class RestClientTest {
     private static final String MOCK_URL = 
"http://localhost:1234/api/endpoint";
     private static final String TEST_METHOD = "GET";
     private static final TestDTO TEST_DTO = new TestDTO("requestBodyData");
-    private static final TypeReference<TestDTO> TEST_TYPE = new 
TypeReference<TestDTO>() { };
+    private static final TypeReference<TestDTO> TEST_TYPE = new 
TypeReference<>() { };
     private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
     private static final String TEST_SIGNATURE_ALGORITHM = "HmacSHA1";
 
@@ -138,17 +138,15 @@ public class RestClientTest {
     @Test
     public void testNullUrl() {
         RestClient client = spy(new RestClient(null));
-        assertThrows(NullPointerException.class, () -> {
-            client.httpRequest(
-                    null,
-                    TEST_METHOD,
-                    null,
-                    TEST_DTO,
-                    TEST_TYPE,
-                    MOCK_SECRET_KEY,
-                    TEST_SIGNATURE_ALGORITHM
-            );
-        });
+        assertThrows(NullPointerException.class, () -> client.httpRequest(
+                null,
+                TEST_METHOD,
+                null,
+                TEST_DTO,
+                TEST_TYPE,
+                MOCK_SECRET_KEY,
+                TEST_SIGNATURE_ALGORITHM
+        ));
     }
 
     @Test
@@ -233,7 +231,7 @@ public class RestClientTest {
         when(resp.getContentAsString()).thenReturn(toJsonString(TEST_DTO));
         setupHttpClient(statusCode, req, resp);
 
-        TypeReference<Void> voidResponse = new TypeReference<Void>() { };
+        TypeReference<Void> voidResponse = new TypeReference<>() { };
         RestClient.HttpResponse<Void> httpResp = httpRequest(
                 httpClient, MOCK_URL, TEST_METHOD, voidResponse, 
TEST_SIGNATURE_ALGORITHM
         );
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 7773618ce25..eb34d619d5c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -159,22 +159,18 @@ public class ConnectorPluginsResourceTest {
             PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class, 
appVersion, PluginType.PREDICATE, classLoader));
             PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class, 
appVersion, PluginType.PREDICATE, classLoader));
         } catch (Exception e) {
-            e.printStackTrace();
             fail("Failed setting up plugins");
         }
     }
 
     static {
-        List<ConfigInfo> configs = new LinkedList<>();
-        List<ConfigInfo> partialConfigs = new LinkedList<>();
-
         ConfigDef connectorConfigDef = ConnectorConfig.configDef();
         List<ConfigValue> connectorConfigValues = 
connectorConfigDef.validate(PROPS);
         List<ConfigValue> partialConnectorConfigValues = 
connectorConfigDef.validate(PARTIAL_PROPS);
         ConfigInfos result = 
AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(),
 connectorConfigDef.configKeys(), connectorConfigValues, 
Collections.emptyList());
         ConfigInfos partialResult = 
AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(),
 connectorConfigDef.configKeys(), partialConnectorConfigValues, 
Collections.emptyList());
-        configs.addAll(result.values());
-        partialConfigs.addAll(partialResult.values());
+        List<ConfigInfo> configs = new LinkedList<>(result.values());
+        List<ConfigInfo> partialConfigs = new 
LinkedList<>(partialResult.values());
 
         ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", 
"STRING", true, null, "HIGH", "Test configuration for string type.", null, -1, 
"NONE", "test.string.config", Collections.emptyList());
         ConfigValueInfo configValueInfo = new 
ConfigValueInfo("test.string.config", "testString", Collections.emptyList(), 
Collections.emptyList(), true);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 6357f731fa4..d4675b51df5 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -629,7 +629,7 @@ public class ConnectorsResourceTest {
     }
 
     @Test
-    public void testPatchConnectorConfigNotFound() throws Throwable {
+    public void testPatchConnectorConfigNotFound() {
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackException(cb, new NotFoundException("Connector " + 
CONNECTOR_NAME + " not found"))
                 .when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), 
eq(CONNECTOR_CONFIG_PATCH), cb.capture());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfigTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfigTest.java
index 1630fbf19fa..4d8c25932fe 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfigTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfigTest.java
@@ -34,7 +34,7 @@ public class StandaloneConfigTest {
     private static final String HTTPS_LISTENER_PREFIX = "listeners.https.";
 
     private Map<String, Object> sslProps() {
-        return new HashMap<String, Object>() {
+        return new HashMap<>() {
             {
                 put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new 
Password("ssl_key_password"));
                 put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ssl_keystore");
@@ -46,7 +46,7 @@ public class StandaloneConfigTest {
     }
 
     private Map<String, String> baseWorkerProps() {
-        return new HashMap<String, String>() {
+        return new HashMap<>() {
             {
                 put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.json.JsonConverter");
                 put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.json.JsonConverter");
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 945b88cce6c..d0968db29ce 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -310,7 +310,7 @@ public class KafkaConfigBackingStoreTest {
         
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0))))
                 
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1),
 CONFIGS_SERIALIZED.get(1))))
                 // Config deletion
-                .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() 
{{
+                .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
                             put(configKey, null);
                             put(targetStateKey, null);
                         }})
@@ -363,7 +363,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
-    public void testPutConnectorConfigWithTargetState() throws Exception {
+    public void testPutConnectorConfigWithTargetState() {
         when(configLog.partitionCount()).thenReturn(1);
 
         configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
@@ -376,7 +376,7 @@ public class KafkaConfigBackingStoreTest {
         assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
         assertNull(configState.targetState(CONNECTOR_IDS.get(0)));
 
-        doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() {{
+        doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
                     put(TARGET_STATE_KEYS.get(0), 
TARGET_STATES_SERIALIZED.get(2));
                     put(CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
                 }})
@@ -1028,7 +1028,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
-    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws 
Exception {
+    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() {
         // Test a case where a failure and compaction has left us in an 
inconsistent state when reading the log.
         // We start out by loading an initial configuration where we started 
to write a task update, and then
         // compaction cleaned up the earlier record.
@@ -1105,18 +1105,18 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
-    public void testPutRestartRequestOnlyFailed() throws Exception {
+    public void testPutRestartRequestOnlyFailed() {
         RestartRequest restartRequest = new 
RestartRequest(CONNECTOR_IDS.get(0), true, false);
         testPutRestartRequest(restartRequest);
     }
 
     @Test
-    public void testPutRestartRequestOnlyFailedIncludingTasks() throws 
Exception {
+    public void testPutRestartRequestOnlyFailedIncludingTasks() {
         RestartRequest restartRequest = new 
RestartRequest(CONNECTOR_IDS.get(0), true, true);
         testPutRestartRequest(restartRequest);
     }
 
-    private void testPutRestartRequest(RestartRequest restartRequest) throws 
Exception {
+    private void testPutRestartRequest(RestartRequest restartRequest) {
         expectStart(Collections.emptyList(), Collections.emptyMap());
         when(configLog.partitionCount()).thenReturn(1);
 
@@ -1191,7 +1191,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
-    public void testPutTaskConfigsZeroTasks() throws Exception {
+    public void testPutTaskConfigsZeroTasks() {
         configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
         verifyConfigure();
         configStorage.start();
@@ -1290,7 +1290,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
-    public void testSameTargetState() throws Exception {
+    public void testSameTargetState() {
         // verify that we handle target state changes correctly when they come 
up through the log
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
                 new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
@@ -1391,7 +1391,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
-    public void testTaskCountRecordsAndGenerations() throws Exception {
+    public void testTaskCountRecordsAndGenerations() {
         configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
         verifyConfigure();
         configStorage.start();
@@ -1408,7 +1408,7 @@ public class KafkaConfigBackingStoreTest {
         doAnswer(expectReadToEnd(new LinkedHashMap<>()))
                 .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
                 .doAnswer(expectReadToEnd(serializedConfigs))
-                .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() 
{{
+                .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
                             put(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), 
CONFIGS_SERIALIZED.get(3));
                         }})
                 )
@@ -1467,7 +1467,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
-    public void testPutTaskConfigs() throws Exception {
+    public void testPutTaskConfigs() {
         configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
         verifyConfigure();
         configStorage.start();
@@ -1475,7 +1475,7 @@ public class KafkaConfigBackingStoreTest {
 
         doAnswer(expectReadToEnd(new LinkedHashMap<>()))
                 .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
-                .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() 
{{
+                .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
                         put(TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
                         put(TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(1));
                         put(COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(2));
@@ -1525,7 +1525,7 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
-    public void testPutTaskConfigsStartsOnlyReconfiguredTasks() throws 
Exception {
+    public void testPutTaskConfigsStartsOnlyReconfiguredTasks() {
         configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
         verifyConfigure();
         configStorage.start();
@@ -1533,7 +1533,7 @@ public class KafkaConfigBackingStoreTest {
 
         doAnswer(expectReadToEnd(new LinkedHashMap<>()))
                 .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
-                .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() 
{{
+                .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
                             put(TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
                             put(TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(1));
                             put(COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(2));
@@ -1541,7 +1541,7 @@ public class KafkaConfigBackingStoreTest {
                 )
                 .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
                 .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
-                .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() 
{{
+                .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
                             put(TASK_CONFIG_KEYS.get(2), 
CONFIGS_SERIALIZED.get(3));
                             put(COMMIT_TASKS_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(4));
                         }})
@@ -1628,7 +1628,7 @@ public class KafkaConfigBackingStoreTest {
     // from the log. Validate the data that is captured when the conversion is 
performed matches the specified data
     // (by checking a single field's value)
     private void expectConvertWriteRead2(final String configKey, final Schema 
valueSchema, final byte[] serialized,
-                                        final Struct value) throws Exception {
+                                        final Struct value) {
         doReturn(serialized).when(converter).fromConnectData(eq(TOPIC), 
eq(valueSchema), eq(value));
         
doReturn(producerFuture).when(configLog).sendWithReceipt(eq(configKey), 
eq(serialized));
         doReturn(new SchemaAndValue(null, 
structToMap(value))).when(converter).toConnectData(eq(TOPIC), eq(serialized));
@@ -1638,7 +1638,7 @@ public class KafkaConfigBackingStoreTest {
     // from the log. Validate the data that is captured when the conversion is 
performed matches the specified data
     // (by checking a single field's value)
     private void expectConvertWriteRead(final String configKey, final Schema 
valueSchema, final byte[] serialized,
-                                        final String dataFieldName, final 
Object dataFieldValue) throws Exception {
+                                        final String dataFieldName, final 
Object dataFieldValue) {
         final ArgumentCaptor<Struct> capturedRecord = 
ArgumentCaptor.forClass(Struct.class);
         when(converter.fromConnectData(eq(TOPIC), eq(valueSchema), 
capturedRecord.capture())).thenReturn(serialized);
         when(configLog.sendWithReceipt(configKey, 
serialized)).thenReturn(producerFuture);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index f29d3bd29f2..9089e6f59c8 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -141,7 +141,7 @@ public class KafkaBasedLogTest {
 
     @BeforeEach
     public void setUp() {
-        store = new KafkaBasedLog<String, String>(TOPIC, PRODUCER_PROPS, 
CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer) {
+        store = new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, 
topicAdminSupplier, consumedCallback, time, initializer) {
             @Override
             protected KafkaProducer<String, String> createProducer() {
                 return producer;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
index 29fd1689d40..2fb788a1f49 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
@@ -104,7 +104,7 @@ public class SharedTopicAdminTest {
         // When closed
         sharedAdmin.close();
         // Then using the admin should fail
-        assertThrows(ConnectException.class, () -> sharedAdmin.topicAdmin());
+        assertThrows(ConnectException.class, sharedAdmin::topicAdmin);
     }
 
     private void verifyTopicAdminCreatesAndCloses(int count) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
deleted file mode 100644
index 8726d5c87f9..00000000000
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-/**
- * An UncaughtExceptionHandler that can be registered with one or more threads 
which tracks the
- * first exception so the main thread can check for uncaught exceptions.
- */
-public class TestBackgroundThreadExceptionHandler implements 
Thread.UncaughtExceptionHandler {
-    private Throwable firstException = null;
-
-    @Override
-    public void uncaughtException(Thread t, Throwable e) {
-        if (this.firstException == null)
-            this.firstException = e;
-    }
-
-    public void verifyNoExceptions() {
-        if (this.firstException != null)
-            throw new AssertionError(this.firstException);
-    }
-}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
index 9130d8badc9..8410c0023b1 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
@@ -28,17 +28,9 @@ public class TestFuture<T> implements Future<T> {
     private Throwable exception;
     private final CountDownLatch getCalledLatch;
 
-    private volatile boolean resolveOnGet;
-    private T resolveOnGetResult;
-    private Throwable resolveOnGetException;
-
     public TestFuture() {
         resolved = false;
         getCalledLatch = new CountDownLatch(1);
-
-        resolveOnGet = false;
-        resolveOnGetResult = null;
-        resolveOnGetException = null;
     }
 
     public void resolve(T val) {
@@ -88,13 +80,6 @@ public class TestFuture<T> implements Future<T> {
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
         getCalledLatch.countDown();
 
-        if (resolveOnGet) {
-            if (resolveOnGetException != null)
-                resolve(resolveOnGetException);
-            else
-                resolve(resolveOnGetResult);
-        }
-
         synchronized (this) {
             while (!resolved) {
                 this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
@@ -111,50 +96,4 @@ public class TestFuture<T> implements Future<T> {
         }
         return result;
     }
-
-    /**
-     * Set a flag to resolve the future as soon as one of the get() methods 
has been called. Returns immediately.
-     * @param val the value to return from the future
-     */
-    public void resolveOnGet(T val) {
-        resolveOnGet = true;
-        resolveOnGetResult = val;
-    }
-
-    /**
-     * Set a flag to resolve the future as soon as one of the get() methods 
has been called. Returns immediately.
-     * @param t the exception to return from the future
-     */
-    public void resolveOnGet(Throwable t) {
-        resolveOnGet = true;
-        resolveOnGetException = t;
-    }
-
-    /**
-     * Block, waiting for another thread to call one of the get() methods, and 
then immediately resolve the future with
-     * the specified value.
-     * @param val the value to return from the future
-     */
-    public void waitForGetAndResolve(T val) {
-        waitForGet();
-        resolve(val);
-    }
-
-    /**
-     * Block, waiting for another thread to call one of the get() methods, and 
then immediately resolve the future with
-     * the specified value.
-     * @param t the exception to use to resolve the future
-     */
-    public void waitForGetAndResolve(Throwable t) {
-        waitForGet();
-        resolve(t);
-    }
-
-    private void waitForGet() {
-        try {
-            getCalledLatch.await();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Unexpected interruption: ", e);
-        }
-    }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 08686b87f13..1f25dd15f51 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -525,8 +525,6 @@ public class TopicAdminTest {
         Cluster cluster = createCluster(1, "myTopic", 1);
 
         try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
-            Map<TopicPartition, Long> offsetMap = new HashMap<>();
-            offsetMap.put(tp1, offset);
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // This error should be treated as non-retriable and cause 
TopicAdmin::retryEndOffsets to fail
@@ -543,10 +541,7 @@ public class TopicAdminTest {
 
             Throwable cause = exception.getCause();
             assertNotNull(cause, "cause of failure should be preserved");
-            assertTrue(
-                    cause instanceof TopicAuthorizationException,
-                    "cause of failure should be accurately reported; expected 
topic authorization error, but was " + cause
-            );
+            assertInstanceOf(TopicAuthorizationException.class, cause, "cause 
of failure should be accurately reported; expected topic authorization error, 
but was " + cause);
         }
     }
 
@@ -559,8 +554,6 @@ public class TopicAdminTest {
         Cluster cluster = createCluster(1, "myTopic", 1);
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
-            Map<TopicPartition, Long> offsetMap = new HashMap<>();
-            offsetMap.put(tp1, offset);
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.UNKNOWN_TOPIC_OR_PARTITION));
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index 018c9a40b05..efd306bd397 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -618,7 +618,7 @@ abstract class EmbeddedConnect {
             if (response.getStatus() < 
Response.Status.BAD_REQUEST.getStatusCode()) {
                 // We use String instead of ConnectorTaskId as the key here 
since the latter can't be automatically
                 // deserialized by Jackson when used as a JSON object key 
(i.e., when it's serialized as a JSON string)
-                return mapper.readValue(responseToString(response), new 
TypeReference<List<TaskInfo>>() { });
+                return mapper.readValue(responseToString(response), new 
TypeReference<>() { });
             }
         } catch (IOException e) {
             log.error("Could not read task configs from response: {}",
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
deleted file mode 100644
index e6f93f50009..00000000000
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util.clusters;
-
-/**
- * @deprecated Use {@link ConnectAssertions} instead.
- */
-@Deprecated
-public class EmbeddedConnectClusterAssertions extends ConnectAssertions {
-
-    EmbeddedConnectClusterAssertions(EmbeddedConnect connect) {
-        super(connect);
-    }
-
-}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
index 51a4bbb337a..ad9c2917bbd 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
@@ -40,7 +40,7 @@ public class WorkerHandle {
 
     /**
      * Track the worker status during startup.
-     * @return {@link Connect#herderTask} to track or null
+     * @return {@link Connect#herderTask()} to track or null
      */
     public Future<?> herderTask() {
         return worker.herderTask();

Reply via email to