This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 45835a0e459 MINOR: Cleanup connect runtime module (#18074)
45835a0e459 is described below

commit 45835a0e459b3d1a7203d73c94e7b393bdc89dca
Author: Dmitry Werner <[email protected]>
AuthorDate: Tue Dec 10 22:25:38 2024 +0500

    MINOR: Cleanup connect runtime module (#18074)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../kafka/connect/cli/ConnectStandalone.java       |  7 +---
 .../kafka/connect/runtime/ConnectMetrics.java      |  6 +--
 .../runtime/ExactlyOnceWorkerSourceTask.java       |  2 +-
 .../apache/kafka/connect/runtime/StateTracker.java |  4 +-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  8 ++--
 .../kafka/connect/runtime/WorkerSourceTask.java    | 12 +++---
 .../apache/kafka/connect/runtime/WorkerTask.java   |  9 ++---
 .../runtime/distributed/DistributedHerder.java     |  4 +-
 .../runtime/distributed/WorkerGroupMember.java     |  3 +-
 .../apache/kafka/connect/tools/PredicateDoc.java   | 32 ++++++----------
 .../kafka/connect/tools/TransformationDoc.java     | 32 ++++++----------
 .../org/apache/kafka/connect/util/Callback.java    |  2 +-
 .../apache/kafka/connect/util/KafkaBasedLog.java   | 36 +++++++++---------
 .../org/apache/kafka/connect/util/RetryUtil.java   |  2 +-
 .../kafka/connect/util/SafeObjectInputStream.java  | 30 ++++++---------
 .../connect/runtime/ErrorHandlingTaskTest.java     | 11 ------
 .../apache/kafka/connect/runtime/WorkerTest.java   | 16 ++++----
 .../kafka/connect/runtime/WorkerTestUtils.java     | 43 ----------------------
 .../connect/runtime/rest/RestServerConfigTest.java |  2 -
 .../rest/resources/ConnectorsResourceTest.java     |  5 ---
 .../storage/KafkaConfigBackingStoreTest.java       |  7 ----
 21 files changed, 87 insertions(+), 186 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 120e03c6f8e..43af6b274b6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -121,9 +121,7 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneHerder, Stan
 
         File connectorConfigurationFile = Paths.get(filePath).toFile();
         try {
-            Map<String, String> connectorConfigs = objectMapper.readValue(
-                connectorConfigurationFile,
-                new TypeReference<Map<String, String>>() { });
+            Map<String, String> connectorConfigs = 
objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { });
 
             if (!connectorConfigs.containsKey(NAME_CONFIG)) {
                 throw new ConnectException("Connector configuration at '" + 
filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
@@ -136,8 +134,7 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneHerder, Stan
 
         try {
             
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-            CreateConnectorRequest createConnectorRequest = 
objectMapper.readValue(connectorConfigurationFile,
-                new TypeReference<CreateConnectorRequest>() { });
+            CreateConnectorRequest createConnectorRequest = 
objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { });
             if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
                 if 
(!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name()))
 {
                     throw new ConnectException("Connector name configuration 
in 'config' doesn't match the one specified in 'name' at '" + filePath
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index ff62c25eee5..430cad52b8f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -81,8 +81,7 @@ public class ConnectMetrics {
                 .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
                         Sensor.RecordingLevel.forName(metricsRecordingLevel));
 
-        Map<String, Object> contextLabels = new HashMap<>();
-        
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+        Map<String, Object> contextLabels = new 
HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
         contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
         Object groupId = 
config.originals().get(DistributedConfig.GROUP_ID_CONFIG);
         if (groupId != null) {
@@ -391,8 +390,7 @@ public class ConnectMetrics {
         public synchronized Sensor sensor(String name, MetricConfig config, 
Sensor.RecordingLevel recordingLevel, Sensor... parents) {
             // We need to make sure that all sensor names are unique across 
all groups, so use the sensor prefix
             Sensor result = metrics.sensor(sensorPrefix + name, config, 
Long.MAX_VALUE, recordingLevel, parents);
-            if (result != null)
-                sensorNames.add(result.name());
+            sensorNames.add(result.name());
             return result;
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index bcff615c414..d837776be38 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -321,7 +321,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
 
         error = flushError.get();
         if (error != null) {
-            recordCommitFailure(time.milliseconds() - started, null);
+            recordCommitFailure(time.milliseconds() - started);
             offsetWriter.cancelFlush();
             throw maybeWrapProducerSendException(
                     "Failed to flush offsets and/or records for task " + id,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
index 7c10f42148e..9dddec09ae3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Utility class that tracks the current state and the duration of time spent 
in each state.
- * This class is threadsafe.
+ * This class is thread-safe.
  */
 public class StateTracker {
 
@@ -60,7 +60,7 @@ public class StateTracker {
 
     /**
      * An immutable record of the accumulated times at the most recent state 
change. This class is required to
-     * efficiently make {@link StateTracker} threadsafe.
+     * efficiently make {@link StateTracker} thread-safe.
      */
     private static final class StateChange {
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 1f4e930ae5a..424de8f3de5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -228,7 +228,7 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
 
             // Maybe commit
             if (!committing && (context.isCommitRequested() || now >= 
nextCommit)) {
-                commitOffsets(now, false);
+                commitOffsets(now);
                 nextCommit = now + offsetCommitIntervalMs;
                 context.clearCommitRequest();
             }
@@ -282,7 +282,7 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
                 log.error("{} Commit of offsets threw an unexpected exception 
for sequence number {}: {}",
                         this, seqno, committedOffsets, error);
                 commitFailures++;
-                recordCommitFailure(durationMillis, error);
+                recordCommitFailure(durationMillis);
             } else {
                 log.debug("{} Finished offset commit successfully in {} ms for 
sequence number {}: {}",
                         this, durationMillis, seqno, committedOffsets);
@@ -396,8 +396,8 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
         }
     }
 
-    private void commitOffsets(long now, boolean closing) {
-        commitOffsets(now, closing, consumer.assignment());
+    private void commitOffsets(long now) {
+        commitOffsets(now, false, consumer.assignment());
     }
 
     private void commitOffsets(long now, boolean closing, 
Collection<TopicPartition> topicPartitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0d0eba32d86..55cc097083d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -262,11 +262,11 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
             shouldFlush = offsetWriter.beginFlush(timeout - 
time.milliseconds(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             log.warn("{} Interrupted while waiting for previous offset flush 
to complete, cancelling", this);
-            recordCommitFailure(time.milliseconds() - started, e);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         } catch (TimeoutException e) {
             log.warn("{} Timed out while waiting for previous offset flush to 
complete, cancelling", this);
-            recordCommitFailure(time.milliseconds() - started, e);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         }
         if (!shouldFlush) {
@@ -292,7 +292,7 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
         // any data
         if (flushFuture == null) {
             offsetWriter.cancelFlush();
-            recordCommitFailure(time.milliseconds() - started, null);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         }
         try {
@@ -304,17 +304,17 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
             offsetWriter.cancelFlush();
-            recordCommitFailure(time.milliseconds() - started, e);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", 
this, e);
             offsetWriter.cancelFlush();
-            recordCommitFailure(time.milliseconds() - started, e);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         } catch (TimeoutException e) {
             log.error("{} Timed out waiting to flush offsets to storage; will 
try again on next flush interval with latest offsets", this);
             offsetWriter.cancelFlush();
-            recordCommitFailure(time.milliseconds() - started, null);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 98171fe47b6..9b70572fe24 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -356,17 +356,16 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
      * @param duration the length of time in milliseconds for the commit 
attempt to complete
      */
     protected void recordCommitSuccess(long duration) {
-        taskMetricsGroup.recordCommit(duration, true, null);
+        taskMetricsGroup.recordCommit(duration, true);
     }
 
     /**
      * Record that offsets have been committed.
      *
      * @param duration the length of time in milliseconds for the commit 
attempt to complete
-     * @param error the unexpected error that occurred; may be null in the 
case of timeouts or interruptions
      */
-    protected void recordCommitFailure(long duration, Throwable error) {
-        taskMetricsGroup.recordCommit(duration, false, error);
+    protected void recordCommitFailure(long duration) {
+        taskMetricsGroup.recordCommit(duration, false);
     }
 
     /**
@@ -434,7 +433,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
             metricGroup.close();
         }
 
-        void recordCommit(long duration, boolean success, Throwable error) {
+        void recordCommit(long duration, boolean success) {
             if (success) {
                 commitTime.record(duration);
                 commitAttempts.record(1.0d);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 69310a91bcc..ff7a9d3149d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1510,7 +1510,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                         }
                         // Compute and send the response that this was accepted
                         Optional<RestartPlan> plan = buildRestartPlan(request);
-                        if (!plan.isPresent()) {
+                        if (plan.isEmpty()) {
                             callback.onCompletion(new 
NotFoundException("Status for connector " + connectorName + " not found", 
null), null);
                         } else {
                             callback.onCompletion(null, 
plan.get().restartConnectorStateInfo());
@@ -1558,7 +1558,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     protected synchronized void doRestartConnectorAndTasks(RestartRequest 
request) {
         String connectorName = request.connectorName();
         Optional<RestartPlan> maybePlan = buildRestartPlan(request);
-        if (!maybePlan.isPresent()) {
+        if (maybePlan.isEmpty()) {
             log.debug("Skipping restart of connector '{}' since no status is 
available: {}", connectorName, request);
             return;
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index a3982f070a4..c89eb33082f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -89,8 +89,7 @@ public class WorkerGroupMember {
                     .tags(metricsTags);
             List<MetricsReporter> reporters = 
CommonClientConfigs.metricsReporters(clientId, config);
 
-            Map<String, Object> contextLabels = new HashMap<>();
-            
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+            Map<String, Object> contextLabels = new 
HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
             contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, 
config.kafkaClusterId());
             contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, 
config.getString(DistributedConfig.GROUP_ID_CONFIG));
             MetricsContext metricsContext = new 
KafkaMetricsContext(JMX_PREFIX, contextLabels);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
index ed09d4a37a0..56f559dc245 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
-import java.io.PrintStream;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -53,29 +52,22 @@ public class PredicateDoc {
         .sorted(Comparator.comparing(docInfo -> docInfo.predicateName))
         .collect(Collectors.toList());
 
-    private static void printPredicateHtml(PrintStream out, DocInfo docInfo) {
-        out.println("<div id=\"" + docInfo.predicateName + "\">");
-
-        out.print("<h5>");
-        out.print("<a href=\"#" + docInfo.predicateName + "\">" + 
docInfo.predicateName + "</a>");
-        out.println("</h5>");
-
-        out.println(docInfo.overview);
-
-        out.println("<p/>");
-
-        out.println(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + 
"_" + key));
-
-        out.println("</div>");
-    }
-
-    private static void printHtml(PrintStream out) {
+    private static String toHtml() {
+        StringBuilder b = new StringBuilder();
         for (final DocInfo docInfo : PREDICATES) {
-            printPredicateHtml(out, docInfo);
+            b.append("<div id=\"" + docInfo.predicateName + "\">\n");
+            b.append("<h5>");
+            b.append("<a href=\"#" + docInfo.predicateName + "\">" + 
docInfo.predicateName + "</a>");
+            b.append("</h5>\n");
+            b.append(docInfo.overview + "\n");
+            b.append("<p/>\n");
+            b.append(docInfo.configDef.toHtml(6, key -> docInfo.predicateName 
+ "_" + key) + "\n");
+            b.append("</div>\n");
         }
+        return b.toString();
     }
 
     public static void main(String... args) {
-        printHtml(System.out);
+        System.out.println(toHtml());
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
index 2c7250eb588..100f938bd9b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -34,7 +34,6 @@ import org.apache.kafka.connect.transforms.TimestampConverter;
 import org.apache.kafka.connect.transforms.TimestampRouter;
 import org.apache.kafka.connect.transforms.ValueToKey;
 
-import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.List;
 
@@ -71,30 +70,23 @@ public class TransformationDoc {
             new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, 
ValueToKey.CONFIG_DEF)
     );
 
-    private static void printTransformationHtml(PrintStream out, DocInfo 
docInfo) {
-        out.println("<div id=\"" + docInfo.transformationName + "\">");
-
-        out.print("<h5>");
-        out.print("<a href=\"#" + docInfo.transformationName + "\">" + 
docInfo.transformationName + "</a>");
-        out.println("</h5>");
-
-        out.println(docInfo.overview);
-
-        out.println("<p/>");
-
-        out.println(docInfo.configDef.toHtml(6, key -> 
docInfo.transformationName + "_"  + key));
-
-        out.println("</div>");
-    }
-
-    private static void printHtml(PrintStream out) {
+    private static String toHtml() {
+        StringBuilder b = new StringBuilder();
         for (final DocInfo docInfo : TRANSFORMATIONS) {
-            printTransformationHtml(out, docInfo);
+            b.append("<div id=\"" + docInfo.transformationName + "\">\n");
+            b.append("<h5>");
+            b.append("<a href=\"#" + docInfo.transformationName + "\">" + 
docInfo.transformationName + "</a>");
+            b.append("</h5>\n");
+            b.append(docInfo.overview + "\n");
+            b.append("<p/>\n");
+            b.append(docInfo.configDef.toHtml(6, key -> 
docInfo.transformationName + "_"  + key) + "\n");
+            b.append("</div>\n");
         }
+        return b.toString();
     }
 
     public static void main(String... args) {
-        printHtml(System.out);
+        System.out.println(toHtml());
     }
 
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
index c09eba62a23..fd62fc172f4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
@@ -32,7 +32,7 @@ public interface Callback<V> {
     }
 
     default <V2> Callback<V2> chainStaging(Callback<V2> chained) {
-        return new Callback<V2>() {
+        return new Callback<>() {
             @Override
             public void recordStage(Stage stage) {
                 Callback.this.recordStage(stage);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 23f6d8a9c49..bcc9f94b1fb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -159,12 +159,12 @@ public class KafkaBasedLog<K, V> {
      * @param initializer        the function that should be run when this log 
is {@link #start() started}; may be null
      */
     public KafkaBasedLog(String topic,
-            Map<String, Object> producerConfigs,
-            Map<String, Object> consumerConfigs,
-            Supplier<TopicAdmin> topicAdminSupplier,
-            Callback<ConsumerRecord<K, V>> consumedCallback,
-            Time time,
-            java.util.function.Consumer<TopicAdmin> initializer) {
+                         Map<String, Object> producerConfigs,
+                         Map<String, Object> consumerConfigs,
+                         Supplier<TopicAdmin> topicAdminSupplier,
+                         Callback<ConsumerRecord<K, V>> consumedCallback,
+                         Time time,
+                         java.util.function.Consumer<TopicAdmin> initializer) {
         this.topic = topic;
         this.producerConfigs = producerConfigs;
         this.consumerConfigs = consumerConfigs;
@@ -212,7 +212,7 @@ public class KafkaBasedLog<K, V> {
     ) {
         Objects.requireNonNull(topicAdmin);
         Objects.requireNonNull(readTopicPartition);
-        return new KafkaBasedLog<K, V>(topic,
+        return new KafkaBasedLog<>(topic,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
                 () -> topicAdmin,
@@ -266,8 +266,8 @@ public class KafkaBasedLog<K, V> {
 
         // Then create the producer and consumer
         producer = Optional.ofNullable(createProducer());
-        if (!producer.isPresent())
-            log.trace("Creating read-only KafkaBasedLog for topic " + topic);
+        if (producer.isEmpty())
+            log.trace("Creating read-only KafkaBasedLog for topic {}", topic);
         consumer = createConsumer();
 
         List<TopicPartition> partitions = new ArrayList<>();
@@ -308,13 +308,13 @@ public class KafkaBasedLog<K, V> {
         thread = new WorkThread();
         thread.start();
 
-        log.info("Finished reading KafkaBasedLog for topic " + topic);
+        log.info("Finished reading KafkaBasedLog for topic {}", topic);
 
-        log.info("Started KafkaBasedLog for topic " + topic);
+        log.info("Started KafkaBasedLog for topic {}", topic);
     }
 
     public void stop() {
-        log.info("Stopping KafkaBasedLog for topic " + topic);
+        log.info("Stopping KafkaBasedLog for topic {}", topic);
 
         synchronized (this) {
             stopRequested = true;
@@ -338,7 +338,7 @@ public class KafkaBasedLog<K, V> {
         // do not close the admin client, since we don't own it
         admin = null;
 
-        log.info("Stopped KafkaBasedLog for topic " + topic);
+        log.info("Stopped KafkaBasedLog for topic {}", topic);
     }
 
     /**
@@ -466,16 +466,16 @@ public class KafkaBasedLog<K, V> {
         return true;
     }
 
-    private void poll(long timeoutMs) {
+    private void poll() {
         try {
-            ConsumerRecords<K, V> records = 
consumer.poll(Duration.ofMillis(timeoutMs));
+            ConsumerRecords<K, V> records = 
consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
             for (ConsumerRecord<K, V> record : records)
                 consumedCallback.onCompletion(null, record);
         } catch (WakeupException e) {
             // Expected on get() or stop(). The calling code should handle this
             throw e;
         } catch (KafkaException e) {
-            log.error("Error polling: " + e);
+            log.error("Error polling: ", e);
             if (reportErrorsToCallback) {
                 consumedCallback.onCompletion(e, null);
             }
@@ -507,7 +507,7 @@ public class KafkaBasedLog<K, V> {
                 } else {
                     log.trace("Behind end offset {} for {}; last-read offset 
is {}",
                             endOffset, topicPartition, lastConsumedOffset);
-                    poll(Integer.MAX_VALUE);
+                    poll();
                     break;
                 }
             }
@@ -609,7 +609,7 @@ public class KafkaBasedLog<K, V> {
                     }
 
                     try {
-                        poll(Integer.MAX_VALUE);
+                        poll();
                     } catch (WakeupException e) {
                         // See previous comment, both possible causes of this 
wakeup are handled by starting this loop again
                         continue;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
index a0a68e0e81e..cb8d51c0a43 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -59,7 +59,7 @@ public class RetryUtil {
 
     // visible for testing
     static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> 
description, Duration timeoutDuration, long retryBackoffMs, Time time) throws 
Exception {
-        // if null supplier or string is provided, the message will be default 
to "callabe"
+        // if null supplier or string is provided, the message will be default 
to "callable"
         final String descriptionStr = Optional.ofNullable(description)
                 .map(Supplier::get)
                 .orElse("callable");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java
index 0ad3889b5f0..df2da552780 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java
@@ -20,29 +20,21 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectStreamClass;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 
 public class SafeObjectInputStream extends ObjectInputStream {
 
-    protected static final Set<String> DEFAULT_NO_DESERIALIZE_CLASS_NAMES;
-
-    static {
-
-        Set<String> s = new HashSet<>();
-        s.add("org.apache.commons.collections.functors.InvokerTransformer");
-        
s.add("org.apache.commons.collections.functors.InstantiateTransformer");
-        s.add("org.apache.commons.collections4.functors.InvokerTransformer");
-        
s.add("org.apache.commons.collections4.functors.InstantiateTransformer");
-        s.add("org.codehaus.groovy.runtime.ConvertedClosure");
-        s.add("org.codehaus.groovy.runtime.MethodClosure");
-        s.add("org.springframework.beans.factory.ObjectFactory");
-        s.add("com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl");
-        s.add("org.apache.xalan.xsltc.trax.TemplatesImpl");
-        DEFAULT_NO_DESERIALIZE_CLASS_NAMES = Collections.unmodifiableSet(s);
-    }
-
+    protected static final Set<String> DEFAULT_NO_DESERIALIZE_CLASS_NAMES = 
Set.of(
+            "org.apache.commons.collections.functors.InvokerTransformer",
+            "org.apache.commons.collections.functors.InstantiateTransformer",
+            "org.apache.commons.collections4.functors.InvokerTransformer",
+            "org.apache.commons.collections4.functors.InstantiateTransformer",
+            "org.codehaus.groovy.runtime.ConvertedClosure",
+            "org.codehaus.groovy.runtime.MethodClosure",
+            "org.springframework.beans.factory.ObjectFactory",
+            "com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl",
+            "org.apache.xalan.xsltc.trax.TemplatesImpl"
+    );
 
     public SafeObjectInputStream(InputStream in) throws IOException {
         super(in);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 0974f35d16c..f4374d18500 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -74,8 +74,6 @@ import org.mockito.quality.Strictness;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -408,15 +406,6 @@ public class ErrorHandlingTaskTest {
         assertEquals(expected, measured, 0.001d);
     }
 
-    private void verifyCloseSource() throws IOException {
-        verify(producer).close(any(Duration.class));
-        verify(admin).close(any(Duration.class));
-        verify(offsetReader).close();
-        verify(offsetStore).stop();
-        // headerConverter.close() can throw IOException
-        verify(headerConverter).close();
-    }
-
     private void expectTopicCreation(String topic) {
         if (enableTopicCreation) {
             
when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 14e29cded9c..65262983d9f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -2149,7 +2149,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     @SuppressWarnings("unchecked")
-    public void testAlterOffsetsSourceConnectorError(boolean 
enableTopicCreation) throws Exception {
+    public void testAlterOffsetsSourceConnectorError(boolean 
enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         mockInternalConverters();
@@ -2188,7 +2188,7 @@ public class WorkerTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testNormalizeSourceConnectorOffsets(boolean 
enableTopicCreation) throws Exception {
+    public void testNormalizeSourceConnectorOffsets(boolean 
enableTopicCreation) {
         setup(enableTopicCreation);
         Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
                 Collections.singletonMap("filename", "/path/to/filename"),
@@ -2334,7 +2334,7 @@ public class WorkerTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean 
enableTopicCreation) throws Exception {
+    public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean 
enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
@@ -2375,7 +2375,7 @@ public class WorkerTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean 
enableTopicCreation) throws Exception {
+    public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean 
enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
@@ -2426,7 +2426,7 @@ public class WorkerTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testAlterOffsetsSinkConnectorSynchronousError(boolean 
enableTopicCreation) throws Exception {
+    public void testAlterOffsetsSinkConnectorSynchronousError(boolean 
enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
@@ -2557,7 +2557,7 @@ public class WorkerTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean 
enableTopicCreation) throws Exception {
+    public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean 
enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
@@ -2594,7 +2594,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     @SuppressWarnings("unchecked")
-    public void testModifySourceConnectorOffsetsTimeout(boolean 
enableTopicCreation) throws Exception {
+    public void testModifySourceConnectorOffsetsTimeout(boolean 
enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         Time time = new MockTime();
@@ -2630,7 +2630,7 @@ public class WorkerTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testModifyOffsetsSinkConnectorTimeout(boolean 
enableTopicCreation) throws Exception {
+    public void testModifyOffsetsSinkConnectorTimeout(boolean 
enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index 462d02f3e6d..06b0e3fb55c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
-import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
 import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -30,35 +29,11 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static 
org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class WorkerTestUtils {
 
-    public static WorkerLoad emptyWorkerLoad(String worker) {
-        return new WorkerLoad.Builder(worker).build();
-    }
-
-    public WorkerLoad workerLoad(String worker, int connectorStart, int 
connectorNum,
-                                  int taskStart, int taskNum) {
-        return new WorkerLoad.Builder(worker).with(
-                newConnectors(connectorStart, connectorStart + connectorNum),
-                newTasks(taskStart, taskStart + taskNum)).build();
-    }
-
-    public static List<String> newConnectors(int start, int end) {
-        return IntStream.range(start, end)
-                .mapToObj(i -> "connector" + i)
-                .collect(Collectors.toList());
-    }
-
-    public static List<ConnectorTaskId> newTasks(int start, int end) {
-        return IntStream.range(start, end)
-                .mapToObj(i -> new ConnectorTaskId("task", i))
-                .collect(Collectors.toList());
-    }
-
     public static ClusterConfigState clusterConfigState(long offset,
                                                         int connectorNum,
                                                         int taskNum) {
@@ -82,24 +57,6 @@ public class WorkerTestUtils {
                 Collections.emptySet());
     }
 
-    public static Map<String, ExtendedWorkerState> memberConfigs(String 
givenLeader,
-                                                                 long 
givenOffset,
-                                                                 Map<String, 
ExtendedAssignment> givenAssignments) {
-        return givenAssignments.entrySet().stream()
-                .collect(Collectors.toMap(
-                    Map.Entry::getKey,
-                    e -> new 
ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, 
e.getValue())));
-    }
-
-    public static Map<String, ExtendedWorkerState> memberConfigs(String 
givenLeader,
-                                                                 long 
givenOffset,
-                                                                 int start,
-                                                                 int 
connectorNum) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("worker" + i, new 
ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null)))
-                .collect(Collectors.toMap(SimpleEntry::getKey, 
SimpleEntry::getValue));
-    }
-
     public static Map<String, Integer> connectorTaskCounts(int start,
                                                            int connectorNum,
                                                            int taskCounts) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
index f76f0585f1a..e58444ccd4d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
@@ -75,8 +75,6 @@ public class RestServerConfigTest {
         props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
         config = RestServerConfig.forPublic(null, props);
         assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.listeners());
-
-        config = RestServerConfig.forPublic(null, props);
     }
 
     @Test
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 392d082d330..700284a9c66 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -972,9 +972,4 @@ public class ConnectorsResourceTest {
         return expectAndCallbackException(cb, new NotLeaderException("not 
leader test", LEADER_URL));
     }
 
-    @FunctionalInterface
-    public interface RunnableWithThrowable<T> {
-        T run() throws Throwable;
-    }
-
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index d0968db29ce..4173d9a357c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -164,7 +164,6 @@ public class KafkaConfigBackingStoreTest {
             new 
Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", 
SAMPLE_CONFIGS.get(1)),
             new 
Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", 
SAMPLE_CONFIGS.get(2))
     );
-    private static final Struct TARGET_STATE_STARTED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
     private static final Struct TARGET_STATE_PAUSED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
             .put("state", "PAUSED")
             .put("state.v2", "PAUSED");
@@ -1658,12 +1657,6 @@ public class KafkaConfigBackingStoreTest {
         }
     }
 
-    private void expectRead(final String key, final byte[] serializedValue, 
Struct deserializedValue) {
-        LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
-        serializedData.put(key, serializedValue);
-        expectRead(serializedData, Collections.singletonMap(key, 
deserializedValue));
-    }
-
     // This map needs to maintain ordering
     private Answer<Future<Void>> expectReadToEnd(final Map<String, byte[]> 
serializedConfigs) {
         return invocation -> {


Reply via email to