This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a673f21242 KAFKA-12380 shutdown Executor in Connect's Worker when 
closed (#11955)
a673f21242 is described below

commit a673f2124218e524335ea13337bd3b49f52c52a3
Author: Rajani Karuturi <[email protected]>
AuthorDate: Fri Apr 29 11:05:25 2022 +0530

    KAFKA-12380 shutdown Executor in Connect's Worker when closed (#11955)
    
    When the worker is stopped, it does not shut down its executor. This PR 
fixes the issue by shutting the executor down when the worker stops.
    
    Reviewers: Luke Chen <[email protected]>
---
 .../org/apache/kafka/connect/runtime/Worker.java   | 15 ++++++
 .../apache/kafka/connect/runtime/WorkerTest.java   | 61 +++++++++++++++++++++-
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 4adf6ff5e0..89e32b54af 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -91,6 +91,7 @@ import java.util.stream.Collectors;
 public class Worker {
 
     public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(5);
+    public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(1);
 
     private static final Logger log = LoggerFactory.getLogger(Worker.class);
 
@@ -222,6 +223,20 @@ public class Worker {
         connectorStatusMetricsGroup.close();
 
         workerConfigTransformer.close();
+        executor.shutdown();
+        try {
+            // Wait a while for existing tasks to terminate
+            if 
(!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+                executor.shutdownNow(); //cancel current executing threads
+                // Wait a while for tasks to respond to being cancelled
+                if 
(!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS))
+                    log.error("Executor did not terminate in time");
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow(); // (Re-)Cancel if current thread also 
interrupted
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
     }
 
     /**
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index dcd9286480..e410183c7f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -103,12 +103,14 @@ import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstructionWithAnswer;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
@@ -411,7 +413,7 @@ public class WorkerTest extends ThreadedTest {
         when(plugins.newConnector(connectorAlias)).thenReturn(sinkConnector);
         
when(delegatingLoader.connectorLoader(connectorAlias)).thenReturn(pluginLoader);
         when(sinkConnector.version()).thenReturn("1.0");
-        
+
         pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
         pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
         connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
@@ -1170,6 +1172,63 @@ public class WorkerTest extends ThreadedTest {
         assertNotNull(server.getObjectInstance(new 
ObjectName("kafka.connect:type=grp1")));
     }
 
+    @Test
+    public void testExecutorServiceShutdown() throws InterruptedException {
+        ExecutorService executorService = mock(ExecutorService.class);
+        doNothing().when(executorService).shutdown();
+        when(executorService.awaitTermination(1000L, 
TimeUnit.MILLISECONDS)).thenReturn(true);
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
+                            offsetBackingStore, executorService,
+                            noneConnectorClientConfigOverridePolicy);
+        worker.start();
+
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.stop();
+        verify(executorService, times(1)).shutdown();
+        verify(executorService, times(1)).awaitTermination(1000L, 
TimeUnit.MILLISECONDS);
+        verifyNoMoreInteractions(executorService);
+
+    }
+
+    @Test
+    public void testExecutorServiceShutdownWhenTerminationFails() throws 
InterruptedException {
+        ExecutorService executorService = mock(ExecutorService.class);
+        doNothing().when(executorService).shutdown();
+        when(executorService.awaitTermination(1000L, 
TimeUnit.MILLISECONDS)).thenReturn(false);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
+                            offsetBackingStore, executorService,
+                            noneConnectorClientConfigOverridePolicy);
+        worker.start();
+
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.stop();
+        verify(executorService, times(1)).shutdown();
+        verify(executorService, times(1)).shutdownNow();
+        verify(executorService, times(2)).awaitTermination(1000L, 
TimeUnit.MILLISECONDS);
+        verifyNoMoreInteractions(executorService);
+
+    }
+
+    @Test
+    public void testExecutorServiceShutdownWhenTerminationThrowsException() 
throws InterruptedException {
+        ExecutorService executorService = mock(ExecutorService.class);
+        doNothing().when(executorService).shutdown();
+        when(executorService.awaitTermination(1000L, 
TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException("interrupt"));
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config,
+                            offsetBackingStore, executorService,
+                            noneConnectorClientConfigOverridePolicy);
+        worker.start();
+
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.stop();
+        verify(executorService, times(1)).shutdown();
+        verify(executorService, times(1)).shutdownNow();
+        verify(executorService, times(1)).awaitTermination(1000L, 
TimeUnit.MILLISECONDS);
+        verifyNoMoreInteractions(executorService);
+
+    }
+
     private void assertStatusMetrics(long expected, String metricName) {
         MetricGroup statusMetrics = 
worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector());
         if (expected == 0L) {

Reply via email to