This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a673f21242 KAFKA-12380 shutdown Executor in Connect's Worker when
closed (#11955)
a673f21242 is described below
commit a673f2124218e524335ea13337bd3b49f52c52a3
Author: Rajani Karuturi <[email protected]>
AuthorDate: Fri Apr 29 11:05:25 2022 +0530
KAFKA-12380 shutdown Executor in Connect's Worker when closed (#11955)
When the worker is stopped, it does not shutdown this executor. This PR
fixes the issue.
Reviewers: Luke Chen <[email protected]>
---
.../org/apache/kafka/connect/runtime/Worker.java | 15 ++++++
.../apache/kafka/connect/runtime/WorkerTest.java | 61 +++++++++++++++++++++-
2 files changed, 75 insertions(+), 1 deletion(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 4adf6ff5e0..89e32b54af 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -91,6 +91,7 @@ import java.util.stream.Collectors;
public class Worker {
public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(5);
+ public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(1);
private static final Logger log = LoggerFactory.getLogger(Worker.class);
@@ -222,6 +223,20 @@ public class Worker {
connectorStatusMetricsGroup.close();
workerConfigTransformer.close();
+ executor.shutdown();
+ try {
+ // Wait a while for existing tasks to terminate
+ if
(!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS,
TimeUnit.MILLISECONDS)) {
+ executor.shutdownNow(); //cancel current executing threads
+ // Wait a while for tasks to respond to being cancelled
+ if
(!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS,
TimeUnit.MILLISECONDS))
+ log.error("Executor did not terminate in time");
+ }
+ } catch (InterruptedException e) {
+ executor.shutdownNow(); // (Re-)Cancel if current thread also
interrupted
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
}
/**
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index dcd9286480..e410183c7f 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -103,12 +103,14 @@ import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstructionWithAnswer;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
@@ -411,7 +413,7 @@ public class WorkerTest extends ThreadedTest {
when(plugins.newConnector(connectorAlias)).thenReturn(sinkConnector);
when(delegatingLoader.connectorLoader(connectorAlias)).thenReturn(pluginLoader);
when(sinkConnector.version()).thenReturn("1.0");
-
+
pluginsMockedStatic.when(() ->
Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
pluginsMockedStatic.when(() ->
Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
connectUtilsMockedStatic.when(() ->
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
@@ -1170,6 +1172,63 @@ public class WorkerTest extends ThreadedTest {
assertNotNull(server.getObjectInstance(new
ObjectName("kafka.connect:type=grp1")));
}
+ @Test
+ public void testExecutorServiceShutdown() throws InterruptedException {
+ ExecutorService executorService = mock(ExecutorService.class);
+ doNothing().when(executorService).shutdown();
+ when(executorService.awaitTermination(1000L,
TimeUnit.MILLISECONDS)).thenReturn(true);
+
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
+ offsetBackingStore, executorService,
+ noneConnectorClientConfigOverridePolicy);
+ worker.start();
+
+ assertEquals(Collections.emptySet(), worker.connectorNames());
+ worker.stop();
+ verify(executorService, times(1)).shutdown();
+ verify(executorService, times(1)).awaitTermination(1000L,
TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(executorService);
+
+ }
+
+ @Test
+ public void testExecutorServiceShutdownWhenTerminationFails() throws
InterruptedException {
+ ExecutorService executorService = mock(ExecutorService.class);
+ doNothing().when(executorService).shutdown();
+ when(executorService.awaitTermination(1000L,
TimeUnit.MILLISECONDS)).thenReturn(false);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
+ offsetBackingStore, executorService,
+ noneConnectorClientConfigOverridePolicy);
+ worker.start();
+
+ assertEquals(Collections.emptySet(), worker.connectorNames());
+ worker.stop();
+ verify(executorService, times(1)).shutdown();
+ verify(executorService, times(1)).shutdownNow();
+ verify(executorService, times(2)).awaitTermination(1000L,
TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(executorService);
+
+ }
+
+ @Test
+ public void testExecutorServiceShutdownWhenTerminationThrowsException()
throws InterruptedException {
+ ExecutorService executorService = mock(ExecutorService.class);
+ doNothing().when(executorService).shutdown();
+ when(executorService.awaitTermination(1000L,
TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException("interrupt"));
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
+ offsetBackingStore, executorService,
+ noneConnectorClientConfigOverridePolicy);
+ worker.start();
+
+ assertEquals(Collections.emptySet(), worker.connectorNames());
+ worker.stop();
+ verify(executorService, times(1)).shutdown();
+ verify(executorService, times(1)).shutdownNow();
+ verify(executorService, times(1)).awaitTermination(1000L,
TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(executorService);
+
+ }
+
private void assertStatusMetrics(long expected, String metricName) {
MetricGroup statusMetrics =
worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector());
if (expected == 0L) {