This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 73e8d5dd5b MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest class (#12410) 73e8d5dd5b is described below commit 73e8d5dd5b2344a2c494cef59929b8e187fa68ec Author: Chris Egerton <fearthecel...@gmail.com> AuthorDate: Thu Aug 18 06:08:03 2022 -0400 MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest class (#12410) Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Christo Lolov <christo_lo...@yahoo.com> --- .../kafka/connect/util/ShutdownableThread.java | 144 --------------------- .../runtime/ExactlyOnceWorkerSourceTaskTest.java | 7 +- .../runtime/SourceTaskOffsetCommitterTest.java | 7 +- .../runtime/WorkerSinkTaskThreadedTest.java | 7 +- .../connect/runtime/WorkerSourceTaskTest.java | 7 +- .../apache/kafka/connect/runtime/WorkerTest.java | 5 +- .../runtime/distributed/DistributedHerderTest.java | 10 +- .../kafka/connect/util/ShutdownableThreadTest.java | 71 ---------- .../apache/kafka/connect/util/ThreadedTest.java | 42 ------ 9 files changed, 19 insertions(+), 281 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java deleted file mode 100644 index 7c42f195ea..0000000000 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ShutdownableThread.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.connect.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * <p> - * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown, - * a flag is set, which the thread should detect and try to exit gracefully from. In forcible - * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit - * gracefully, but then force it to exit if it takes too long. - * </p> - * <p> - * Implementations should override the {@link #execute} method and check {@link #getRunning} to - * determine whether they should try to gracefully exit. - * </p> - */ -public abstract class ShutdownableThread extends Thread { - private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class); - - private final AtomicBoolean isRunning = new AtomicBoolean(true); - private final CountDownLatch shutdownLatch = new CountDownLatch(1); - - /** - * An UncaughtExceptionHandler to register on every instance of this class. This is useful for - * testing, where AssertionExceptions in the thread may not cause the test to fail. Since one - * instance is used for all threads, it must be thread-safe. - */ - volatile public static UncaughtExceptionHandler funcaughtExceptionHandler = null; - - public ShutdownableThread(String name) { - // The default is daemon=true so that these threads will not prevent shutdown. We use this - // default because threads that are running user code that may not clean up properly, even - // when we attempt to forcibly shut them down. - this(name, true); - } - - public ShutdownableThread(String name, boolean daemon) { - super(name); - this.setDaemon(daemon); - if (funcaughtExceptionHandler != null) - this.setUncaughtExceptionHandler(funcaughtExceptionHandler); - } - - /** - * Implementations should override this method with the main body for the thread. - */ - public abstract void execute(); - - /** - * Returns true if the thread hasn't exited yet and none of the shutdown methods have been - * invoked - */ - public boolean getRunning() { - return isRunning.get(); - } - - @Override - public void run() { - try { - execute(); - } catch (Error | RuntimeException e) { - log.error("Thread {} exiting with uncaught exception: ", getName(), e); - throw e; - } finally { - shutdownLatch.countDown(); - } - } - - /** - * Shutdown the thread, first trying to shut down gracefully using the specified timeout, then - * forcibly interrupting the thread. - * @param gracefulTimeout the maximum time to wait for a graceful exit - * @param unit the time unit of the timeout argument - */ - public void shutdown(long gracefulTimeout, TimeUnit unit) - throws InterruptedException { - boolean success = gracefulShutdown(gracefulTimeout, unit); - if (!success) - forceShutdown(); - } - - /** - * Attempt graceful shutdown - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @return true if successful, false if the timeout elapsed - */ - public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException { - startGracefulShutdown(); - return awaitShutdown(timeout, unit); - } - - /** - * Start shutting down this thread gracefully, but do not block waiting for it to exit. - */ - public void startGracefulShutdown() { - log.info("Starting graceful shutdown of thread {}", getName()); - isRunning.set(false); - } - - /** - * Awaits shutdown of this thread, waiting up to the timeout. - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @return true if successful, false if the timeout elapsed - * @throws InterruptedException - */ - public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException { - return shutdownLatch.await(timeout, unit); - } - - /** - * Immediately tries to force the thread to shut down by interrupting it. This does not try to - * wait for the thread to truly exit because forcible shutdown is not always possible. By - * default, threads are marked as daemon threads so they will not prevent the process from - * exiting. - */ - public void forceShutdown() { - log.info("Forcing shutdown of thread {}", getName()); - isRunning.set(false); - interrupt(); - } -} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java index 44427b5b54..8346de4d51 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java @@ -52,7 +52,6 @@ import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ParameterizedTest; -import org.apache.kafka.connect.util.ThreadedTest; import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicCreationGroup; import org.easymock.Capture; @@ -60,6 +59,7 @@ import org.easymock.EasyMock; import org.easymock.IAnswer; import org.easymock.IExpectationSetters; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -110,7 +110,7 @@ import static org.junit.Assert.fail; "org.apache.log4j.*"}) @RunWith(PowerMockRunner.class) @PowerMockRunnerDelegate(ParameterizedTest.class) -public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest { +public class ExactlyOnceWorkerSourceTaskTest { private static final String TOPIC = "topic"; private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes()); private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12); @@ -175,9 +175,8 @@ public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest { this.enableTopicCreation = enableTopicCreation; } - @Override + @Before public void setup() { - super.setup(); Map<String, String> workerProps = workerProps(); plugins = new Plugins(workerProps); config = new StandaloneConfig(workerProps); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java index 278a73d16d..0747770af2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java @@ -19,9 +19,9 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.util.ConnectorTaskId; -import org.apache.kafka.connect.util.ThreadedTest; import org.easymock.Capture; import org.easymock.EasyMock; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) -public class SourceTaskOffsetCommitterTest extends ThreadedTest { +public class SourceTaskOffsetCommitterTest { private final ConcurrentHashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new ConcurrentHashMap<>(); @@ -61,9 +61,8 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest { private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000; - @Override + @Before public void setup() { - super.setup(); Map<String, String> workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index cdd87e230d..7ca7f44117 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -41,12 +41,12 @@ import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.connect.util.ThreadedTest; import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.IExpectationSetters; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -74,7 +74,7 @@ import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) @PrepareForTest(WorkerSinkTask.class) @PowerMockIgnore("javax.management.*") -public class WorkerSinkTaskThreadedTest extends ThreadedTest { +public class WorkerSinkTaskThreadedTest { // These are fixed to keep this code simpler. In this example we assume byte[] raw values // with mix of integer/string in Connect @@ -129,9 +129,8 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { private long recordsReturned; - @Override + @Before public void setup() { - super.setup(); time = new MockTime(); metrics = new MockConnectMetrics(); Map<String, String> workerProps = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 0366677b17..898afacbc4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -50,7 +50,6 @@ import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ParameterizedTest; -import org.apache.kafka.connect.util.ThreadedTest; import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicCreationGroup; import org.easymock.Capture; @@ -58,6 +57,7 @@ import org.easymock.EasyMock; import org.easymock.IAnswer; import org.easymock.IExpectationSetters; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -106,7 +106,7 @@ import static org.junit.Assert.assertTrue; "org.apache.log4j.*"}) @RunWith(PowerMockRunner.class) @PowerMockRunnerDelegate(ParameterizedTest.class) -public class WorkerSourceTaskTest extends ThreadedTest { +public class WorkerSourceTaskTest { private static final String TOPIC = "topic"; private static final String OTHER_TOPIC = "other-topic"; private static final Map<String, Object> PARTITION = Collections.singletonMap("key", "partition".getBytes()); @@ -168,9 +168,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { this.enableTopicCreation = enableTopicCreation; } - @Override + @Before public void setup() { - super.setup(); Map<String, String> workerProps = workerProps(); plugins = new Plugins(workerProps); config = new StandaloneConfig(workerProps); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index a064e296b2..ef9398ace3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -63,7 +63,6 @@ import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; import org.apache.kafka.connect.util.ParameterizedTest; -import org.apache.kafka.connect.util.ThreadedTest; import org.apache.kafka.connect.util.TopicAdmin; import org.junit.After; import org.junit.Before; @@ -141,7 +140,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @RunWith(Parameterized.class) -public class WorkerTest extends ThreadedTest { +public class WorkerTest { private static final String CONNECTOR_ID = "test-connector"; private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0); @@ -217,8 +216,6 @@ public class WorkerTest extends ThreadedTest { @Before public void setup() { - super.setup(); - // Use strict mode to detect unused mocks mockitoSession = Mockito.mockitoSession() .initMocks(this) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 3249412259..0e1636ac3e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -66,7 +66,6 @@ import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; -import org.apache.kafka.connect.util.ThreadedTest; import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; @@ -96,6 +95,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -125,7 +125,7 @@ import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) @PrepareForTest({DistributedHerder.class, Plugins.class, RestClient.class}) @PowerMockIgnore({"javax.management.*", "javax.crypto.*"}) -public class DistributedHerderTest extends ThreadedTest { +public class DistributedHerderTest { private static final Map<String, String> HERDER_CONFIG = new HashMap<>(); static { HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic"); @@ -220,6 +220,7 @@ public class DistributedHerderTest extends ThreadedTest { private ConfigBackingStore.UpdateListener configUpdateListener; private WorkerRebalanceListener rebalanceListener; private ExecutorService herderExecutor; + private Future<?> herderFuture; private SinkConnectorConfig conn1SinkConfig; private SinkConnectorConfig conn1SinkConfigUpdated; @@ -3654,13 +3655,14 @@ public class DistributedHerderTest extends ThreadedTest { private void startBackgroundHerder() { herderExecutor = Executors.newSingleThreadExecutor(); - herderExecutor.submit(herder); + herderFuture = herderExecutor.submit(herder); } private void stopBackgroundHerder() throws Exception { herder.stop(); herderExecutor.shutdown(); - herderExecutor.awaitTermination(10, TimeUnit.SECONDS); + assertTrue("herder thread did not finish in time", herderExecutor.awaitTermination(10, TimeUnit.SECONDS)); + herderFuture.get(); } private void expectHerderStartup() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java deleted file mode 100644 index a72937d8ab..0000000000 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.connect.util; - -import org.junit.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class ShutdownableThreadTest { - - @Test - public void testGracefulShutdown() throws InterruptedException { - ShutdownableThread thread = new ShutdownableThread("graceful") { - @Override - public void execute() { - while (getRunning()) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - // Ignore - } - } - } - }; - thread.start(); - Thread.sleep(10); - assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS)); - } - - @Test - public void testForcibleShutdown() throws InterruptedException { - final CountDownLatch startedLatch = new CountDownLatch(1); - ShutdownableThread thread = new ShutdownableThread("forcible") { - @Override - public void execute() { - try { - startedLatch.countDown(); - Thread.sleep(100000); - } catch (InterruptedException e) { - // Ignore - } - } - }; - thread.start(); - startedLatch.await(); - thread.forceShutdown(); - // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in - // certain conditions, but in this case we know the thread is interruptible so we should be - // able join() it - thread.join(1000); - assertFalse(thread.isAlive()); - } -} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java deleted file mode 100644 index dd367dd052..0000000000 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.connect.util; - -import org.junit.After; -import org.junit.Before; - -/** - * Base class for tests that use threads. It sets up uncaught exception handlers for all known - * thread classes and checks for errors at the end of the test so that failures in background - * threads will cause the test to fail. - */ -public class ThreadedTest { - - protected TestBackgroundThreadExceptionHandler backgroundThreadExceptionHandler; - - @Before - public void setup() { - backgroundThreadExceptionHandler = new TestBackgroundThreadExceptionHandler(); - ShutdownableThread.funcaughtExceptionHandler = backgroundThreadExceptionHandler; - } - - @After - public void teardown() { - backgroundThreadExceptionHandler.verifyNoExceptions(); - ShutdownableThread.funcaughtExceptionHandler = null; - } -}