This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4763d0fc30c KAFKA-17242: Do not log spurious timeout message for
MirrorCheckpointTask sync store startup (#16773)
4763d0fc30c is described below
commit 4763d0fc30cd1c669bbeee88478db26c3de56805
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Aug 8 19:03:42 2024 +0500
KAFKA-17242: Do not log spurious timeout message for MirrorCheckpointTask
sync store startup (#16773)
Reviewers: Chris Egerton <[email protected]>
---
.../kafka/connect/mirror/MirrorCheckpointTask.java | 2 +-
.../org/apache/kafka/connect/mirror/Scheduler.java | 18 +++++++++++-------
2 files changed, 12 insertions(+), 8 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index e3bd4e41b1b..e659c4aae79 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -109,7 +109,7 @@ public class MirrorCheckpointTask extends SourceTask {
idleConsumerGroupsOffset = new HashMap<>();
checkpointStore = new CheckpointStore(config, consumerGroups);
scheduler = new Scheduler(getClass(), config.entityLabel(),
config.adminTimeout());
- scheduler.execute(() -> {
+ scheduler.executeAsync(() -> {
// loading the stores are potentially long running operations, so
they run asynchronously
// to avoid blocking task::start (until a task has completed
starting it cannot be stopped)
boolean checkpointsReadOk = checkpointStore.start();
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
index 9a39242d40b..432fdbaf3e8 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
@@ -47,20 +47,20 @@ class Scheduler implements AutoCloseable {
if (interval.toMillis() < 0L) {
return;
}
- executor.scheduleAtFixedRate(() -> executeThread(task, description),
0, interval.toMillis(), TimeUnit.MILLISECONDS);
+ executor.scheduleAtFixedRate(() -> executeThread(task, description,
true), 0, interval.toMillis(), TimeUnit.MILLISECONDS);
}
void scheduleRepeatingDelayed(Task task, Duration interval, String
description) {
if (interval.toMillis() < 0L) {
return;
}
- executor.scheduleAtFixedRate(() -> executeThread(task, description),
interval.toMillis(),
+ executor.scheduleAtFixedRate(() -> executeThread(task, description,
true), interval.toMillis(),
interval.toMillis(), TimeUnit.MILLISECONDS);
}
void execute(Task task, String description) {
try {
- executor.submit(() -> executeThread(task,
description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ executor.submit(() -> executeThread(task, description,
true)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("{} was interrupted running task: {}", name, description);
} catch (TimeoutException e) {
@@ -70,6 +70,10 @@ class Scheduler implements AutoCloseable {
}
}
+ void executeAsync(Task task, String description) {
+ executor.submit(() -> executeThread(task, description, false));
+ }
+
public void close() {
closed = true;
executor.shutdown();
@@ -87,13 +91,13 @@ class Scheduler implements AutoCloseable {
void run() throws InterruptedException, ExecutionException;
}
- private void run(Task task, String description) {
+ private void run(Task task, String description, boolean checkTimeout) {
try {
long start = System.currentTimeMillis();
task.run();
long elapsed = System.currentTimeMillis() - start;
LOG.info("{} took {} ms", description, elapsed);
- if (elapsed > timeout.toMillis()) {
+ if (checkTimeout && elapsed > timeout.toMillis()) {
LOG.warn("{} took too long ({} ms) running task: {}", name,
elapsed, description);
}
} catch (InterruptedException e) {
@@ -103,12 +107,12 @@ class Scheduler implements AutoCloseable {
}
}
- private void executeThread(Task task, String description) {
+ private void executeThread(Task task, String description, boolean
checkTimeout) {
Thread.currentThread().setName(name + "-" + description);
if (closed) {
LOG.info("{} skipping task due to shutdown: {}", name,
description);
return;
}
- run(task, description);
+ run(task, description, checkTimeout);
}
}