This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 45835a0e459 MINOR: Cleanup connect runtime module (#18074)
45835a0e459 is described below
commit 45835a0e459b3d1a7203d73c94e7b393bdc89dca
Author: Dmitry Werner <[email protected]>
AuthorDate: Tue Dec 10 22:25:38 2024 +0500
MINOR: Cleanup connect runtime module (#18074)
Reviewers: Mickael Maison <[email protected]>
---
.../kafka/connect/cli/ConnectStandalone.java | 7 +---
.../kafka/connect/runtime/ConnectMetrics.java | 6 +--
.../runtime/ExactlyOnceWorkerSourceTask.java | 2 +-
.../apache/kafka/connect/runtime/StateTracker.java | 4 +-
.../kafka/connect/runtime/WorkerSinkTask.java | 8 ++--
.../kafka/connect/runtime/WorkerSourceTask.java | 12 +++---
.../apache/kafka/connect/runtime/WorkerTask.java | 9 ++---
.../runtime/distributed/DistributedHerder.java | 4 +-
.../runtime/distributed/WorkerGroupMember.java | 3 +-
.../apache/kafka/connect/tools/PredicateDoc.java | 32 ++++++----------
.../kafka/connect/tools/TransformationDoc.java | 32 ++++++----------
.../org/apache/kafka/connect/util/Callback.java | 2 +-
.../apache/kafka/connect/util/KafkaBasedLog.java | 36 +++++++++---------
.../org/apache/kafka/connect/util/RetryUtil.java | 2 +-
.../kafka/connect/util/SafeObjectInputStream.java | 30 ++++++---------
.../connect/runtime/ErrorHandlingTaskTest.java | 11 ------
.../apache/kafka/connect/runtime/WorkerTest.java | 16 ++++----
.../kafka/connect/runtime/WorkerTestUtils.java | 43 ----------------------
.../connect/runtime/rest/RestServerConfigTest.java | 2 -
.../rest/resources/ConnectorsResourceTest.java | 5 ---
.../storage/KafkaConfigBackingStoreTest.java | 7 ----
21 files changed, 87 insertions(+), 186 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 120e03c6f8e..43af6b274b6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -121,9 +121,7 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneHerder, Stan
         File connectorConfigurationFile = Paths.get(filePath).toFile();
         try {
-            Map<String, String> connectorConfigs = objectMapper.readValue(
-                    connectorConfigurationFile,
-                    new TypeReference<Map<String, String>>() { });
+            Map<String, String> connectorConfigs = objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { });
             if (!connectorConfigs.containsKey(NAME_CONFIG)) {
                 throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
@@ -136,8 +134,7 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneHerder, Stan
             try {
                 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-                CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile,
-                        new TypeReference<CreateConnectorRequest>() { });
+                CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile, new TypeReference<>() { });
                 if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
                     if (!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name())) {
                         throw new ConnectException("Connector name configuration in 'config' doesn't match the one specified in 'name' at '" + filePath
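Note: the readValue changes above rely on the Java 9+ diamond operator, which lets the compiler infer the type argument of the anonymous TypeReference subclass from the assignment target. A minimal standalone sketch (the JSON literal and class name are illustrative, not part of this commit):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.Map;

    public class TypeRefDemo {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            String json = "{\"name\": \"local-file-source\"}";   // illustrative input
            // Before Java 9: the anonymous subclass must spell out the full type argument.
            Map<String, String> verbose = mapper.readValue(json, new TypeReference<Map<String, String>>() { });
            // Java 9+: the diamond infers Map<String, String> from the target type.
            Map<String, String> concise = mapper.readValue(json, new TypeReference<>() { });
            System.out.println(verbose.equals(concise));   // prints: true
        }
    }

Both forms behave identically; the anonymous subclass is still what lets Jackson recover the generic type at runtime.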
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index ff62c25eee5..430cad52b8f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -81,8 +81,7 @@ public class ConnectMetrics {
                 .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
                         Sensor.RecordingLevel.forName(metricsRecordingLevel));
-        Map<String, Object> contextLabels = new HashMap<>();
-        contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+        Map<String, Object> contextLabels = new HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
         contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
         Object groupId = config.originals().get(DistributedConfig.GROUP_ID_CONFIG);
         if (groupId != null) {
@@ -391,8 +390,7 @@ public class ConnectMetrics {
         public synchronized Sensor sensor(String name, MetricConfig config, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
             // We need to make sure that all sensor names are unique across all groups, so use the sensor prefix
             Sensor result = metrics.sensor(sensorPrefix + name, config, Long.MAX_VALUE, recordingLevel, parents);
-            if (result != null)
-                sensorNames.add(result.name());
+            sensorNames.add(result.name());
             return result;
         }
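Note: the contextLabels change is the map copy-constructor idiom; new HashMap<>(map) copies the entries at construction instead of allocating an empty map and calling putAll. A small sketch under illustrative data (the key is not from this commit):

    import java.util.HashMap;
    import java.util.Map;

    public class CopyConstructorDemo {
        public static void main(String[] args) {
            Map<String, Object> prefixed = Map.of("metrics.context.env", "test");   // illustrative
            // Old two-step idiom: allocate empty, then bulk-add.
            Map<String, Object> twoStep = new HashMap<>();
            twoStep.putAll(prefixed);
            // New one-step idiom: the copy constructor does the same in one expression.
            Map<String, Object> oneStep = new HashMap<>(prefixed);
            System.out.println(twoStep.equals(oneStep));   // prints: true
        }
    }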
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index bcff615c414..d837776be38 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -321,7 +321,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
             error = flushError.get();
             if (error != null) {
-                recordCommitFailure(time.milliseconds() - started, null);
+                recordCommitFailure(time.milliseconds() - started);
                 offsetWriter.cancelFlush();
                 throw maybeWrapProducerSendException(
                         "Failed to flush offsets and/or records for task " + id,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
index 7c10f42148e..9dddec09ae3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * Utility class that tracks the current state and the duration of time spent in each state.
- * This class is threadsafe.
+ * This class is thread-safe.
  */
 public class StateTracker {
@@ -60,7 +60,7 @@ public class StateTracker {
     /**
      * An immutable record of the accumulated times at the most recent state change. This class is required to
-     * efficiently make {@link StateTracker} threadsafe.
+     * efficiently make {@link StateTracker} thread-safe.
      */
     private static final class StateChange {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 1f4e930ae5a..424de8f3de5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -228,7 +228,7 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
         // Maybe commit
         if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
-            commitOffsets(now, false);
+            commitOffsets(now);
             nextCommit = now + offsetCommitIntervalMs;
             context.clearCommitRequest();
         }
@@ -282,7 +282,7 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
             log.error("{} Commit of offsets threw an unexpected exception for sequence number {}: {}",
                     this, seqno, committedOffsets, error);
             commitFailures++;
-            recordCommitFailure(durationMillis, error);
+            recordCommitFailure(durationMillis);
         } else {
             log.debug("{} Finished offset commit successfully in {} ms for sequence number {}: {}",
                     this, durationMillis, seqno, committedOffsets);
@@ -396,8 +396,8 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
         }
     }
-    private void commitOffsets(long now, boolean closing) {
-        commitOffsets(now, closing, consumer.assignment());
+    private void commitOffsets(long now) {
+        commitOffsets(now, false, consumer.assignment());
     }
     private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0d0eba32d86..55cc097083d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -262,11 +262,11 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
             shouldFlush = offsetWriter.beginFlush(timeout - time.milliseconds(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             log.warn("{} Interrupted while waiting for previous offset flush to complete, cancelling", this);
-            recordCommitFailure(time.milliseconds() - started, e);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         } catch (TimeoutException e) {
             log.warn("{} Timed out while waiting for previous offset flush to complete, cancelling", this);
-            recordCommitFailure(time.milliseconds() - started, e);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         }
         if (!shouldFlush) {
@@ -292,7 +292,7 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
         // any data
         if (flushFuture == null) {
             offsetWriter.cancelFlush();
-            recordCommitFailure(time.milliseconds() - started, null);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         }
         try {
@@ -304,17 +304,17 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
             offsetWriter.cancelFlush();
-            recordCommitFailure(time.milliseconds() - started, e);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
             offsetWriter.cancelFlush();
-            recordCommitFailure(time.milliseconds() - started, e);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         } catch (TimeoutException e) {
             log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
             offsetWriter.cancelFlush();
-            recordCommitFailure(time.milliseconds() - started, null);
+            recordCommitFailure(time.milliseconds() - started);
             return false;
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 98171fe47b6..9b70572fe24 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -356,17 +356,16 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
      * @param duration the length of time in milliseconds for the commit attempt to complete
      */
     protected void recordCommitSuccess(long duration) {
-        taskMetricsGroup.recordCommit(duration, true, null);
+        taskMetricsGroup.recordCommit(duration, true);
     }
     /**
      * Record that offsets have been committed.
      *
      * @param duration the length of time in milliseconds for the commit attempt to complete
-     * @param error the unexpected error that occurred; may be null in the case of timeouts or interruptions
      */
-    protected void recordCommitFailure(long duration, Throwable error) {
-        taskMetricsGroup.recordCommit(duration, false, error);
+    protected void recordCommitFailure(long duration) {
+        taskMetricsGroup.recordCommit(duration, false);
     }
     /**
@@ -434,7 +433,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
             metricGroup.close();
         }
-        void recordCommit(long duration, boolean success, Throwable error) {
+        void recordCommit(long duration, boolean success) {
             if (success) {
                 commitTime.record(duration);
                 commitAttempts.record(1.0d);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 69310a91bcc..ff7a9d3149d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1510,7 +1510,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 }
                 // Compute and send the response that this was accepted
                 Optional<RestartPlan> plan = buildRestartPlan(request);
-                if (!plan.isPresent()) {
+                if (plan.isEmpty()) {
                     callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
                 } else {
                     callback.onCompletion(null, plan.get().restartConnectorStateInfo());
@@ -1558,7 +1558,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     protected synchronized void doRestartConnectorAndTasks(RestartRequest request) {
         String connectorName = request.connectorName();
         Optional<RestartPlan> maybePlan = buildRestartPlan(request);
-        if (!maybePlan.isPresent()) {
+        if (maybePlan.isEmpty()) {
             log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request);
             return;
         }
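Note: Optional.isEmpty() (Java 11+) replaces the negated isPresent() check with a direct predicate that reads the way the condition is meant. Sketch:

    import java.util.Optional;

    public class OptionalDemo {
        public static void main(String[] args) {
            Optional<String> plan = Optional.empty();   // stands in for buildRestartPlan(request)
            // Pre-Java 11 idiom: negate isPresent().
            if (!plan.isPresent())
                System.out.println("no plan (negated isPresent)");
            // Java 11+: isEmpty() states the intent directly.
            if (plan.isEmpty())
                System.out.println("no plan (isEmpty)");
        }
    }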
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index a3982f070a4..c89eb33082f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -89,8 +89,7 @@ public class WorkerGroupMember {
                     .tags(metricsTags);
             List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
-            Map<String, Object> contextLabels = new HashMap<>();
-            contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+            Map<String, Object> contextLabels = new HashMap<>(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
             contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, config.kafkaClusterId());
             contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, config.getString(DistributedConfig.GROUP_ID_CONFIG));
             MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
index ed09d4a37a0..56f559dc245 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
-import java.io.PrintStream;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -53,29 +52,22 @@ public class PredicateDoc {
             .sorted(Comparator.comparing(docInfo -> docInfo.predicateName))
             .collect(Collectors.toList());
-    private static void printPredicateHtml(PrintStream out, DocInfo docInfo) {
-        out.println("<div id=\"" + docInfo.predicateName + "\">");
-
-        out.print("<h5>");
-        out.print("<a href=\"#" + docInfo.predicateName + "\">" + docInfo.predicateName + "</a>");
-        out.println("</h5>");
-
-        out.println(docInfo.overview);
-
-        out.println("<p/>");
-
-        out.println(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + "_" + key));
-
-        out.println("</div>");
-    }
-
-    private static void printHtml(PrintStream out) {
+    private static String toHtml() {
+        StringBuilder b = new StringBuilder();
         for (final DocInfo docInfo : PREDICATES) {
-            printPredicateHtml(out, docInfo);
+            b.append("<div id=\"" + docInfo.predicateName + "\">\n");
+            b.append("<h5>");
+            b.append("<a href=\"#" + docInfo.predicateName + "\">" + docInfo.predicateName + "</a>");
+            b.append("</h5>\n");
+            b.append(docInfo.overview + "\n");
+            b.append("<p/>\n");
+            b.append(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + "_" + key) + "\n");
+            b.append("</div>\n");
         }
+        return b.toString();
     }
     public static void main(String... args) {
-        printHtml(System.out);
+        System.out.println(toHtml());
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
index 2c7250eb588..100f938bd9b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -34,7 +34,6 @@ import org.apache.kafka.connect.transforms.TimestampConverter;
 import org.apache.kafka.connect.transforms.TimestampRouter;
 import org.apache.kafka.connect.transforms.ValueToKey;
-import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.List;
@@ -71,30 +70,23 @@ public class TransformationDoc {
             new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF)
     );
-    private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {
-        out.println("<div id=\"" + docInfo.transformationName + "\">");
-
-        out.print("<h5>");
-        out.print("<a href=\"#" + docInfo.transformationName + "\">" + docInfo.transformationName + "</a>");
-        out.println("</h5>");
-
-        out.println(docInfo.overview);
-
-        out.println("<p/>");
-
-        out.println(docInfo.configDef.toHtml(6, key -> docInfo.transformationName + "_" + key));
-
-        out.println("</div>");
-    }
-
-    private static void printHtml(PrintStream out) {
+    private static String toHtml() {
+        StringBuilder b = new StringBuilder();
         for (final DocInfo docInfo : TRANSFORMATIONS) {
-            printTransformationHtml(out, docInfo);
+            b.append("<div id=\"" + docInfo.transformationName + "\">\n");
+            b.append("<h5>");
+            b.append("<a href=\"#" + docInfo.transformationName + "\">" + docInfo.transformationName + "</a>");
+            b.append("</h5>\n");
+            b.append(docInfo.overview + "\n");
+            b.append("<p/>\n");
+            b.append(docInfo.configDef.toHtml(6, key -> docInfo.transformationName + "_" + key) + "\n");
+            b.append("</div>\n");
         }
+        return b.toString();
     }
     public static void main(String... args) {
-        printHtml(System.out);
+        System.out.println(toHtml());
    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
index c09eba62a23..fd62fc172f4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Callback.java
@@ -32,7 +32,7 @@ public interface Callback<V> {
}
default <V2> Callback<V2> chainStaging(Callback<V2> chained) {
- return new Callback<V2>() {
+ return new Callback<>() {
@Override
public void recordStage(Stage stage) {
Callback.this.recordStage(stage);
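Note: as with the TypeReference change earlier in this commit, the diamond here is legal since Java 9 because the inferred type of the anonymous class (Callback<V2>) is denotable. A self-contained sketch with a simplified interface (not Connect's actual Callback):

    public class DiamondAnonymousDemo {
        interface Callback<V> {
            void onCompletion(Throwable error, V result);
        }

        public static void main(String[] args) {
            // Java 9+: new Callback<>() infers Callback<String> from the target type.
            Callback<String> cb = new Callback<>() {
                @Override
                public void onCompletion(Throwable error, String result) {
                    System.out.println(result);
                }
            };
            cb.onCompletion(null, "done");   // prints: done
        }
    }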
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 23f6d8a9c49..bcc9f94b1fb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -159,12 +159,12 @@ public class KafkaBasedLog<K, V> {
      * @param initializer the function that should be run when this log is {@link #start() started}; may be null
      */
     public KafkaBasedLog(String topic,
-            Map<String, Object> producerConfigs,
-            Map<String, Object> consumerConfigs,
-            Supplier<TopicAdmin> topicAdminSupplier,
-            Callback<ConsumerRecord<K, V>> consumedCallback,
-            Time time,
-            java.util.function.Consumer<TopicAdmin> initializer) {
+                         Map<String, Object> producerConfigs,
+                         Map<String, Object> consumerConfigs,
+                         Supplier<TopicAdmin> topicAdminSupplier,
+                         Callback<ConsumerRecord<K, V>> consumedCallback,
+                         Time time,
+                         java.util.function.Consumer<TopicAdmin> initializer) {
         this.topic = topic;
         this.producerConfigs = producerConfigs;
         this.consumerConfigs = consumerConfigs;
@@ -212,7 +212,7 @@ public class KafkaBasedLog<K, V> {
     ) {
         Objects.requireNonNull(topicAdmin);
         Objects.requireNonNull(readTopicPartition);
-        return new KafkaBasedLog<K, V>(topic,
+        return new KafkaBasedLog<>(topic,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
                 () -> topicAdmin,
@@ -266,8 +266,8 @@ public class KafkaBasedLog<K, V> {
         // Then create the producer and consumer
         producer = Optional.ofNullable(createProducer());
-        if (!producer.isPresent())
-            log.trace("Creating read-only KafkaBasedLog for topic " + topic);
+        if (producer.isEmpty())
+            log.trace("Creating read-only KafkaBasedLog for topic {}", topic);
         consumer = createConsumer();
         List<TopicPartition> partitions = new ArrayList<>();
@@ -308,13 +308,13 @@ public class KafkaBasedLog<K, V> {
         thread = new WorkThread();
         thread.start();
-        log.info("Finished reading KafkaBasedLog for topic " + topic);
+        log.info("Finished reading KafkaBasedLog for topic {}", topic);
-        log.info("Started KafkaBasedLog for topic " + topic);
+        log.info("Started KafkaBasedLog for topic {}", topic);
     }
     public void stop() {
-        log.info("Stopping KafkaBasedLog for topic " + topic);
+        log.info("Stopping KafkaBasedLog for topic {}", topic);
         synchronized (this) {
             stopRequested = true;
@@ -338,7 +338,7 @@ public class KafkaBasedLog<K, V> {
         // do not close the admin client, since we don't own it
         admin = null;
-        log.info("Stopped KafkaBasedLog for topic " + topic);
+        log.info("Stopped KafkaBasedLog for topic {}", topic);
     }
     /**
@@ -466,16 +466,16 @@ public class KafkaBasedLog<K, V> {
         return true;
     }
-    private void poll(long timeoutMs) {
+    private void poll() {
         try {
-            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
+            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
             for (ConsumerRecord<K, V> record : records)
                 consumedCallback.onCompletion(null, record);
         } catch (WakeupException e) {
             // Expected on get() or stop(). The calling code should handle this
             throw e;
         } catch (KafkaException e) {
-            log.error("Error polling: " + e);
+            log.error("Error polling: ", e);
             if (reportErrorsToCallback) {
                 consumedCallback.onCompletion(e, null);
             }
@@ -507,7 +507,7 @@ public class KafkaBasedLog<K, V> {
                     } else {
                         log.trace("Behind end offset {} for {}; last-read offset is {}",
                                 endOffset, topicPartition, lastConsumedOffset);
-                        poll(Integer.MAX_VALUE);
+                        poll();
                         break;
                     }
                 }
@@ -609,7 +609,7 @@ public class KafkaBasedLog<K, V> {
                 }
                 try {
-                    poll(Integer.MAX_VALUE);
+                    poll();
                 } catch (WakeupException e) {
                     // See previous comment, both possible causes of this wakeup are handled by starting this loop again
                     continue;
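Note: the logging changes above switch string concatenation to SLF4J {} placeholders, which defer message formatting until the level check passes, and pass the exception object itself to log.error so the stack trace is preserved. A sketch (needs an SLF4J binding such as slf4j-simple on the classpath to actually print):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Slf4jDemo {
        private static final Logger log = LoggerFactory.getLogger(Slf4jDemo.class);

        public static void main(String[] args) {
            String topic = "connect-configs";   // illustrative topic name
            // Concatenation builds the string even when INFO is disabled:
            log.info("Started KafkaBasedLog for topic " + topic);
            // A placeholder defers formatting until the level is enabled:
            log.info("Started KafkaBasedLog for topic {}", topic);
            // Passing the Throwable as the final argument logs the full stack trace,
            // unlike "..." + e, which captures only e.toString():
            Exception e = new RuntimeException("boom");
            log.error("Error polling: ", e);
        }
    }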
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
index a0a68e0e81e..cb8d51c0a43 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -59,7 +59,7 @@ public class RetryUtil {
     // visible for testing
     static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs, Time time) throws Exception {
-        // if null supplier or string is provided, the message will be default to "callabe"
+        // if null supplier or string is provided, the message will be default to "callable"
         final String descriptionStr = Optional.ofNullable(description)
                 .map(Supplier::get)
                 .orElse("callable");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java
index 0ad3889b5f0..df2da552780 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SafeObjectInputStream.java
@@ -20,29 +20,21 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectStreamClass;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 public class SafeObjectInputStream extends ObjectInputStream {
-    protected static final Set<String> DEFAULT_NO_DESERIALIZE_CLASS_NAMES;
-
-    static {
-
-        Set<String> s = new HashSet<>();
-        s.add("org.apache.commons.collections.functors.InvokerTransformer");
-        s.add("org.apache.commons.collections.functors.InstantiateTransformer");
-        s.add("org.apache.commons.collections4.functors.InvokerTransformer");
-        s.add("org.apache.commons.collections4.functors.InstantiateTransformer");
-        s.add("org.codehaus.groovy.runtime.ConvertedClosure");
-        s.add("org.codehaus.groovy.runtime.MethodClosure");
-        s.add("org.springframework.beans.factory.ObjectFactory");
-        s.add("com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl");
-        s.add("org.apache.xalan.xsltc.trax.TemplatesImpl");
-        DEFAULT_NO_DESERIALIZE_CLASS_NAMES = Collections.unmodifiableSet(s);
-    }
-
+    protected static final Set<String> DEFAULT_NO_DESERIALIZE_CLASS_NAMES = Set.of(
+            "org.apache.commons.collections.functors.InvokerTransformer",
+            "org.apache.commons.collections.functors.InstantiateTransformer",
+            "org.apache.commons.collections4.functors.InvokerTransformer",
+            "org.apache.commons.collections4.functors.InstantiateTransformer",
+            "org.codehaus.groovy.runtime.ConvertedClosure",
+            "org.codehaus.groovy.runtime.MethodClosure",
+            "org.springframework.beans.factory.ObjectFactory",
+            "com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl",
+            "org.apache.xalan.xsltc.trax.TemplatesImpl"
+    );
     public SafeObjectInputStream(InputStream in) throws IOException {
         super(in);
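Note: Set.of (Java 9+) builds an unmodifiable set in a single expression, replacing the HashSet-plus-Collections.unmodifiableSet static initializer. Unlike the old idiom, it also rejects null elements and duplicate entries at construction time. Sketch using one of the class names from the list above:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class ImmutableSetDemo {
        public static void main(String[] args) {
            // Old idiom: mutable set, then wrapped to prevent modification.
            Set<String> s = new HashSet<>();
            s.add("org.codehaus.groovy.runtime.MethodClosure");
            Set<String> before = Collections.unmodifiableSet(s);
            // Java 9+: immutable from the start.
            Set<String> after = Set.of("org.codehaus.groovy.runtime.MethodClosure");
            System.out.println(before.equals(after));   // prints: true
        }
    }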
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 0974f35d16c..f4374d18500 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -74,8 +74,6 @@ import org.mockito.quality.Strictness;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -408,15 +406,6 @@ public class ErrorHandlingTaskTest {
         assertEquals(expected, measured, 0.001d);
     }
-    private void verifyCloseSource() throws IOException {
-        verify(producer).close(any(Duration.class));
-        verify(admin).close(any(Duration.class));
-        verify(offsetReader).close();
-        verify(offsetStore).stop();
-        // headerConverter.close() can throw IOException
-        verify(headerConverter).close();
-    }
-
     private void expectTopicCreation(String topic) {
         if (enableTopicCreation) {
             when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 14e29cded9c..65262983d9f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -2149,7 +2149,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     @SuppressWarnings("unchecked")
-    public void testAlterOffsetsSourceConnectorError(boolean enableTopicCreation) throws Exception {
+    public void testAlterOffsetsSourceConnectorError(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         mockInternalConverters();
@@ -2188,7 +2188,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testNormalizeSourceConnectorOffsets(boolean enableTopicCreation) throws Exception {
+    public void testNormalizeSourceConnectorOffsets(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
                 Collections.singletonMap("filename", "/path/to/filename"),
@@ -2334,7 +2334,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean enableTopicCreation) throws Exception {
+    public void testAlterOffsetsSinkConnectorAlterOffsetsError(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
@@ -2375,7 +2375,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean enableTopicCreation) throws Exception {
+    public void testAlterOffsetsSinkConnectorDeleteOffsetsError(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
@@ -2426,7 +2426,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testAlterOffsetsSinkConnectorSynchronousError(boolean enableTopicCreation) throws Exception {
+    public void testAlterOffsetsSinkConnectorSynchronousError(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
@@ -2557,7 +2557,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean enableTopicCreation) throws Exception {
+    public void testResetOffsetsSinkConnectorDeleteConsumerGroupError(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
@@ -2594,7 +2594,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     @SuppressWarnings("unchecked")
-    public void testModifySourceConnectorOffsetsTimeout(boolean enableTopicCreation) throws Exception {
+    public void testModifySourceConnectorOffsetsTimeout(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         Time time = new MockTime();
@@ -2630,7 +2630,7 @@ public class WorkerTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testModifyOffsetsSinkConnectorTimeout(boolean enableTopicCreation) throws Exception {
+    public void testModifyOffsetsSinkConnectorTimeout(boolean enableTopicCreation) {
         setup(enableTopicCreation);
         mockKafkaClusterId();
         String connectorClass = SampleSinkConnector.class.getName();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index 462d02f3e6d..06b0e3fb55c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.connect.runtime;
 import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
-import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
 import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -30,35 +29,11 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 public class WorkerTestUtils {
-    public static WorkerLoad emptyWorkerLoad(String worker) {
-        return new WorkerLoad.Builder(worker).build();
-    }
-
-    public WorkerLoad workerLoad(String worker, int connectorStart, int connectorNum,
-                                 int taskStart, int taskNum) {
-        return new WorkerLoad.Builder(worker).with(
-                newConnectors(connectorStart, connectorStart + connectorNum),
-                newTasks(taskStart, taskStart + taskNum)).build();
-    }
-
-    public static List<String> newConnectors(int start, int end) {
-        return IntStream.range(start, end)
-                .mapToObj(i -> "connector" + i)
-                .collect(Collectors.toList());
-    }
-
-    public static List<ConnectorTaskId> newTasks(int start, int end) {
-        return IntStream.range(start, end)
-                .mapToObj(i -> new ConnectorTaskId("task", i))
-                .collect(Collectors.toList());
-    }
-
     public static ClusterConfigState clusterConfigState(long offset,
                                                         int connectorNum,
                                                         int taskNum) {
@@ -82,24 +57,6 @@ public class WorkerTestUtils {
                 Collections.emptySet());
     }
-    public static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
-                                                                 long givenOffset,
-                                                                 Map<String, ExtendedAssignment> givenAssignments) {
-        return givenAssignments.entrySet().stream()
-                .collect(Collectors.toMap(
-                        Map.Entry::getKey,
-                        e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue())));
-    }
-
-    public static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
-                                                                 long givenOffset,
-                                                                 int start,
-                                                                 int connectorNum) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("worker" + i, new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null)))
-                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
-    }
-
     public static Map<String, Integer> connectorTaskCounts(int start,
                                                            int connectorNum,
                                                            int taskCounts) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
index f76f0585f1a..e58444ccd4d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
@@ -75,8 +75,6 @@ public class RestServerConfigTest {
         props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
         config = RestServerConfig.forPublic(null, props);
         assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.listeners());
-
-        config = RestServerConfig.forPublic(null, props);
     }
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 392d082d330..700284a9c66 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -972,9 +972,4 @@ public class ConnectorsResourceTest {
         return expectAndCallbackException(cb, new NotLeaderException("not leader test", LEADER_URL));
     }
-    @FunctionalInterface
-    public interface RunnableWithThrowable<T> {
-        T run() throws Throwable;
-    }
-
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index d0968db29ce..4173d9a357c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -164,7 +164,6 @@ public class KafkaConfigBackingStoreTest {
             new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
             new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
     );
-    private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
     private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
             .put("state", "PAUSED")
             .put("state.v2", "PAUSED");
@@ -1658,12 +1657,6 @@ public class KafkaConfigBackingStoreTest {
         }
     }
-    private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) {
-        LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
-        serializedData.put(key, serializedValue);
-        expectRead(serializedData, Collections.singletonMap(key, deserializedValue));
-    }
-
     // This map needs to maintain ordering
     private Answer<Future<Void>> expectReadToEnd(final Map<String, byte[]> serializedConfigs) {
         return invocation -> {