This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6cffed56f6a KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest (#16451)
6cffed56f6a is described below
commit 6cffed56f6a140d37bc0f210d4e786f78dc1631a
Author: Zhengke Zhou <[email protected]>
AuthorDate: Wed Jul 17 02:20:44 2024 +0800
KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest (#16451)
Reviewers: Chris Egerton <[email protected]>
---
.../java/org/apache/kafka/connect/runtime/Connect.java | 15 +++++++++++++++
.../connect/runtime/distributed/DistributedHerder.java | 10 +++++++++-
.../integration/InternalTopicsIntegrationTest.java | 15 +++++++++++++--
.../apache/kafka/connect/util/clusters/WorkerHandle.java | 9 +++++++++
4 files changed, 46 insertions(+), 3 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index 1372106c9ff..d5de59f6a26 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
import org.apache.kafka.connect.runtime.rest.RestServer;
@@ -24,6 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -34,6 +36,7 @@ public class Connect<H extends Herder> {
private static final Logger log = LoggerFactory.getLogger(Connect.class);
private final H herder;
+ private Future<?> herderTask;
private final ConnectRestServer rest;
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch stopLatch = new CountDownLatch(1);
@@ -47,6 +50,14 @@ public class Connect<H extends Herder> {
shutdownHook = new ShutdownHook();
}
+ /**
+ * Track task status which have been submitted to work thread.
+ * @return {@link DistributedHerder#herderTask} to track status or null if the herder type doesn't have a separate work thread
+ */
+ public Future<?> herderTask() {
+ return this.herderTask;
+ }
+
public H herder() {
return herder;
}
@@ -59,6 +70,10 @@ public class Connect<H extends Herder> {
herder.start();
rest.initializeResources(herder);
+ if (herder instanceof DistributedHerder) {
+ herderTask = ((DistributedHerder) herder).herderTask();
+ }
+
log.info("Kafka Connect started");
} finally {
startLatch.countDown();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index cd5d847f446..d35b7ff43af 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -99,6 +99,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
@@ -232,6 +233,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private final DistributedConfig config;
+ private Future<?> herderTask;
+
/**
* Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
* that have the same group ID.
@@ -363,7 +366,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
@Override
public void start() {
- this.herderExecutor.submit(this);
+ herderTask = this.herderExecutor.submit(this);
}
@Override
@@ -396,6 +399,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
}
+ // public for testing
+ public Future<?> herderTask() {
+ return herderTask;
+ }
+
// public for testing
public void tick() {
// The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
index 27a8611e364..ed7a786e1d6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
@@ -32,12 +32,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -152,11 +156,18 @@ public class InternalTopicsIntegrationTest {
"Body did not contain expected message detailing the worker's in-progress operation: " + body
);
}
+
connect.resetRequestTimeout();
+ //Synchronously await and verify that the worker fails during startup
+ Future<?> herderTask = worker.herderTask();
+ assertThrows(
+ ExecutionException.class,
+ () -> herderTask.get(1, TimeUnit.MINUTES)
+ );
+
// Verify that the offset and config topic don't exist;
- // the status topic may have been created if timing was right but we don't care
- // TODO: Synchronously await and verify that the worker fails during startup
+ // the status topic may have been created if timing was right, but we don't care
log.info("Verifying the internal topics for Connect");
connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
index f650456da60..51a4bbb337a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
import java.net.URI;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.Future;
/**
* A handle to a worker executing in a Connect cluster.
@@ -37,6 +38,14 @@ public class WorkerHandle {
this.worker = worker;
}
+ /**
+ * Track the worker status during startup.
+ * @return {@link Connect#herderTask} to track or null
+ */
+ public Future<?> herderTask() {
+ return worker.herderTask();
+ }
+
/**
* Create and start a new worker with the given properties.
*