This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 73e8d5dd5b MINOR: Remove unused ShutdownableThread class and
ineffective ThreadedTest class (#12410)
73e8d5dd5b is described below
commit 73e8d5dd5b2344a2c494cef59929b8e187fa68ec
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Aug 18 06:08:03 2022 -0400
MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest
class (#12410)
Reviewers: Mickael Maison <[email protected]>, Christo Lolov
<[email protected]>
---
.../kafka/connect/util/ShutdownableThread.java | 144 ---------------------
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 7 +-
.../runtime/SourceTaskOffsetCommitterTest.java | 7 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 7 +-
.../connect/runtime/WorkerSourceTaskTest.java | 7 +-
.../apache/kafka/connect/runtime/WorkerTest.java | 5 +-
.../runtime/distributed/DistributedHerderTest.java | 10 +-
.../kafka/connect/util/ShutdownableThreadTest.java | 71 ----------
.../apache/kafka/connect/util/ThreadedTest.java | 42 ------
9 files changed, 19 insertions(+), 281 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java
deleted file mode 100644
index 7c42f195ea..0000000000
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * <p>
- * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown,
- * a flag is set, which the thread should detect and try to exit gracefully from. In forcible
- * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit
- * gracefully, but then force it to exit if it takes too long.
- * </p>
- * <p>
- * Implementations should override the {@link #execute} method and check {@link #getRunning} to
- * determine whether they should try to gracefully exit.
- * </p>
- */
-public abstract class ShutdownableThread extends Thread {
- private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class);
-
- private final AtomicBoolean isRunning = new AtomicBoolean(true);
- private final CountDownLatch shutdownLatch = new CountDownLatch(1);
-
- /**
- * An UncaughtExceptionHandler to register on every instance of this class. This is useful for
- * testing, where AssertionExceptions in the thread may not cause the test to fail. Since one
- * instance is used for all threads, it must be thread-safe.
- */
- volatile public static UncaughtExceptionHandler funcaughtExceptionHandler = null;
-
- public ShutdownableThread(String name) {
- // The default is daemon=true so that these threads will not prevent shutdown. We use this
- // default because threads that are running user code that may not clean up properly, even
- // when we attempt to forcibly shut them down.
- this(name, true);
- }
-
- public ShutdownableThread(String name, boolean daemon) {
- super(name);
- this.setDaemon(daemon);
- if (funcaughtExceptionHandler != null)
- this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
- }
-
- /**
- * Implementations should override this method with the main body for the
thread.
- */
- public abstract void execute();
-
- /**
- * Returns true if the thread hasn't exited yet and none of the shutdown
methods have been
- * invoked
- */
- public boolean getRunning() {
- return isRunning.get();
- }
-
- @Override
- public void run() {
- try {
- execute();
- } catch (Error | RuntimeException e) {
- log.error("Thread {} exiting with uncaught exception: ",
getName(), e);
- throw e;
- } finally {
- shutdownLatch.countDown();
- }
- }
-
- /**
- * Shutdown the thread, first trying to shut down gracefully using the
specified timeout, then
- * forcibly interrupting the thread.
- * @param gracefulTimeout the maximum time to wait for a graceful exit
- * @param unit the time unit of the timeout argument
- */
- public void shutdown(long gracefulTimeout, TimeUnit unit)
- throws InterruptedException {
- boolean success = gracefulShutdown(gracefulTimeout, unit);
- if (!success)
- forceShutdown();
- }
-
- /**
- * Attempt graceful shutdown
- * @param timeout the maximum time to wait
- * @param unit the time unit of the timeout argument
- * @return true if successful, false if the timeout elapsed
- */
- public boolean gracefulShutdown(long timeout, TimeUnit unit) throws
InterruptedException {
- startGracefulShutdown();
- return awaitShutdown(timeout, unit);
- }
-
- /**
- * Start shutting down this thread gracefully, but do not block waiting
for it to exit.
- */
- public void startGracefulShutdown() {
- log.info("Starting graceful shutdown of thread {}", getName());
- isRunning.set(false);
- }
-
- /**
- * Awaits shutdown of this thread, waiting up to the timeout.
- * @param timeout the maximum time to wait
- * @param unit the time unit of the timeout argument
- * @return true if successful, false if the timeout elapsed
- * @throws InterruptedException
- */
- public boolean awaitShutdown(long timeout, TimeUnit unit) throws
InterruptedException {
- return shutdownLatch.await(timeout, unit);
- }
-
- /**
- * Immediately tries to force the thread to shut down by interrupting it.
This does not try to
- * wait for the thread to truly exit because forcible shutdown is not
always possible. By
- * default, threads are marked as daemon threads so they will not prevent
the process from
- * exiting.
- */
- public void forceShutdown() {
- log.info("Forcing shutdown of thread {}", getName());
- isRunning.set(false);
- interrupt();
- }
-}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 44427b5b54..8346de4d51 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -52,7 +52,6 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
-import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture;
@@ -60,6 +59,7 @@ import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -110,7 +110,7 @@ import static org.junit.Assert.fail;
"org.apache.log4j.*"})
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
-public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
+public class ExactlyOnceWorkerSourceTaskTest {
private static final String TOPIC = "topic";
private static final Map<String, byte[]> PARTITION =
Collections.singletonMap("key", "partition".getBytes());
private static final Map<String, Integer> OFFSET =
Collections.singletonMap("key", 12);
@@ -175,9 +175,8 @@ public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
this.enableTopicCreation = enableTopicCreation;
}
- @Override
+ @Before
public void setup() {
- super.setup();
Map<String, String> workerProps = workerProps();
plugins = new Plugins(workerProps);
config = new StandaloneConfig(workerProps);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
index 278a73d16d..0747770af2 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -19,9 +19,9 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.EasyMock;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
-public class SourceTaskOffsetCommitterTest extends ThreadedTest {
+public class SourceTaskOffsetCommitterTest {
private final ConcurrentHashMap<ConnectorTaskId, ScheduledFuture<?>>
committers = new ConcurrentHashMap<>();
@@ -61,9 +61,8 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000;
- @Override
+ @Before
public void setup() {
- super.setup();
Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter",
"org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter",
"org.apache.kafka.connect.json.JsonConverter");
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index cdd87e230d..7ca7f44117 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -41,12 +41,12 @@ import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -74,7 +74,7 @@ import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest(WorkerSinkTask.class)
@PowerMockIgnore("javax.management.*")
-public class WorkerSinkTaskThreadedTest extends ThreadedTest {
+public class WorkerSinkTaskThreadedTest {
// These are fixed to keep this code simpler. In this example we assume
byte[] raw values
// with mix of integer/string in Connect
@@ -129,9 +129,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private long recordsReturned;
- @Override
+ @Before
public void setup() {
- super.setup();
time = new MockTime();
metrics = new MockConnectMetrics();
Map<String, String> workerProps = new HashMap<>();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 0366677b17..898afacbc4 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -50,7 +50,6 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
-import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture;
@@ -58,6 +57,7 @@ import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -106,7 +106,7 @@ import static org.junit.Assert.assertTrue;
"org.apache.log4j.*"})
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
-public class WorkerSourceTaskTest extends ThreadedTest {
+public class WorkerSourceTaskTest {
private static final String TOPIC = "topic";
private static final String OTHER_TOPIC = "other-topic";
private static final Map<String, Object> PARTITION =
Collections.singletonMap("key", "partition".getBytes());
@@ -168,9 +168,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
this.enableTopicCreation = enableTopicCreation;
}
- @Override
+ @Before
public void setup() {
- super.setup();
Map<String, String> workerProps = workerProps();
plugins = new Plugins(workerProps);
config = new StandaloneConfig(workerProps);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index a064e296b2..ef9398ace3 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -63,7 +63,6 @@ import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.ParameterizedTest;
-import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.After;
import org.junit.Before;
@@ -141,7 +140,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
-public class WorkerTest extends ThreadedTest {
+public class WorkerTest {
private static final String CONNECTOR_ID = "test-connector";
private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job",
0);
@@ -217,8 +216,6 @@ public class WorkerTest extends ThreadedTest {
@Before
public void setup() {
- super.setup();
-
// Use strict mode to detect unused mocks
mockitoSession = Mockito.mockitoSession()
.initMocks(this)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 3249412259..0e1636ac3e 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -66,7 +66,6 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
-import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
@@ -96,6 +95,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -125,7 +125,7 @@ import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest({DistributedHerder.class, Plugins.class, RestClient.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
-public class DistributedHerderTest extends ThreadedTest {
+public class DistributedHerderTest {
private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
static {
HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG,
"status-topic");
@@ -220,6 +220,7 @@ public class DistributedHerderTest extends ThreadedTest {
private ConfigBackingStore.UpdateListener configUpdateListener;
private WorkerRebalanceListener rebalanceListener;
private ExecutorService herderExecutor;
+ private Future<?> herderFuture;
private SinkConnectorConfig conn1SinkConfig;
private SinkConnectorConfig conn1SinkConfigUpdated;
@@ -3654,13 +3655,14 @@ public class DistributedHerderTest extends ThreadedTest {
private void startBackgroundHerder() {
herderExecutor = Executors.newSingleThreadExecutor();
- herderExecutor.submit(herder);
+ herderFuture = herderExecutor.submit(herder);
}
private void stopBackgroundHerder() throws Exception {
herder.stop();
herderExecutor.shutdown();
- herderExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ assertTrue("herder thread did not finish in time", herderExecutor.awaitTermination(10, TimeUnit.SECONDS));
+ herderFuture.get();
}
private void expectHerderStartup() {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java
deleted file mode 100644
index a72937d8ab..0000000000
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-import org.junit.Test;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class ShutdownableThreadTest {
-
- @Test
- public void testGracefulShutdown() throws InterruptedException {
- ShutdownableThread thread = new ShutdownableThread("graceful") {
- @Override
- public void execute() {
- while (getRunning()) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- }
- };
- thread.start();
- Thread.sleep(10);
- assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS));
- }
-
- @Test
- public void testForcibleShutdown() throws InterruptedException {
- final CountDownLatch startedLatch = new CountDownLatch(1);
- ShutdownableThread thread = new ShutdownableThread("forcible") {
- @Override
- public void execute() {
- try {
- startedLatch.countDown();
- Thread.sleep(100000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- };
- thread.start();
- startedLatch.await();
- thread.forceShutdown();
- // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in
- // certain conditions, but in this case we know the thread is interruptible so we should be
- // able join() it
- thread.join(1000);
- assertFalse(thread.isAlive());
- }
-}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java
deleted file mode 100644
index dd367dd052..0000000000
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-import org.junit.After;
-import org.junit.Before;
-
-/**
- * Base class for tests that use threads. It sets up uncaught exception handlers for all known
- * thread classes and checks for errors at the end of the test so that failures in background
- * threads will cause the test to fail.
- */
-public class ThreadedTest {
-
- protected TestBackgroundThreadExceptionHandler backgroundThreadExceptionHandler;
-
- @Before
- public void setup() {
- backgroundThreadExceptionHandler = new TestBackgroundThreadExceptionHandler();
- ShutdownableThread.funcaughtExceptionHandler = backgroundThreadExceptionHandler;
- }
-
- @After
- public void teardown() {
- backgroundThreadExceptionHandler.verifyNoExceptions();
- ShutdownableThread.funcaughtExceptionHandler = null;
- }
-}