This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 624cd4f7d0b MINOR: Various cleanups in connect:runtime tests (#17827)
624cd4f7d0b is described below
commit 624cd4f7d0bd17218b2c70c87ba89dc4312ad900
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Nov 19 08:55:54 2024 +0100
MINOR: Various cleanups in connect:runtime tests (#17827)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../ConnectorClientPolicyIntegrationTest.java | 13 ++---
.../ConnectorRestartApiIntegrationTest.java | 8 +--
.../integration/ErrorHandlingIntegrationTest.java | 1 -
.../connect/integration/StartAndStopCounter.java | 13 -----
.../integration/TransformationIntegrationTest.java | 4 --
.../kafka/connect/runtime/AbstractHerderTest.java | 2 +-
.../connect/runtime/SourceConnectorConfigTest.java | 2 +-
.../connect/runtime/SubmittedRecordsTest.java | 4 +-
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 2 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 5 +-
.../IncrementalCooperativeAssignorTest.java | 3 +-
.../errors/RetryWithToleranceOperatorTest.java | 2 +-
.../connect/runtime/isolation/PluginUtilsTest.java | 1 -
.../connect/runtime/isolation/PluginsTest.java | 3 +-
.../runtime/isolation/SynchronizationTest.java | 1 -
.../runtime/rest/ConnectRestServerTest.java | 4 +-
.../kafka/connect/runtime/rest/RestClientTest.java | 24 ++++-----
.../resources/ConnectorPluginsResourceTest.java | 8 +--
.../rest/resources/ConnectorsResourceTest.java | 2 +-
.../runtime/standalone/StandaloneConfigTest.java | 4 +-
.../storage/KafkaConfigBackingStoreTest.java | 36 ++++++-------
.../kafka/connect/util/KafkaBasedLogTest.java | 2 +-
.../kafka/connect/util/SharedTopicAdminTest.java | 2 +-
.../util/TestBackgroundThreadExceptionHandler.java | 36 -------------
.../org/apache/kafka/connect/util/TestFuture.java | 61 ----------------------
.../apache/kafka/connect/util/TopicAdminTest.java | 9 +---
.../connect/util/clusters/EmbeddedConnect.java | 2 +-
.../clusters/EmbeddedConnectClusterAssertions.java | 29 ----------
.../kafka/connect/util/clusters/WorkerHandle.java | 2 +-
29 files changed, 59 insertions(+), 226 deletions(-)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
index a127f85b12d..ef55e0b3258 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
@@ -25,7 +25,6 @@ import
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -49,19 +48,15 @@ public class ConnectorClientPolicyIntegrationTest {
private static final int NUM_WORKERS = 1;
private static final String CONNECTOR_NAME = "simple-conn";
- @AfterEach
- public void close() {
- }
-
@Test
- public void testCreateWithOverridesForNonePolicy() throws Exception {
+ public void testCreateWithOverridesForNonePolicy() {
Map<String, String> props = basicConnectorConfig();
props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX +
SaslConfigs.SASL_JAAS_CONFIG, "sasl");
assertFailCreateConnector("None", props);
}
@Test
- public void testCreateWithNotAllowedOverridesForPrincipalPolicy() throws
Exception {
+ public void testCreateWithNotAllowedOverridesForPrincipalPolicy() {
Map<String, String> props = basicConnectorConfig();
props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX +
SaslConfigs.SASL_JAAS_CONFIG, "sasl");
props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX +
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
@@ -98,7 +93,7 @@ public class ConnectorClientPolicyIntegrationTest {
assertPassCreateConnector(null, props);
}
- private EmbeddedConnectCluster connectClusterWithPolicy(String policy)
throws InterruptedException {
+ private EmbeddedConnectCluster connectClusterWithPolicy(String policy) {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG,
String.valueOf(5_000));
@@ -125,7 +120,7 @@ public class ConnectorClientPolicyIntegrationTest {
return connect;
}
- private void assertFailCreateConnector(String policy, Map<String, String>
props) throws InterruptedException {
+ private void assertFailCreateConnector(String policy, Map<String, String>
props) {
EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
try {
connect.configureConnector(CONNECTOR_NAME, props);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
index 1c398a22396..a95352dbdbf 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
@@ -86,7 +86,7 @@ public class ConnectorRestartApiIntegrationTest {
connectorHandle = RuntimeHandles.get().connectorHandle(connectorName);
}
- private void startOrReuseConnectWithNumWorkers(int numWorkers) throws
Exception {
+ private void startOrReuseConnectWithNumWorkers(int numWorkers) {
connect = CONNECT_CLUSTERS.computeIfAbsent(numWorkers, n -> {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
@@ -123,7 +123,7 @@ public class ConnectorRestartApiIntegrationTest {
}
@Test
- public void testRestartUnknownConnectorNoParams() throws Exception {
+ public void testRestartUnknownConnectorNoParams() {
String connectorName = "Unknown";
// build a Connect cluster backed by a Kafka KRaft cluster
@@ -137,14 +137,14 @@ public class ConnectorRestartApiIntegrationTest {
}
@Test
- public void testRestartUnknownConnector() throws Exception {
+ public void testRestartUnknownConnector() {
restartUnknownConnector(false, false);
restartUnknownConnector(false, true);
restartUnknownConnector(true, false);
restartUnknownConnector(true, true);
}
- private void restartUnknownConnector(boolean onlyFailed, boolean
includeTasks) throws Exception {
+ private void restartUnknownConnector(boolean onlyFailed, boolean
includeTasks) {
String connectorName = "Unknown";
// build a Connect cluster backed by a Kafka KRaft cluster
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 256629f4b11..6f386267e21 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -74,7 +74,6 @@ import static org.junit.jupiter.api.Assertions.fail;
public class ErrorHandlingIntegrationTest {
private static final Logger log =
LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
- private static final int NUM_WORKERS = 1;
private static final String DLQ_TOPIC = "my-connector-errors";
private static final String CONNECTOR_NAME = "error-conn";
private static final String TASK_ID = "error-conn-0";
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
index 25911896430..42b660c0182 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
@@ -115,19 +115,6 @@ public class StartAndStopCounter {
return expectedRestarts(expectedRestarts, expectedRestarts);
}
- /**
- * Obtain a {@link StartAndStopLatch} that can be used to wait until the
expected number of restarts
- * has been completed.
- *
- * @param expectedRestarts the expected number of restarts
- * @param dependents any dependent latches that must also complete
in order for the
- * resulting latch to complete
- * @return the latch; never null
- */
- public StartAndStopLatch expectedRestarts(int expectedRestarts,
List<StartAndStopLatch> dependents) {
- return expectedRestarts(expectedRestarts, expectedRestarts,
dependents);
- }
-
/**
* Obtain a {@link StartAndStopLatch} that can be used to wait until the
expected number of starts
* has been completed.
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
index 105d238d56f..50c7d829a4a 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
@@ -28,8 +28,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -58,8 +56,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
@Tag("integration")
public class TransformationIntegrationTest {
- private static final Logger log =
LoggerFactory.getLogger(TransformationIntegrationTest.class);
-
private static final int NUM_RECORDS_PRODUCED = 2000;
private static final int NUM_TOPIC_PARTITIONS = 3;
private static final long RECORD_TRANSFER_DURATION_MS =
TimeUnit.SECONDS.toMillis(30);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 420cf5e745c..01f847d94f6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -1232,7 +1232,7 @@ public class AbstractHerderTest {
private void testConfigProviderRegex(String rawConnConfig, boolean
expected) {
Set<String> keys =
keysWithVariableValues(Collections.singletonMap("key", rawConnConfig),
ConfigTransformer.DEFAULT_PATTERN);
- boolean actual = keys != null && !keys.isEmpty() &&
keys.contains("key");
+ boolean actual = !keys.isEmpty() && keys.contains("key");
assertEquals(expected, actual, String.format("%s should have matched
regex", rawConnConfig));
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
index e8e8fbcbdd8..cb39ac42ebc 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
@@ -73,7 +73,7 @@ public class SourceConnectorConfigTest {
TOPIC_CREATION_GROUP_1, TOPIC_CREATION_GROUP_2));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG,
"1");
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, "1");
- SourceConnectorConfig config = new SourceConnectorConfig(MOCK_PLUGINS,
props, true);
+ new SourceConnectorConfig(MOCK_PLUGINS, props, true);
}
@Test
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
index 8feeee0588a..6b8368e002c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
@@ -342,7 +342,7 @@ public class SubmittedRecordsTest {
}
@SafeVarargs
- private final void assertRemovedDeques(Map<String, ?>... partitions) {
+ private void assertRemovedDeques(Map<String, ?>... partitions) {
for (Map<String, ?> partition : partitions) {
assertFalse(submittedRecords.records.containsKey(partition),
"Deque for partition " + partition + " should have been cleaned up from
internal records map");
}
@@ -365,7 +365,7 @@ public class SubmittedRecordsTest {
@SafeVarargs
@SuppressWarnings("varargs")
- private final void assertMetadata(
+ private void assertMetadata(
CommittableOffsets committableOffsets,
int committableMessages,
int uncommittableMessages,
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index b3d197acc06..edad9a250f0 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -1760,7 +1760,7 @@ public class WorkerSinkTaskTest {
transformationChain, mockConsumer, pluginLoader, time,
RetryWithToleranceOperatorTest.noopOperator(), null,
statusBackingStore, Collections::emptyList);
mockConsumer.updateBeginningOffsets(
- new HashMap<TopicPartition, Long>() {{
+ new HashMap<>() {{
put(TOPIC_PARTITION, 0L);
put(TOPIC_PARTITION2, 0L);
}}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index a41ce37c356..67978760e7b 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -641,7 +641,7 @@ public class WorkerSinkTaskThreadedTest {
}
private void expectPreCommit(ExpectOffsetCommitCommand... commands) {
- doAnswer(new Answer<Object>() {
+ doAnswer(new Answer<>() {
int index = 0;
@Override
@@ -662,7 +662,7 @@ public class WorkerSinkTaskThreadedTest {
}
private void expectOffsetCommit(ExpectOffsetCommitCommand... commands) {
- doAnswer(new Answer<Object>() {
+ doAnswer(new Answer<>() {
int index = 0;
@Override
@@ -718,7 +718,6 @@ public class WorkerSinkTaskThreadedTest {
private abstract static class TestSinkTask extends SinkTask {
}
- @SuppressWarnings("NewClassNamingConvention")
private static class ExpectOffsetCommitCommand {
final long expectedMessages;
final RuntimeException error;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index b59c1863a91..86bc897fafe 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -923,8 +923,7 @@ public class IncrementalCooperativeAssignorTest {
assignor.handleLostAssignments(lostAssignments, new
ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));
- Set<String> expectedWorkers = new HashSet<>();
- expectedWorkers.addAll(Arrays.asList(newWorker, flakyWorker));
+ Set<String> expectedWorkers = new HashSet<>(Arrays.asList(newWorker,
flakyWorker));
assertEquals(expectedWorkers,
assignor.candidateWorkersForReassignment,
"Wrong set of workers for reassignments");
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index 68931a8a993..dfa4aa353fe 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -87,7 +87,7 @@ import static org.mockito.Mockito.when;
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class RetryWithToleranceOperatorTest {
- private static final Map<String, String> PROPERTIES = new HashMap<String,
String>() {{
+ private static final Map<String, String> PROPERTIES = new HashMap<>() {{
put(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG,
Objects.toString(2));
put(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG,
Objects.toString(3000));
put(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG,
Sensor.RecordingLevel.INFO.toString());
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index 30babb1bb65..23041f9c319 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -518,7 +518,6 @@ public class PluginUtilsTest {
Collections.emptySortedSet(),
Collections.emptySortedSet()
);
- Map<String, String> aliases = PluginUtils.computeAliases(result);
Map<String, String> actualAliases = PluginUtils.computeAliases(result);
Map<String, String> expectedAliases = new HashMap<>();
expectedAliases.put("MockSinkConnector",
MockSinkConnector.class.getName());
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index d1c723852bc..55a3445a331 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -186,8 +186,7 @@ public class PluginsTest {
assertNotNull(connectRestExtensions);
assertEquals(1, connectRestExtensions.size(), "One Rest Extension
expected");
assertNotNull(connectRestExtensions.get(0));
- assertTrue(connectRestExtensions.get(0) instanceof
TestConnectRestExtension,
- "Should be instance of TestConnectRestExtension");
+ assertInstanceOf(TestConnectRestExtension.class,
connectRestExtensions.get(0), "Should be instance of TestConnectRestExtension");
assertNotNull(((TestConnectRestExtension)
connectRestExtensions.get(0)).configs);
assertEquals(config.originals(),
((TestConnectRestExtension)
connectRestExtensions.get(0)).configs);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index 70b875b21b8..1f3d42fd3dc 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -456,7 +456,6 @@ public class SynchronizationTest {
}
}
- @SuppressWarnings("removal")
private static ThreadFactory threadFactoryWithNamedThreads(String
threadPrefix) {
AtomicInteger threadNumber = new AtomicInteger(1);
return r -> {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
index 8b69b1c37b2..9a686aa81fd 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
@@ -184,7 +184,7 @@ public class ConnectRestServerTest {
response.getEntity().writeTo(baos);
assertArrayEquals(
request.getAllowedMethods(response).toArray(),
- new String(baos.toByteArray(), StandardCharsets.UTF_8).split(", ")
+ baos.toString(StandardCharsets.UTF_8).split(", ")
);
}
@@ -281,7 +281,7 @@ public class ConnectRestServerTest {
expectedLogger.put("level", loggingLevel);
expectedLogger.put("last_modified", lastModified);
Map<String, Map<String, Object>> expectedLoggers =
Collections.singletonMap(logger, expectedLogger);
- Map<String, Map<String, Object>> actualLoggers =
mapper.readValue(responseStr, new TypeReference<Map<String, Map<String,
Object>>>() { });
+ Map<String, Map<String, Object>> actualLoggers =
mapper.readValue(responseStr, new TypeReference<>() { });
assertEquals(expectedLoggers, actualLoggers);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
index 5250fc28f0d..75e224cfa32 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
@@ -69,7 +69,7 @@ public class RestClientTest {
private static final String MOCK_URL =
"http://localhost:1234/api/endpoint";
private static final String TEST_METHOD = "GET";
private static final TestDTO TEST_DTO = new TestDTO("requestBodyData");
- private static final TypeReference<TestDTO> TEST_TYPE = new
TypeReference<TestDTO>() { };
+ private static final TypeReference<TestDTO> TEST_TYPE = new
TypeReference<>() { };
private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
private static final String TEST_SIGNATURE_ALGORITHM = "HmacSHA1";
@@ -138,17 +138,15 @@ public class RestClientTest {
@Test
public void testNullUrl() {
RestClient client = spy(new RestClient(null));
- assertThrows(NullPointerException.class, () -> {
- client.httpRequest(
- null,
- TEST_METHOD,
- null,
- TEST_DTO,
- TEST_TYPE,
- MOCK_SECRET_KEY,
- TEST_SIGNATURE_ALGORITHM
- );
- });
+ assertThrows(NullPointerException.class, () -> client.httpRequest(
+ null,
+ TEST_METHOD,
+ null,
+ TEST_DTO,
+ TEST_TYPE,
+ MOCK_SECRET_KEY,
+ TEST_SIGNATURE_ALGORITHM
+ ));
}
@Test
@@ -233,7 +231,7 @@ public class RestClientTest {
when(resp.getContentAsString()).thenReturn(toJsonString(TEST_DTO));
setupHttpClient(statusCode, req, resp);
- TypeReference<Void> voidResponse = new TypeReference<Void>() { };
+ TypeReference<Void> voidResponse = new TypeReference<>() { };
RestClient.HttpResponse<Void> httpResp = httpRequest(
httpClient, MOCK_URL, TEST_METHOD, voidResponse,
TEST_SIGNATURE_ALGORITHM
);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 7773618ce25..eb34d619d5c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -159,22 +159,18 @@ public class ConnectorPluginsResourceTest {
PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class,
appVersion, PluginType.PREDICATE, classLoader));
PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class,
appVersion, PluginType.PREDICATE, classLoader));
} catch (Exception e) {
- e.printStackTrace();
fail("Failed setting up plugins");
}
}
static {
- List<ConfigInfo> configs = new LinkedList<>();
- List<ConfigInfo> partialConfigs = new LinkedList<>();
-
ConfigDef connectorConfigDef = ConnectorConfig.configDef();
List<ConfigValue> connectorConfigValues =
connectorConfigDef.validate(PROPS);
List<ConfigValue> partialConnectorConfigValues =
connectorConfigDef.validate(PARTIAL_PROPS);
ConfigInfos result =
AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(),
connectorConfigDef.configKeys(), connectorConfigValues,
Collections.emptyList());
ConfigInfos partialResult =
AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(),
connectorConfigDef.configKeys(), partialConnectorConfigValues,
Collections.emptyList());
- configs.addAll(result.values());
- partialConfigs.addAll(partialResult.values());
+ List<ConfigInfo> configs = new LinkedList<>(result.values());
+ List<ConfigInfo> partialConfigs = new
LinkedList<>(partialResult.values());
ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config",
"STRING", true, null, "HIGH", "Test configuration for string type.", null, -1,
"NONE", "test.string.config", Collections.emptyList());
ConfigValueInfo configValueInfo = new
ConfigValueInfo("test.string.config", "testString", Collections.emptyList(),
Collections.emptyList(), true);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 6357f731fa4..d4675b51df5 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -629,7 +629,7 @@ public class ConnectorsResourceTest {
}
@Test
- public void testPatchConnectorConfigNotFound() throws Throwable {
+ public void testPatchConnectorConfigNotFound() {
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
expectAndCallbackException(cb, new NotFoundException("Connector " +
CONNECTOR_NAME + " not found"))
.when(herder).patchConnectorConfig(eq(CONNECTOR_NAME),
eq(CONNECTOR_CONFIG_PATCH), cb.capture());
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfigTest.java
index 1630fbf19fa..4d8c25932fe 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfigTest.java
@@ -34,7 +34,7 @@ public class StandaloneConfigTest {
private static final String HTTPS_LISTENER_PREFIX = "listeners.https.";
private Map<String, Object> sslProps() {
- return new HashMap<String, Object>() {
+ return new HashMap<>() {
{
put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new
Password("ssl_key_password"));
put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ssl_keystore");
@@ -46,7 +46,7 @@ public class StandaloneConfigTest {
}
private Map<String, String> baseWorkerProps() {
- return new HashMap<String, String>() {
+ return new HashMap<>() {
{
put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 945b88cce6c..d0968db29ce 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -310,7 +310,7 @@ public class KafkaConfigBackingStoreTest {
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0))))
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1))))
// Config deletion
- .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
put(configKey, null);
put(targetStateKey, null);
}})
@@ -363,7 +363,7 @@ public class KafkaConfigBackingStoreTest {
}
@Test
- public void testPutConnectorConfigWithTargetState() throws Exception {
+ public void testPutConnectorConfigWithTargetState() {
when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
@@ -376,7 +376,7 @@ public class KafkaConfigBackingStoreTest {
assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
assertNull(configState.targetState(CONNECTOR_IDS.get(0)));
- doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() {{
+ doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
put(TARGET_STATE_KEYS.get(0),
TARGET_STATES_SERIALIZED.get(2));
put(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
}})
@@ -1028,7 +1028,7 @@ public class KafkaConfigBackingStoreTest {
}
@Test
- public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws
Exception {
+ public void testPutTaskConfigsDoesNotResolveAllInconsistencies() {
// Test a case where a failure and compaction has left us in an
inconsistent state when reading the log.
// We start out by loading an initial configuration where we started
to write a task update, and then
// compaction cleaned up the earlier record.
@@ -1105,18 +1105,18 @@ public class KafkaConfigBackingStoreTest {
}
@Test
- public void testPutRestartRequestOnlyFailed() throws Exception {
+ public void testPutRestartRequestOnlyFailed() {
RestartRequest restartRequest = new
RestartRequest(CONNECTOR_IDS.get(0), true, false);
testPutRestartRequest(restartRequest);
}
@Test
- public void testPutRestartRequestOnlyFailedIncludingTasks() throws
Exception {
+ public void testPutRestartRequestOnlyFailedIncludingTasks() {
RestartRequest restartRequest = new
RestartRequest(CONNECTOR_IDS.get(0), true, true);
testPutRestartRequest(restartRequest);
}
- private void testPutRestartRequest(RestartRequest restartRequest) throws
Exception {
+ private void testPutRestartRequest(RestartRequest restartRequest) {
expectStart(Collections.emptyList(), Collections.emptyMap());
when(configLog.partitionCount()).thenReturn(1);
@@ -1191,7 +1191,7 @@ public class KafkaConfigBackingStoreTest {
}
@Test
- public void testPutTaskConfigsZeroTasks() throws Exception {
+ public void testPutTaskConfigsZeroTasks() {
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
@@ -1290,7 +1290,7 @@ public class KafkaConfigBackingStoreTest {
}
@Test
- public void testSameTargetState() throws Exception {
+ public void testSameTargetState() {
// verify that we handle target state changes correctly when they come
up through the log
List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
@@ -1391,7 +1391,7 @@ public class KafkaConfigBackingStoreTest {
}
@Test
- public void testTaskCountRecordsAndGenerations() throws Exception {
+ public void testTaskCountRecordsAndGenerations() {
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
@@ -1408,7 +1408,7 @@ public class KafkaConfigBackingStoreTest {
doAnswer(expectReadToEnd(new LinkedHashMap<>()))
.doAnswer(expectReadToEnd(new LinkedHashMap<>()))
.doAnswer(expectReadToEnd(serializedConfigs))
- .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
put(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
CONFIGS_SERIALIZED.get(3));
}})
)
@@ -1467,7 +1467,7 @@ public class KafkaConfigBackingStoreTest {
}
@Test
- public void testPutTaskConfigs() throws Exception {
+ public void testPutTaskConfigs() {
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
@@ -1475,7 +1475,7 @@ public class KafkaConfigBackingStoreTest {
doAnswer(expectReadToEnd(new LinkedHashMap<>()))
.doAnswer(expectReadToEnd(new LinkedHashMap<>()))
- .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
@@ -1525,7 +1525,7 @@ public class KafkaConfigBackingStoreTest {
}
@Test
- public void testPutTaskConfigsStartsOnlyReconfiguredTasks() throws
Exception {
+ public void testPutTaskConfigsStartsOnlyReconfiguredTasks() {
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
verifyConfigure();
configStorage.start();
@@ -1533,7 +1533,7 @@ public class KafkaConfigBackingStoreTest {
doAnswer(expectReadToEnd(new LinkedHashMap<>()))
.doAnswer(expectReadToEnd(new LinkedHashMap<>()))
- .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
@@ -1541,7 +1541,7 @@ public class KafkaConfigBackingStoreTest {
)
.doAnswer(expectReadToEnd(new LinkedHashMap<>()))
.doAnswer(expectReadToEnd(new LinkedHashMap<>()))
- .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>() {{
put(TASK_CONFIG_KEYS.get(2),
CONFIGS_SERIALIZED.get(3));
put(COMMIT_TASKS_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(4));
}})
@@ -1628,7 +1628,7 @@ public class KafkaConfigBackingStoreTest {
// from the log. Validate the data that is captured when the conversion is
performed matches the specified data
// (by checking a single field's value)
private void expectConvertWriteRead2(final String configKey, final Schema
valueSchema, final byte[] serialized,
- final Struct value) throws Exception {
+ final Struct value) {
doReturn(serialized).when(converter).fromConnectData(eq(TOPIC),
eq(valueSchema), eq(value));
doReturn(producerFuture).when(configLog).sendWithReceipt(eq(configKey),
eq(serialized));
doReturn(new SchemaAndValue(null,
structToMap(value))).when(converter).toConnectData(eq(TOPIC), eq(serialized));
@@ -1638,7 +1638,7 @@ public class KafkaConfigBackingStoreTest {
// from the log. Validate the data that is captured when the conversion is
performed matches the specified data
// (by checking a single field's value)
private void expectConvertWriteRead(final String configKey, final Schema
valueSchema, final byte[] serialized,
- final String dataFieldName, final
Object dataFieldValue) throws Exception {
+ final String dataFieldName, final
Object dataFieldValue) {
final ArgumentCaptor<Struct> capturedRecord =
ArgumentCaptor.forClass(Struct.class);
when(converter.fromConnectData(eq(TOPIC), eq(valueSchema),
capturedRecord.capture())).thenReturn(serialized);
when(configLog.sendWithReceipt(configKey,
serialized)).thenReturn(producerFuture);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index f29d3bd29f2..9089e6f59c8 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -141,7 +141,7 @@ public class KafkaBasedLogTest {
@BeforeEach
public void setUp() {
- store = new KafkaBasedLog<String, String>(TOPIC, PRODUCER_PROPS,
CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer) {
+ store = new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS,
topicAdminSupplier, consumedCallback, time, initializer) {
@Override
protected KafkaProducer<String, String> createProducer() {
return producer;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
index 29fd1689d40..2fb788a1f49 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
@@ -104,7 +104,7 @@ public class SharedTopicAdminTest {
// When closed
sharedAdmin.close();
// Then using the admin should fail
- assertThrows(ConnectException.class, () -> sharedAdmin.topicAdmin());
+ assertThrows(ConnectException.class, sharedAdmin::topicAdmin);
}
private void verifyTopicAdminCreatesAndCloses(int count) {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
deleted file mode 100644
index 8726d5c87f9..00000000000
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-/**
- * An UncaughtExceptionHandler that can be registered with one or more threads
which tracks the
- * first exception so the main thread can check for uncaught exceptions.
- */
-public class TestBackgroundThreadExceptionHandler implements
Thread.UncaughtExceptionHandler {
- private Throwable firstException = null;
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- if (this.firstException == null)
- this.firstException = e;
- }
-
- public void verifyNoExceptions() {
- if (this.firstException != null)
- throw new AssertionError(this.firstException);
- }
-}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
index 9130d8badc9..8410c0023b1 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
@@ -28,17 +28,9 @@ public class TestFuture<T> implements Future<T> {
private Throwable exception;
private final CountDownLatch getCalledLatch;
- private volatile boolean resolveOnGet;
- private T resolveOnGetResult;
- private Throwable resolveOnGetException;
-
public TestFuture() {
resolved = false;
getCalledLatch = new CountDownLatch(1);
-
- resolveOnGet = false;
- resolveOnGetResult = null;
- resolveOnGetException = null;
}
public void resolve(T val) {
@@ -88,13 +80,6 @@ public class TestFuture<T> implements Future<T> {
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
getCalledLatch.countDown();
- if (resolveOnGet) {
- if (resolveOnGetException != null)
- resolve(resolveOnGetException);
- else
- resolve(resolveOnGetResult);
- }
-
synchronized (this) {
while (!resolved) {
this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
@@ -111,50 +96,4 @@ public class TestFuture<T> implements Future<T> {
}
return result;
}
-
- /**
- * Set a flag to resolve the future as soon as one of the get() methods
has been called. Returns immediately.
- * @param val the value to return from the future
- */
- public void resolveOnGet(T val) {
- resolveOnGet = true;
- resolveOnGetResult = val;
- }
-
- /**
- * Set a flag to resolve the future as soon as one of the get() methods
has been called. Returns immediately.
- * @param t the exception to return from the future
- */
- public void resolveOnGet(Throwable t) {
- resolveOnGet = true;
- resolveOnGetException = t;
- }
-
- /**
- * Block, waiting for another thread to call one of the get() methods, and
then immediately resolve the future with
- * the specified value.
- * @param val the value to return from the future
- */
- public void waitForGetAndResolve(T val) {
- waitForGet();
- resolve(val);
- }
-
- /**
- * Block, waiting for another thread to call one of the get() methods, and
then immediately resolve the future with
- * the specified value.
- * @param t the exception to use to resolve the future
- */
- public void waitForGetAndResolve(Throwable t) {
- waitForGet();
- resolve(t);
- }
-
- private void waitForGet() {
- try {
- getCalledLatch.await();
- } catch (InterruptedException e) {
- throw new RuntimeException("Unexpected interruption: ", e);
- }
- }
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 08686b87f13..1f25dd15f51 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -525,8 +525,6 @@ public class TopicAdminTest {
Cluster cluster = createCluster(1, "myTopic", 1);
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new
MockTime(), cluster)) {
- Map<TopicPartition, Long> offsetMap = new HashMap<>();
- offsetMap.put(tp1, offset);
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// This error should be treated as non-retriable and cause
TopicAdmin::retryEndOffsets to fail
@@ -543,10 +541,7 @@ public class TopicAdminTest {
Throwable cause = exception.getCause();
assertNotNull(cause, "cause of failure should be preserved");
- assertTrue(
- cause instanceof TopicAuthorizationException,
- "cause of failure should be accurately reported; expected
topic authorization error, but was " + cause
- );
+ assertInstanceOf(TopicAuthorizationException.class, cause, "cause
of failure should be accurately reported; expected topic authorization error,
but was " + cause);
}
}
@@ -559,8 +554,6 @@ public class TopicAdminTest {
Cluster cluster = createCluster(1, "myTopic", 1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new
MockTime(), cluster)) {
- Map<TopicPartition, Long> offsetMap = new HashMap<>();
- offsetMap.put(tp1, offset);
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.UNKNOWN_TOPIC_OR_PARTITION));
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.NONE));
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index 018c9a40b05..efd306bd397 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -618,7 +618,7 @@ abstract class EmbeddedConnect {
if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
// We use String instead of ConnectorTaskId as the key here
since the latter can't be automatically
// deserialized by Jackson when used as a JSON object key
(i.e., when it's serialized as a JSON string)
- return mapper.readValue(responseToString(response), new
TypeReference<List<TaskInfo>>() { });
+ return mapper.readValue(responseToString(response), new
TypeReference<>() { });
}
} catch (IOException e) {
log.error("Could not read task configs from response: {}",
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
deleted file mode 100644
index e6f93f50009..00000000000
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util.clusters;
-
-/**
- * @deprecated Use {@link ConnectAssertions} instead.
- */
-@Deprecated
-public class EmbeddedConnectClusterAssertions extends ConnectAssertions {
-
- EmbeddedConnectClusterAssertions(EmbeddedConnect connect) {
- super(connect);
- }
-
-}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
index 51a4bbb337a..ad9c2917bbd 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
@@ -40,7 +40,7 @@ public class WorkerHandle {
/**
* Track the worker status during startup.
- * @return {@link Connect#herderTask} to track or null
+ * @return {@link Connect#herderTask()} to track or null
*/
public Future<?> herderTask() {
return worker.herderTask();