This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6cffed56f6a KAFKA-16943: Synchronously verify Connect worker startup 
failure in InternalTopicsIntegrationTest (#16451)
6cffed56f6a is described below

commit 6cffed56f6a140d37bc0f210d4e786f78dc1631a
Author: Zhengke Zhou <[email protected]>
AuthorDate: Wed Jul 17 02:20:44 2024 +0800

    KAFKA-16943: Synchronously verify Connect worker startup failure in 
InternalTopicsIntegrationTest (#16451)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../java/org/apache/kafka/connect/runtime/Connect.java    | 15 +++++++++++++++
 .../connect/runtime/distributed/DistributedHerder.java    | 10 +++++++++-
 .../integration/InternalTopicsIntegrationTest.java        | 15 +++++++++++++--
 .../apache/kafka/connect/util/clusters/WorkerHandle.java  |  9 +++++++++
 4 files changed, 46 insertions(+), 3 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index 1372106c9ff..d5de59f6a26 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 
@@ -24,6 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -34,6 +36,7 @@ public class Connect<H extends Herder> {
     private static final Logger log = LoggerFactory.getLogger(Connect.class);
 
     private final H herder;
+    private Future<?> herderTask;
     private final ConnectRestServer rest;
     private final CountDownLatch startLatch = new CountDownLatch(1);
     private final CountDownLatch stopLatch = new CountDownLatch(1);
@@ -47,6 +50,14 @@ public class Connect<H extends Herder> {
         shutdownHook = new ShutdownHook();
     }
 
+    /**
+     * Track task status which have been submitted to work thread.
+     * @return {@link DistributedHerder#herderTask} to track status or null if 
the herder type doesn't have a separate work thread
+     */
+    public Future<?> herderTask() {
+        return this.herderTask;
+    }
+
     public H herder() {
         return herder;
     }
@@ -59,6 +70,10 @@ public class Connect<H extends Herder> {
             herder.start();
             rest.initializeResources(herder);
 
+            if (herder instanceof DistributedHerder) {
+                herderTask = ((DistributedHerder) herder).herderTask();
+            }
+
             log.info("Kafka Connect started");
         } finally {
             startLatch.countDown();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index cd5d847f446..d35b7ff43af 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -99,6 +99,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -232,6 +233,8 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
 
     private final DistributedConfig config;
 
+    private Future<?> herderTask;
+
     /**
      * Create a herder that will form a Connect cluster with other {@link 
DistributedHerder} instances (in this or other JVMs)
      * that have the same group ID.
@@ -363,7 +366,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
 
     @Override
     public void start() {
-        this.herderExecutor.submit(this);
+        herderTask = this.herderExecutor.submit(this);
     }
 
     @Override
@@ -396,6 +399,11 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
         }
     }
 
+    // public for testing
+    public Future<?> herderTask() {
+        return herderTask;
+    }
+
     // public for testing
     public void tick() {
         // The main loop does two primary things: 1) drive the group 
membership protocol, responding to rebalance events
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
index 27a8611e364..ed7a786e1d6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
@@ -32,12 +32,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.core.Response;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -152,11 +156,18 @@ public class InternalTopicsIntegrationTest {
                     "Body did not contain expected message detailing the 
worker's in-progress operation: " + body
             );
         }
+
         connect.resetRequestTimeout();
 
+        //Synchronously await and verify that the worker fails during startup
+        Future<?> herderTask = worker.herderTask();
+        assertThrows(
+                ExecutionException.class,
+                () -> herderTask.get(1, TimeUnit.MINUTES)
+        );
+
         // Verify that the offset and config topic don't exist;
-        // the status topic may have been created if timing was right but we 
don't care
-        // TODO: Synchronously await and verify that the worker fails during 
startup
+        // the status topic may have been created if timing was right, but we 
don't care
         log.info("Verifying the internal topics for Connect");
         connect.assertions().assertTopicsDoNotExist(configTopic(), 
offsetTopic());
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
index f650456da60..51a4bbb337a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
 import java.net.URI;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.Future;
 
 /**
  * A handle to a worker executing in a Connect cluster.
@@ -37,6 +38,14 @@ public class WorkerHandle {
         this.worker = worker;
     }
 
+    /**
+     * Track the worker status during startup.
+     * @return {@link Connect#herderTask} to track or null
+     */
+    public Future<?> herderTask() {
+        return worker.herderTask();
+    }
+
     /**
      * Create and start a new worker with the given properties.
      *

Reply via email to