This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4763d0fc30c KAFKA-17242: Do not log spurious timeout message for MirrorCheckpointTask sync store startup (#16773)
4763d0fc30c is described below

commit 4763d0fc30cd1c669bbeee88478db26c3de56805
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Aug 8 19:03:42 2024 +0500

    KAFKA-17242: Do not log spurious timeout message for MirrorCheckpointTask sync store startup (#16773)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java     |  2 +-
 .../org/apache/kafka/connect/mirror/Scheduler.java     | 18 +++++++++++-------
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index e3bd4e41b1b..e659c4aae79 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -109,7 +109,7 @@ public class MirrorCheckpointTask extends SourceTask {
         idleConsumerGroupsOffset = new HashMap<>();
         checkpointStore = new CheckpointStore(config, consumerGroups);
         scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
-        scheduler.execute(() -> {
+        scheduler.executeAsync(() -> {
             // loading the stores are potentially long running operations, so they run asynchronously
             // to avoid blocking task::start (until a task has completed starting it cannot be stopped)
             boolean checkpointsReadOk = checkpointStore.start();
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
index 9a39242d40b..432fdbaf3e8 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
@@ -47,20 +47,20 @@ class Scheduler implements AutoCloseable {
         if (interval.toMillis() < 0L) {
             return;
         }
-        executor.scheduleAtFixedRate(() -> executeThread(task, description), 0, interval.toMillis(), TimeUnit.MILLISECONDS);
+        executor.scheduleAtFixedRate(() -> executeThread(task, description, true), 0, interval.toMillis(), TimeUnit.MILLISECONDS);
     }
  
     void scheduleRepeatingDelayed(Task task, Duration interval, String description) {
         if (interval.toMillis() < 0L) {
             return;
         }
-        executor.scheduleAtFixedRate(() -> executeThread(task, description), interval.toMillis(),
+        executor.scheduleAtFixedRate(() -> executeThread(task, description, true), interval.toMillis(),
             interval.toMillis(), TimeUnit.MILLISECONDS);
     }
 
     void execute(Task task, String description) {
         try {
-            executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+            executor.submit(() -> executeThread(task, description, true)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             LOG.warn("{} was interrupted running task: {}", name, description);
         } catch (TimeoutException e) {
@@ -70,6 +70,10 @@ class Scheduler implements AutoCloseable {
         }
     } 
 
+    void executeAsync(Task task, String description) {
+        executor.submit(() -> executeThread(task, description, false));
+    }
+
     public void close() {
         closed = true;
         executor.shutdown();
@@ -87,13 +91,13 @@ class Scheduler implements AutoCloseable {
         void run() throws InterruptedException, ExecutionException;
     }
 
-    private void run(Task task, String description) {
+    private void run(Task task, String description, boolean checkTimeout) {
         try {
             long start = System.currentTimeMillis();
             task.run();
             long elapsed = System.currentTimeMillis() - start;
             LOG.info("{} took {} ms", description, elapsed);
-            if (elapsed > timeout.toMillis()) {
+            if (checkTimeout && elapsed > timeout.toMillis()) {
                 LOG.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
             }
         } catch (InterruptedException e) {
@@ -103,12 +107,12 @@ class Scheduler implements AutoCloseable {
         }
     }
 
-    private void executeThread(Task task, String description) {
+    private void executeThread(Task task, String description, boolean checkTimeout) {
         Thread.currentThread().setName(name + "-" + description);
         if (closed) {
             LOG.info("{} skipping task due to shutdown: {}", name, description);
             return;
         }
-        run(task, description);
+        run(task, description, checkTimeout);
     }
 }

Reply via email to