Copilot commented on code in PR #17510: URL: https://github.com/apache/dolphinscheduler/pull/17510#discussion_r2476463566
########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtilsTest.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.utils; + +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool; + +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.StatusDetails; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobList; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; +import 
io.fabric8.kubernetes.client.dsl.BatchAPIGroupDSL; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.fabric8.kubernetes.client.dsl.PrettyLoggable; +import io.fabric8.kubernetes.client.dsl.ScalableResource; +import io.fabric8.kubernetes.client.dsl.V1BatchAPIGroupDSL; + +public class K8sUtilsTest { + + private K8sUtils k8sUtils; + private MockedStatic<KubernetesClientPool> mockedKubernetesClientPool; + private KubernetesClient mockClient; + private final String mockClusterId = "mock-cluster-id"; + private final String mockKubeConfig = + "apiVersion: v1\nclusters:\n- cluster:\n server: https://kubernetes.default.svc\n name: mock-cluster\ncontexts:\n- context:\n cluster: mock-cluster\n namespace: default\n user: mock-user\n name: mock-context\ncurrent-context: mock-context\nkind: Config\npreferences: {}\nusers:\n- name: mock-user\n user: {}\n"; + private final String mockNamespace = "default"; + private final String mockJobName = "test-job-123"; + + private BatchAPIGroupDSL mockBatch; + private V1BatchAPIGroupDSL mockV1; + private MixedOperation<Job, JobList, ScalableResource<Job>> mockJobs; + private MixedOperation<Job, JobList, ScalableResource<Job>> mockInNamespace; + private ScalableResource<Job> mockWithName; + private ScalableResource<Job> mockResource; + private Job mockJob; + private Watch mockWatch; + private Watcher<Job> mockWatcher; + + private final String expectedLog = "Pod log content"; + + @SuppressWarnings("unchecked") + @BeforeEach + public void setUp() { + k8sUtils = new K8sUtils(); + mockedKubernetesClientPool = Mockito.mockStatic(KubernetesClientPool.class); // 拦截所有使用静态方法的请求 Review Comment: Chinese comment should be removed or translated to English. The comment '拦截所有使用静态方法的请求' (intercept all requests using static methods) is inconsistent with the rest of the codebase which uses English comments. 
```suggestion mockedKubernetesClientPool = Mockito.mockStatic(KubernetesClientPool.class); // Intercept all requests using static methods ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java: ########## @@ -292,16 +301,13 @@ private void parsePodLogOutput() { } @Override - public TaskResponse run(String k8sParameterStr) throws Exception { + public TaskResponse run(String k8sParameterStr) { Review Comment: The method signature changed from `throws Exception` to no exception declaration, but the method body still contains try-catch blocks that catch Exception. This is a breaking API change that could affect callers expecting to handle exceptions. The removed exception should either be kept in the signature or properly handled/logged in all code paths. ```suggestion public TaskResponse run(String k8sParameterStr) throws Exception { ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java: ########## @@ -59,33 +67,58 @@ public void deleteJob(String jobName, String namespace) { .delete(); } catch (Exception e) { throw new TaskException("fail to delete job", e); + } finally { + if (client != null) { + KubernetesClientPool.getInstance().returnClient(clusterId, client); + } } } - public Boolean jobExist(String jobName, String namespace) { + public Boolean jobExist(String configYaml, String jobName, String namespace) { + String clusterId = KubernetesClientPool.getInstance().getClusterId(configYaml); + KubernetesClient client = null; try { + client = KubernetesClientPool.getInstance().getClient(clusterId, configYaml); Job job = client.batch().v1().jobs().inNamespace(namespace).withName(jobName).get(); return job != null; } catch (Exception e) { - throw new TaskException("fail to check job: ", e); + throw new TaskException("fail to check job", e); + } finally { + if (client != null) 
{ + KubernetesClientPool.getInstance().returnClient(clusterId, client); + } } } - public Watch createBatchJobWatcher(String jobName, Watcher<Job> watcher) { + public Watch createBatchJobWatcher(String configYaml, String jobName, Watcher<Job> watcher) { + String clusterId = KubernetesClientPool.getInstance().getClusterId(configYaml); + KubernetesClient client = null; try { + client = KubernetesClientPool.getInstance().getClient(clusterId, configYaml); return client.batch() .v1() .jobs() .withName(jobName) .watch(watcher); } catch (Exception e) { throw new TaskException("fail to register batch job watcher", e); + } finally { + if (client != null) { + log.debug( + "createBatchJobWatcher does not return client immediately, caller should manage client lifecycle"); Review Comment: The log message states that the client is not returned immediately and that the caller should manage its lifecycle, but the client is obtained in the try block (line 97) and the finally block (lines 106-109) only logs that message without actually returning the client to the pool. Because the method returns a `Watch` rather than the client, the caller never receives a reference it could return. This will cause a client connection leak, as the client remains borrowed but is never returned. 
########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java: ########## @@ -96,56 +107,90 @@ public ResourceManagerType getResourceManagerType() { /** * get driver pod * - * @param kubernetesApplicationManagerContext - * @return + * @param kubernetesApplicationManagerContext Context + * @return pods */ @SneakyThrows - private FilterWatchListDeletable<Pod, PodList, PodResource> getListenPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { - KubernetesClient client = getClient(kubernetesApplicationManagerContext); + public FilterWatchListDeletable<Pod, PodList, PodResource> getListenPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + String clusterId = getClusterId(kubernetesApplicationManagerContext.getK8sTaskExecutionContext()); + KubernetesClient client = null; String labelValue = kubernetesApplicationManagerContext.getLabelValue(); List<Pod> podList = null; FilterWatchListDeletable<Pod, PodList, PodResource> watchList = null; int retryTimes = 0; - while (CollectionUtils.isEmpty(podList) && retryTimes < MAX_RETRY_TIMES) { - watchList = client.pods() - .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) - .withLabel(UNIQUE_LABEL_NAME, labelValue); - podList = watchList.list().getItems(); - if (!CollectionUtils.isEmpty(podList)) { - break; + try { + client = getClient(kubernetesApplicationManagerContext); + while (CollectionUtils.isEmpty(podList) && retryTimes < MAX_RETRY_TIMES) { + watchList = client.pods() + .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) + .withLabel(UNIQUE_LABEL_NAME, labelValue); + podList = watchList.list().getItems(); + if (!CollectionUtils.isEmpty(podList)) { + break; + } + Thread.sleep(SLEEP_TIME_MILLIS); + retryTimes += 1; + } + return watchList; + } finally { + if (client != null) { + 
returnClient(clusterId, client); } - Thread.sleep(SLEEP_TIME_MILLIS); - retryTimes += 1; } - - return watchList; } /** - * create client or get from cache map + * Retrieve Kubernetes clients from the connection pool * - * @param kubernetesApplicationManagerContext - * @return + * @param kubernetesApplicationManagerContext Context parameters + * @return Kubernetes Client */ - private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + public KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { K8sTaskExecutionContext k8sTaskExecutionContext = kubernetesApplicationManagerContext.getK8sTaskExecutionContext(); - return cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(), - key -> new KubernetesClientBuilder() - .withConfig(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())).build()); + + // Using k8s configuration as cluster identifier + String clusterId = getClusterId(k8sTaskExecutionContext); + String kubeConfig = k8sTaskExecutionContext.getConfigYaml(); + + try { + return clientPool.getClient(clusterId, kubeConfig); + } catch (Exception e) { + log.error("Failed to get Kubernetes client from pool", e); + throw new RuntimeException("Failed to get Kubernetes client", e); + } + } + + /** + * Get Cluster Identifier + */ + public String getClusterId(K8sTaskExecutionContext k8sTaskExecutionContext) { + String kubeConfig = k8sTaskExecutionContext.getConfigYaml(); + int hashCode = kubeConfig.hashCode(); + int nonNegativeHash = hashCode & 0x7FFFFFFF; + return K8S_CLUSTER_PREFIX + nonNegativeHash; Review Comment: The cluster ID generation logic is duplicated between `KubernetesApplicationManager.getClusterId()` and `KubernetesClientPool.getClusterId()`. These methods use different algorithms (SHA-256 in the pool vs hashCode in the manager), which will produce different cluster IDs for the same kubeConfig. 
This inconsistency will prevent proper client pooling and cause connection leaks. ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPoolTest.java: ########## @@ -0,0 +1,1262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import io.fabric8.kubernetes.api.model.Namespace; +import io.fabric8.kubernetes.api.model.NamespaceList; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; + +public class KubernetesClientPoolTest { + + private KubernetesClientPool mockPool; + private final String mockKubeConfig = + "apiVersion: v1\nclusters:\n- cluster:\n server: https://kubernetes.default.svc\n name: mock-cluster\ncontexts:\n- context:\n cluster: mock-cluster\n namespace: default\n user: mock-user\n name: mock-context\ncurrent-context: mock-context\nkind: Config\npreferences: {}\nusers:\n- name: mock-user\n user: {}"; + private final String clusterId = "mock-cluster-id"; + private MockedStatic<KubernetesClientPool> mockedKubernetesClientPool; + + @BeforeEach + public void before() throws Exception { + mockedKubernetesClientPool = 
Mockito.mockStatic(KubernetesClientPool.class); + + mockPool = Mockito.mock(KubernetesClientPool.class); + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(mockPool); + Mockito.when(mockPool.getClient(Mockito.anyString(), Mockito.anyString())).thenReturn(mockClient); + Mockito.when(mockPool.getClusterId(mockKubeConfig)).thenReturn(clusterId); + } + + @AfterEach + public void after() { + if (mockedKubernetesClientPool != null) { + mockedKubernetesClientPool.close(); + } + } + + /** + * test: getClusterId,getClient,closePool,returnClient + */ + @Test + public void testKubernetesClientPoolBasicFunction() { + KubernetesClientPool mockPool = KubernetesClientPool.getInstance(); + + String clusterId = "mock-cluster-id"; + Mockito.when(mockPool.getClusterId(Mockito.anyString())).thenReturn(clusterId); + + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + Mockito.when(mockPool.getClient(clusterId, mockKubeConfig)).thenReturn(mockClient); + + String actualClusterId = mockPool.getClusterId(mockKubeConfig); + Assertions.assertEquals(clusterId, actualClusterId); + + KubernetesClient client1 = mockPool.getClient(clusterId, mockKubeConfig); + Assertions.assertNotNull(client1); + Assertions.assertEquals(mockClient, client1); + + mockPool.returnClient(clusterId, client1); + Mockito.verify(mockPool).returnClient(clusterId, client1); + + KubernetesClient client2 = mockPool.getClient(clusterId, mockKubeConfig); + Assertions.assertNotNull(client2); + + mockPool.closePool(clusterId); + Mockito.verify(mockPool).closePool(clusterId); + } + + /** + * test:PoolConfig, + */ + @Test + public void testKubernetesClientPoolConfig() { + try { + KubernetesClientPool.PoolConfig expectedConfig = new KubernetesClientPool.PoolConfig( + 10, // maxSize + 2, // minIdle + 5, // maxIdle + 10000, // maxWaitMs + 600000 // idleTimeoutMs + ); + + KubernetesClientPool mockPool = 
Mockito.mock(KubernetesClientPool.class); + + java.lang.reflect.Field configField = KubernetesClientPool.class.getDeclaredField("poolConfig"); + configField.setAccessible(true); + configField.set(mockPool, expectedConfig); + + Assertions.assertEquals(10, expectedConfig.getMaxSize()); + Assertions.assertEquals(2, expectedConfig.getMinIdle()); + Assertions.assertEquals(5, expectedConfig.getMaxIdle()); + Assertions.assertEquals(10000, expectedConfig.getMaxWaitMs()); + } catch (Exception e) { + Assertions.fail("Failed to test connection pool config: " + e.getMessage()); + } + } + + /** + * Test handling of invalid kubeConfig + */ + @Test + public void testInvalidKubeConfig() { + String invalidKubeConfig = "invalid config"; + // Mock KubernetesClientPool.getInstance() to return our mockPool + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); // spy on actual method + String clusterId = realPool.getClusterId(invalidKubeConfig); + Assertions.assertNotNull(clusterId, "Cluster ID should not be null even for invalid kubeConfig"); + + // Mock getClient to throw exception for invalid kubeConfig + Mockito.when(mockPool.getClient(clusterId, invalidKubeConfig)) + .thenThrow(new RuntimeException("Invalid kubeconfig")); + + // Test getClient with invalid kubeConfig (should throw exception) + Assertions.assertThrows(Exception.class, () -> mockPool.getClient(clusterId, invalidKubeConfig), + "getClient should throw exception for invalid kubeConfig"); + } + @Test + public void testClusterIdGeneration() { + + KubernetesClientPool mockPool = KubernetesClientPool.getInstance(); + + String mockClusterId = "mock-cluster-id-1"; + Mockito.when(mockPool.getClusterId(mockKubeConfig)).thenReturn(mockClusterId); + + String differentKubeConfig = mockKubeConfig + "#different"; + String mockDifferentClusterId = "mock-cluster-id-2"; + Mockito.when(mockPool.getClusterId(differentKubeConfig)).thenReturn(mockDifferentClusterId); + + String clusterId1 = 
mockPool.getClusterId(mockKubeConfig); + String clusterId2 = mockPool.getClusterId(mockKubeConfig); + Assertions.assertEquals(clusterId1, clusterId2); + Assertions.assertEquals(mockClusterId, clusterId1); + + String clusterId3 = mockPool.getClusterId(differentKubeConfig); + Assertions.assertNotEquals(clusterId1, clusterId3); + Assertions.assertEquals(mockDifferentClusterId, clusterId3); + } + + /** + * Test singleton pattern of KubernetesClientPool + */ + @Test + public void testSingletonPattern() { + KubernetesClientPool instance1 = KubernetesClientPool.getInstance(); + KubernetesClientPool instance2 = KubernetesClientPool.getInstance(); + Assertions.assertSame(instance1, instance2, "KubernetesClientPool should be a singleton"); + } + + /** + * Test getClusterId method with valid kubeConfig + */ + @Test + public void testGetClusterId() { + String clusterId = mockPool.getClusterId(mockKubeConfig); + Assertions.assertNotNull(clusterId, "Cluster ID should not be null"); + // Verify that the same kubeConfig returns the same clusterId + String sameClusterId = mockPool.getClusterId(mockKubeConfig); + Assertions.assertEquals(clusterId, sameClusterId, "Same kubeConfig should return same cluster ID"); + } + + /** + * Test getClusterId method with different kubeConfigs + */ + @Test + public void testGetClusterIdDifferentConfigs() { + String clusterId1 = mockPool.getClusterId(mockKubeConfig); + String differentKubeConfig = mockKubeConfig + "#different"; + String clusterId2 = mockPool.getClusterId(differentKubeConfig); + Assertions.assertNotEquals(clusterId1, clusterId2, "Different kubeConfigs should return different cluster IDs"); + } + + /** + * Test getClient method with valid parameters + */ + @Test + public void testGetClient() throws Exception { + // Mock a KubernetesClient and ClusterClientPool + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + // Get the real instance for testing + KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field to return a mock ClusterClientPool + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); + + // Mock the borrowObject method + Mockito.doReturn(mockClient).when(mockClusterClientPool).borrowObject(); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + // Test getting a client + KubernetesClient client = realPool.getClient(clusterId, mockKubeConfig); + + // Verify the client is not null + Assertions.assertNotNull(client, "Client should not be null"); + Assertions.assertEquals(mockClient, client, "Returned client should match the mock client"); + } + + /** + * Test returnClient method + */ + @Test + public void testReturnClient() throws Exception { + // Mock a KubernetesClient and ClusterClientPool + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field to return a mock ClusterClientPool + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new 
ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); + + // Mock the borrowObject method to return our mock client + Mockito.doReturn(mockClient).when(mockClusterClientPool).borrowObject(); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + // Get a client + KubernetesClient client = realPool.getClient(clusterId, mockKubeConfig); + Assertions.assertNotNull(client, "Client should not be null"); + + // Return the client to the pool + realPool.returnClient(clusterId, client); + + // Verify that returnObject was called on the mockClusterClientPool + Mockito.verify(mockClusterClientPool).returnObject(client); + + // Verify that getClient calls borrowObject again + realPool.getClient(clusterId, mockKubeConfig); + Mockito.verify(mockClusterClientPool, Mockito.times(2)).borrowObject(); + } + + /** + * Test closePool method + */ + @Test + public void testClosePool() throws Exception { + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field to return a mock ClusterClientPool + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); 
+ + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + // Close the pool + realPool.closePool(clusterId); + + // Verify that the ClusterClientPool's close method was called + Mockito.verify(mockClusterClientPool).close(); + + // Verify that the clusterClientPools no longer contains the clusterId + Assertions.assertFalse(mockClusterClientPools.containsKey(clusterId), + "Cluster client pool should be removed after close"); + } + + /** + * Test close method (closes all pools) + */ + @Test + public void testClose() throws Exception { + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create multiple mock ClusterClientPools for different clusters + String clusterId1 = clusterId; + String clusterId2 = clusterId + "_2"; + + KubernetesClientPool.ClusterClientPool mockClusterClientPool1 = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + KubernetesClientPool.ClusterClientPool mockClusterClientPool2 = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + + mockClusterClientPools.put(clusterId1, mockClusterClientPool1); + mockClusterClientPools.put(clusterId2, mockClusterClientPool2); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + // Close all pools + 
realPool.closePool(clusterId1); + realPool.closePool(clusterId2); + + // Verify that both ClusterClientPools' close methods were called + Mockito.verify(mockClusterClientPool1).close(); + Mockito.verify(mockClusterClientPool2).close(); + + // Verify that the clusterClientPools is now empty + Assertions.assertTrue(mockClusterClientPools.isEmpty(), + "All cluster client pools should be removed after close"); + } + + @Test + public void testConcurrentAccess() throws Exception { + final int threadCount = 10; + final int operationsPerThread = 5; + final CountDownLatch latch = new CountDownLatch(threadCount); + final ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + final List<Exception> exceptions = new ArrayList<>(); + + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); + + // Create a mock KubernetesClient + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + + // Mock the borrowObject method to return our mock client + Mockito.when(mockClusterClientPool.borrowObject()).thenReturn(mockClient); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + try { + // Start multiple threads that concurrently access the client 
pool + for (int i = 0; i < threadCount; i++) { + executorService.submit(() -> { + try { + for (int j = 0; j < operationsPerThread; j++) { + // Get a client + KubernetesClient client = realPool.getClient(clusterId, mockKubeConfig); + Assertions.assertNotNull(client, "Client should not be null"); + + // Simulate some work with the client + Awaitility.await().atMost(Duration.ofMillis(10)); + + // Return the client to the pool + realPool.returnClient(clusterId, client); + } + } catch (Exception e) { + exceptions.add(e); + } finally { + latch.countDown(); + } + }); + } + + // Wait for all threads to complete + boolean completed = latch.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(completed, "All threads should complete within timeout"); + + // Verify no exceptions occurred during concurrent access + Assertions.assertTrue(exceptions.isEmpty(), "No exceptions should occur during concurrent access"); + + // Verify that borrowObject and returnObject were called the expected number of times + int expectedCalls = threadCount * operationsPerThread; + Mockito.verify(mockClusterClientPool, Mockito.times(expectedCalls)).borrowObject(); + Mockito.verify(mockClusterClientPool, Mockito.times(expectedCalls)).returnObject(Mockito.any()); + } finally { + executorService.shutdown(); + } + } + + /** + * Test handling of closed client + */ + @Test + public void testHandlingClosedClient() { + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field to return a mock ClusterClientPool + try { + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + 
Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); + + // Mock the borrowObject method to throw an exception, simulating a closed client scenario + Mockito.doThrow(new RuntimeException("Client is closed")).when(mockClusterClientPool).borrowObject(); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Test getClient with a closed client scenario + try { + // This should handle the exception gracefully + KubernetesClient client = realPool.getClient(clusterId, mockKubeConfig); + // If no exception is thrown, verify the client is handled properly + Assertions.assertNull(client, "Client should be null when borrowObject fails"); + } catch (Exception e) { + // If exception propagates, this is also acceptable behavior + System.out.println("Expected exception when getting client in closed scenario: " + e.getMessage()); + } + + // Verify that borrowObject was called + Mockito.verify(mockClusterClientPool).borrowObject(); + } catch (Exception e) { + Assertions.fail("Failed to setup test for closed client scenario: " + e.getMessage()); + } + } + + /** + * Test that PooledClient constructor correctly initializes the client and lastUsedTime. 
+ */ + @Test + public void testPooledClientConstructor() throws Exception { + // Mock a KubernetesClient + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + Class<?> pooledClientClass = getPooledClientClass(); + long currentTime = System.currentTimeMillis(); + + // Create a PooledClient instance using reflection + Constructor<?> constructor = pooledClientClass.getDeclaredConstructor(KubernetesClient.class); + constructor.setAccessible(true); + Object pooledClient = constructor.newInstance(mockClient); + + // Verify that the client was set correctly + Field clientField = pooledClientClass.getDeclaredField("client"); + Field lastUsedTimeField = pooledClientClass.getDeclaredField("lastUsedTime"); + lastUsedTimeField.setAccessible(true); + clientField.setAccessible(true); + + Object clientValue = clientField.get(pooledClient); + // Verify the client is accessible and correct + Assertions.assertNotNull(clientValue, "Client should not be null"); + Assertions.assertSame(mockClient, clientValue, "Client reference should match the original client"); + + // Verify that lastUsedTime was initialized correctly + long lastUsedTimeValue = lastUsedTimeField.getLong(pooledClient); + + // Allow for a small time difference (up to 1 second) between test execution and constructor call + Assertions.assertEquals(currentTime, lastUsedTimeValue, 1000, + "lastUsedTime should be initialized to current time"); + + // Test that we can modify the lastUsedTime field. + long newTime = System.currentTimeMillis() + 10000; // 10 seconds in the future + lastUsedTimeField.setLong(pooledClient, newTime); + // Verify the modification + long updatedTime = lastUsedTimeField.getLong(pooledClient); + Assertions.assertEquals(newTime, updatedTime, "lastUsedTime should be successfully updated"); + } + + /** + * Helper method to get the PooledClient class using reflection. 
+ */ + private Class<?> getPooledClientClass() throws ClassNotFoundException { + Class<?> clusterClientPoolClass = + Class.forName("org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool$ClusterClientPool"); + Class<?>[] nestedClasses = clusterClientPoolClass.getDeclaredClasses(); + Class<?> targetClass = KubernetesClientPool.ClusterClientPool.PooledClient.class; + + for (Class<?> nestedClass : nestedClasses) { + if (nestedClass == targetClass) { + return nestedClass; + } + } + + throw new ClassNotFoundException("Could not find PooledClient class"); + } + + @Test + public void testClusterClientPoolBorrowObjectTimeout() throws Exception { + // Mock a KubernetesClient and create a PoolConfig with small timeout + KubernetesClientPool.PoolConfig poolConfig = new KubernetesClientPool.PoolConfig( + 2, // maxSize + 0, // minIdle + 2, // maxIdle + 500, // maxWaitMs (500ms timeout) + 600000); // idleTimeoutMs + + // Create a real ClusterClientPool instance using reflection + Class<?> clusterClientPoolClass = + Class.forName("org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool$ClusterClientPool"); + Constructor<?> constructor = clusterClientPoolClass.getDeclaredConstructor(String.class, String.class, + KubernetesClientPool.PoolConfig.class); + constructor.setAccessible(true); + + // Create mock clients + KubernetesClient mockClient1 = Mockito.mock(KubernetesClient.class); + KubernetesClient mockClient2 = Mockito.mock(KubernetesClient.class); + + // Use a class-level AtomicInteger to track builder calls + AtomicInteger builderCounter = new AtomicInteger(0); + + // Mock KubernetesClientBuilder to control client creation + try ( + MockedConstruction<KubernetesClientBuilder> mockedConstruction = + Mockito.mockConstruction(KubernetesClientBuilder.class, + (mock, context) -> { + // Configure the mock behavior + Mockito.when(mock.withConfig((Config) Mockito.any())).thenReturn(mock); + // Use the class-level counter to determine which mock client to 
return + Mockito.when(mock.build()).thenAnswer(invocation -> { + int count = builderCounter.getAndIncrement(); + if (count == 0) { + return mockClient1; + } else if (count == 1) { + return mockClient2; + } else { + // For any additional calls, just return mockClient2 + return mockClient2; + } + }); + })) { + + Object clusterClientPool = constructor.newInstance(clusterId, mockKubeConfig, poolConfig); + + // Get the borrowObject method + Method borrowObjectMethod = clusterClientPoolClass.getDeclaredMethod("borrowObject"); + borrowObjectMethod.setAccessible(true); + + // Get all available clients (maxSize = 2) + KubernetesClient client1 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool); + KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool); Review Comment: Variable 'KubernetesClient client1' is never read. ```suggestion borrowObjectMethod.invoke(clusterClientPool); borrowObjectMethod.invoke(clusterClientPool); ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java: ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +/** + * KubernetesClientPool is used to manage the Kubernetes client connection pool + * Maintain an independent connection pool for each K8s cluster to implement connection creation, acquisition, return, and closure + */ +@Slf4j +public class KubernetesClientPool { + + private static final String BASE64_PADDING_CHARACTER = "="; + private static final String EMPTY_STRING = ""; + private final static int CLEANUP_THREAD_REGULAR = 30000; + + /** + * Connection pool instance + */ + private static final KubernetesClientPool INSTANCE = new KubernetesClientPool(); + + /** + * Cluster connection pool mapping, with the key being the cluster identifier + */ + private final ConcurrentMap<String, ClusterClientPool> clusterClientPools = new ConcurrentHashMap<>(); + + /** + * Connection pool configuration + */ + private final PoolConfig poolConfig; + + private KubernetesClientPool() { + this.poolConfig = new PoolConfig( + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 
5), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000)); + log.info("KubernetesClientPool initialized with config: {}", poolConfig); + + // clean connection thread + startCleanupThread(); + } + + public static KubernetesClientPool getInstance() { + return INSTANCE; + } + + /** + * Generate cluster identifier based on kubeconfig + * @param kubeConfig kubeconfig Configuration + * @return Cluster identification + */ + public String getClusterId(String kubeConfig) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8)); + String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes); + return base64Hash.replace(BASE64_PADDING_CHARACTER, EMPTY_STRING); + } catch (Exception e) { + log.error("Failed to generate cluster ID", e); + return Integer.toString(kubeConfig.hashCode()); Review Comment: The fallback cluster ID generation using `hashCode()` can produce negative values, which could cause issues. Although line 170 in KubernetesApplicationManager uses `hashCode & 0x7FFFFFFF` to ensure non-negative values, this fallback in KubernetesClientPool doesn't apply the same mask, creating inconsistency in cluster ID generation. 
```suggestion return Integer.toString(kubeConfig.hashCode() & 0x7FFFFFFF); ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java: ########## @@ -56,33 +53,47 @@ public class KubernetesApplicationManager implements ApplicationManager<Kubernet private static final String FAILED = "Failed"; private static final String UNKNOWN = "Unknown"; + private static final String K8S_CLUSTER_PREFIX = "k8s-cluster-"; private static final int MAX_RETRY_TIMES = 10; /** - * cache k8s client for same task + * Get Kubernetes client connection pool instance */ - private final Map<String, KubernetesClient> cacheClientMap = new ConcurrentHashMap<>(); + public final KubernetesClientPool clientPool = KubernetesClientPool.getInstance(); @Override public boolean killApplication(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) throws TaskException { boolean isKill; String labelValue = kubernetesApplicationManagerContext.getLabelValue(); + FilterWatchListDeletable<Pod, PodList, PodResource> watchList = getListenPod(kubernetesApplicationManagerContext); try { if (getApplicationStatus(kubernetesApplicationManagerContext, watchList).isFailure()) { log.error("Driver pod is in FAILED or UNKNOWN status."); isKill = false; } else { - watchList.delete(); - isKill = true; + String clusterId = getClusterId(kubernetesApplicationManagerContext.getK8sTaskExecutionContext()); + KubernetesClient client = null; + try { + client = getClient(kubernetesApplicationManagerContext); + // Retrieve watchList again, as the previous instance of tes client connection pool may have expired Review Comment: Typo in comment: 'tes client' should be 'the client'. 
```suggestion // Retrieve watchList again, as the previous instance of the client connection pool may have expired ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPoolTest.java: ########## @@ -0,0 +1,1262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import io.fabric8.kubernetes.api.model.Namespace; +import io.fabric8.kubernetes.api.model.NamespaceList; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; + +public class KubernetesClientPoolTest { + + private KubernetesClientPool mockPool; + private final String mockKubeConfig = + "apiVersion: v1\nclusters:\n- cluster:\n server: https://kubernetes.default.svc\n name: mock-cluster\ncontexts:\n- context:\n cluster: mock-cluster\n namespace: default\n user: mock-user\n name: mock-context\ncurrent-context: mock-context\nkind: Config\npreferences: {}\nusers:\n- name: mock-user\n user: {}"; + private final String clusterId = "mock-cluster-id"; + private MockedStatic<KubernetesClientPool> mockedKubernetesClientPool; + + @BeforeEach + public void before() throws Exception { + mockedKubernetesClientPool = 
Mockito.mockStatic(KubernetesClientPool.class); + + mockPool = Mockito.mock(KubernetesClientPool.class); + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(mockPool); + Mockito.when(mockPool.getClient(Mockito.anyString(), Mockito.anyString())).thenReturn(mockClient); + Mockito.when(mockPool.getClusterId(mockKubeConfig)).thenReturn(clusterId); + } + + @AfterEach + public void after() { + if (mockedKubernetesClientPool != null) { + mockedKubernetesClientPool.close(); + } + } + + /** + * test: getClusterId,getClient,closePool,returnClient + */ + @Test + public void testKubernetesClientPoolBasicFunction() { + KubernetesClientPool mockPool = KubernetesClientPool.getInstance(); + + String clusterId = "mock-cluster-id"; + Mockito.when(mockPool.getClusterId(Mockito.anyString())).thenReturn(clusterId); + + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + Mockito.when(mockPool.getClient(clusterId, mockKubeConfig)).thenReturn(mockClient); + + String actualClusterId = mockPool.getClusterId(mockKubeConfig); + Assertions.assertEquals(clusterId, actualClusterId); + + KubernetesClient client1 = mockPool.getClient(clusterId, mockKubeConfig); + Assertions.assertNotNull(client1); + Assertions.assertEquals(mockClient, client1); + + mockPool.returnClient(clusterId, client1); + Mockito.verify(mockPool).returnClient(clusterId, client1); + + KubernetesClient client2 = mockPool.getClient(clusterId, mockKubeConfig); + Assertions.assertNotNull(client2); + + mockPool.closePool(clusterId); + Mockito.verify(mockPool).closePool(clusterId); + } + + /** + * test:PoolConfig, + */ + @Test + public void testKubernetesClientPoolConfig() { + try { + KubernetesClientPool.PoolConfig expectedConfig = new KubernetesClientPool.PoolConfig( + 10, // maxSize + 2, // minIdle + 5, // maxIdle + 10000, // maxWaitMs + 600000 // idleTimeoutMs + ); + + KubernetesClientPool mockPool = 
Mockito.mock(KubernetesClientPool.class); + + java.lang.reflect.Field configField = KubernetesClientPool.class.getDeclaredField("poolConfig"); + configField.setAccessible(true); + configField.set(mockPool, expectedConfig); + + Assertions.assertEquals(10, expectedConfig.getMaxSize()); + Assertions.assertEquals(2, expectedConfig.getMinIdle()); + Assertions.assertEquals(5, expectedConfig.getMaxIdle()); + Assertions.assertEquals(10000, expectedConfig.getMaxWaitMs()); + } catch (Exception e) { + Assertions.fail("Failed to test connection pool config: " + e.getMessage()); + } + } + + /** + * Test handling of invalid kubeConfig + */ + @Test + public void testInvalidKubeConfig() { + String invalidKubeConfig = "invalid config"; + // Mock KubernetesClientPool.getInstance() to return our mockPool + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); // spy on actual method + String clusterId = realPool.getClusterId(invalidKubeConfig); + Assertions.assertNotNull(clusterId, "Cluster ID should not be null even for invalid kubeConfig"); + + // Mock getClient to throw exception for invalid kubeConfig + Mockito.when(mockPool.getClient(clusterId, invalidKubeConfig)) + .thenThrow(new RuntimeException("Invalid kubeconfig")); + + // Test getClient with invalid kubeConfig (should throw exception) + Assertions.assertThrows(Exception.class, () -> mockPool.getClient(clusterId, invalidKubeConfig), + "getClient should throw exception for invalid kubeConfig"); + } + @Test + public void testClusterIdGeneration() { + + KubernetesClientPool mockPool = KubernetesClientPool.getInstance(); + + String mockClusterId = "mock-cluster-id-1"; + Mockito.when(mockPool.getClusterId(mockKubeConfig)).thenReturn(mockClusterId); + + String differentKubeConfig = mockKubeConfig + "#different"; + String mockDifferentClusterId = "mock-cluster-id-2"; + Mockito.when(mockPool.getClusterId(differentKubeConfig)).thenReturn(mockDifferentClusterId); + + String clusterId1 = 
mockPool.getClusterId(mockKubeConfig); + String clusterId2 = mockPool.getClusterId(mockKubeConfig); + Assertions.assertEquals(clusterId1, clusterId2); + Assertions.assertEquals(mockClusterId, clusterId1); + + String clusterId3 = mockPool.getClusterId(differentKubeConfig); + Assertions.assertNotEquals(clusterId1, clusterId3); + Assertions.assertEquals(mockDifferentClusterId, clusterId3); + } + + /** + * Test singleton pattern of KubernetesClientPool + */ + @Test + public void testSingletonPattern() { + KubernetesClientPool instance1 = KubernetesClientPool.getInstance(); + KubernetesClientPool instance2 = KubernetesClientPool.getInstance(); + Assertions.assertSame(instance1, instance2, "KubernetesClientPool should be a singleton"); + } + + /** + * Test getClusterId method with valid kubeConfig + */ + @Test + public void testGetClusterId() { + String clusterId = mockPool.getClusterId(mockKubeConfig); + Assertions.assertNotNull(clusterId, "Cluster ID should not be null"); + // Verify that the same kubeConfig returns the same clusterId + String sameClusterId = mockPool.getClusterId(mockKubeConfig); + Assertions.assertEquals(clusterId, sameClusterId, "Same kubeConfig should return same cluster ID"); + } + + /** + * Test getClusterId method with different kubeConfigs + */ + @Test + public void testGetClusterIdDifferentConfigs() { + String clusterId1 = mockPool.getClusterId(mockKubeConfig); + String differentKubeConfig = mockKubeConfig + "#different"; + String clusterId2 = mockPool.getClusterId(differentKubeConfig); + Assertions.assertNotEquals(clusterId1, clusterId2, "Different kubeConfigs should return different cluster IDs"); + } + + /** + * Test getClient method with valid parameters + */ + @Test + public void testGetClient() throws Exception { + // Mock a KubernetesClient and ClusterClientPool + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + // Get the real instance for testing + KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field to return a mock ClusterClientPool + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); + + // Mock the borrowObject method + Mockito.doReturn(mockClient).when(mockClusterClientPool).borrowObject(); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + // Test getting a client + KubernetesClient client = realPool.getClient(clusterId, mockKubeConfig); + + // Verify the client is not null + Assertions.assertNotNull(client, "Client should not be null"); + Assertions.assertEquals(mockClient, client, "Returned client should match the mock client"); + } + + /** + * Test returnClient method + */ + @Test + public void testReturnClient() throws Exception { + // Mock a KubernetesClient and ClusterClientPool + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field to return a mock ClusterClientPool + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new 
ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); + + // Mock the borrowObject method to return our mock client + Mockito.doReturn(mockClient).when(mockClusterClientPool).borrowObject(); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + // Get a client + KubernetesClient client = realPool.getClient(clusterId, mockKubeConfig); + Assertions.assertNotNull(client, "Client should not be null"); + + // Return the client to the pool + realPool.returnClient(clusterId, client); + + // Verify that returnObject was called on the mockClusterClientPool + Mockito.verify(mockClusterClientPool).returnObject(client); + + // Verify that getClient calls borrowObject again + realPool.getClient(clusterId, mockKubeConfig); + Mockito.verify(mockClusterClientPool, Mockito.times(2)).borrowObject(); + } + + /** + * Test closePool method + */ + @Test + public void testClosePool() throws Exception { + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field to return a mock ClusterClientPool + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); 
+ + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + // Close the pool + realPool.closePool(clusterId); + + // Verify that the ClusterClientPool's close method was called + Mockito.verify(mockClusterClientPool).close(); + + // Verify that the clusterClientPools no longer contains the clusterId + Assertions.assertFalse(mockClusterClientPools.containsKey(clusterId), + "Cluster client pool should be removed after close"); + } + + /** + * Test close method (closes all pools) + */ + @Test + public void testClose() throws Exception { + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create multiple mock ClusterClientPools for different clusters + String clusterId1 = clusterId; + String clusterId2 = clusterId + "_2"; + + KubernetesClientPool.ClusterClientPool mockClusterClientPool1 = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + KubernetesClientPool.ClusterClientPool mockClusterClientPool2 = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + + mockClusterClientPools.put(clusterId1, mockClusterClientPool1); + mockClusterClientPools.put(clusterId2, mockClusterClientPool2); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + // Close all pools + 
realPool.closePool(clusterId1); + realPool.closePool(clusterId2); + + // Verify that both ClusterClientPools' close methods were called + Mockito.verify(mockClusterClientPool1).close(); + Mockito.verify(mockClusterClientPool2).close(); + + // Verify that the clusterClientPools is now empty + Assertions.assertTrue(mockClusterClientPools.isEmpty(), + "All cluster client pools should be removed after close"); + } + + @Test + public void testConcurrentAccess() throws Exception { + final int threadCount = 10; + final int operationsPerThread = 5; + final CountDownLatch latch = new CountDownLatch(threadCount); + final ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + final List<Exception> exceptions = new ArrayList<>(); + + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); + + // Create a mock KubernetesClient + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + + // Mock the borrowObject method to return our mock client + Mockito.when(mockClusterClientPool.borrowObject()).thenReturn(mockClient); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Mock the getInstance method to return our spy + Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool); + + try { + // Start multiple threads that concurrently access the client 
pool + for (int i = 0; i < threadCount; i++) { + executorService.submit(() -> { + try { + for (int j = 0; j < operationsPerThread; j++) { + // Get a client + KubernetesClient client = realPool.getClient(clusterId, mockKubeConfig); + Assertions.assertNotNull(client, "Client should not be null"); + + // Simulate some work with the client + Awaitility.await().atMost(Duration.ofMillis(10)); + + // Return the client to the pool + realPool.returnClient(clusterId, client); + } + } catch (Exception e) { + exceptions.add(e); + } finally { + latch.countDown(); + } + }); + } + + // Wait for all threads to complete + boolean completed = latch.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(completed, "All threads should complete within timeout"); + + // Verify no exceptions occurred during concurrent access + Assertions.assertTrue(exceptions.isEmpty(), "No exceptions should occur during concurrent access"); + + // Verify that borrowObject and returnObject were called the expected number of times + int expectedCalls = threadCount * operationsPerThread; + Mockito.verify(mockClusterClientPool, Mockito.times(expectedCalls)).borrowObject(); + Mockito.verify(mockClusterClientPool, Mockito.times(expectedCalls)).returnObject(Mockito.any()); + } finally { + executorService.shutdown(); + } + } + + /** + * Test handling of closed client + */ + @Test + public void testHandlingClosedClient() { + // Get the real instance for testing + KubernetesClientPool realPool = Mockito.spy(KubernetesClientPool.class); + + // Mock the clusterClientPools field to return a mock ClusterClientPool + try { + java.lang.reflect.Field clusterClientPoolsField = + KubernetesClientPool.class.getDeclaredField("clusterClientPools"); + clusterClientPoolsField.setAccessible(true); + ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> mockClusterClientPools = + new ConcurrentHashMap<>(); + + // Create a mock ClusterClientPool + KubernetesClientPool.ClusterClientPool mockClusterClientPool = + 
Mockito.mock(KubernetesClientPool.ClusterClientPool.class); + mockClusterClientPools.put(clusterId, mockClusterClientPool); + + // Mock the borrowObject method to throw an exception, simulating a closed client scenario + Mockito.doThrow(new RuntimeException("Client is closed")).when(mockClusterClientPool).borrowObject(); + + // Set the mock clusterClientPools into the realPool + clusterClientPoolsField.set(realPool, mockClusterClientPools); + + // Test getClient with a closed client scenario + try { + // This should handle the exception gracefully + KubernetesClient client = realPool.getClient(clusterId, mockKubeConfig); + // If no exception is thrown, verify the client is handled properly + Assertions.assertNull(client, "Client should be null when borrowObject fails"); + } catch (Exception e) { + // If exception propagates, this is also acceptable behavior + System.out.println("Expected exception when getting client in closed scenario: " + e.getMessage()); + } + + // Verify that borrowObject was called + Mockito.verify(mockClusterClientPool).borrowObject(); + } catch (Exception e) { + Assertions.fail("Failed to setup test for closed client scenario: " + e.getMessage()); + } + } + + /** + * Test that PooledClient constructor correctly initializes the client and lastUsedTime. 
+ */ + @Test + public void testPooledClientConstructor() throws Exception { + // Mock a KubernetesClient + KubernetesClient mockClient = Mockito.mock(KubernetesClient.class); + Class<?> pooledClientClass = getPooledClientClass(); + long currentTime = System.currentTimeMillis(); + + // Create a PooledClient instance using reflection + Constructor<?> constructor = pooledClientClass.getDeclaredConstructor(KubernetesClient.class); + constructor.setAccessible(true); + Object pooledClient = constructor.newInstance(mockClient); + + // Verify that the client was set correctly + Field clientField = pooledClientClass.getDeclaredField("client"); + Field lastUsedTimeField = pooledClientClass.getDeclaredField("lastUsedTime"); + lastUsedTimeField.setAccessible(true); + clientField.setAccessible(true); + + Object clientValue = clientField.get(pooledClient); + // Verify the client is accessible and correct + Assertions.assertNotNull(clientValue, "Client should not be null"); + Assertions.assertSame(mockClient, clientValue, "Client reference should match the original client"); + + // Verify that lastUsedTime was initialized correctly + long lastUsedTimeValue = lastUsedTimeField.getLong(pooledClient); + + // Allow for a small time difference (up to 1 second) between test execution and constructor call + Assertions.assertEquals(currentTime, lastUsedTimeValue, 1000, + "lastUsedTime should be initialized to current time"); + + // Test that we can modify the lastUsedTime field. + long newTime = System.currentTimeMillis() + 10000; // 10 seconds in the future + lastUsedTimeField.setLong(pooledClient, newTime); + // Verify the modification + long updatedTime = lastUsedTimeField.getLong(pooledClient); + Assertions.assertEquals(newTime, updatedTime, "lastUsedTime should be successfully updated"); + } + + /** + * Helper method to get the PooledClient class using reflection. 
+ */ + private Class<?> getPooledClientClass() throws ClassNotFoundException { + Class<?> clusterClientPoolClass = + Class.forName("org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool$ClusterClientPool"); + Class<?>[] nestedClasses = clusterClientPoolClass.getDeclaredClasses(); + Class<?> targetClass = KubernetesClientPool.ClusterClientPool.PooledClient.class; + + for (Class<?> nestedClass : nestedClasses) { + if (nestedClass == targetClass) { + return nestedClass; + } + } + + throw new ClassNotFoundException("Could not find PooledClient class"); + } + + @Test + public void testClusterClientPoolBorrowObjectTimeout() throws Exception { + // Mock a KubernetesClient and create a PoolConfig with small timeout + KubernetesClientPool.PoolConfig poolConfig = new KubernetesClientPool.PoolConfig( + 2, // maxSize + 0, // minIdle + 2, // maxIdle + 500, // maxWaitMs (500ms timeout) + 600000); // idleTimeoutMs + + // Create a real ClusterClientPool instance using reflection + Class<?> clusterClientPoolClass = + Class.forName("org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool$ClusterClientPool"); + Constructor<?> constructor = clusterClientPoolClass.getDeclaredConstructor(String.class, String.class, + KubernetesClientPool.PoolConfig.class); + constructor.setAccessible(true); + + // Create mock clients + KubernetesClient mockClient1 = Mockito.mock(KubernetesClient.class); + KubernetesClient mockClient2 = Mockito.mock(KubernetesClient.class); + + // Use a class-level AtomicInteger to track builder calls + AtomicInteger builderCounter = new AtomicInteger(0); + + // Mock KubernetesClientBuilder to control client creation + try ( + MockedConstruction<KubernetesClientBuilder> mockedConstruction = + Mockito.mockConstruction(KubernetesClientBuilder.class, + (mock, context) -> { + // Configure the mock behavior + Mockito.when(mock.withConfig((Config) Mockito.any())).thenReturn(mock); + // Use the class-level counter to determine which mock client to 
return + Mockito.when(mock.build()).thenAnswer(invocation -> { + int count = builderCounter.getAndIncrement(); + if (count == 0) { + return mockClient1; + } else if (count == 1) { + return mockClient2; + } else { + // For any additional calls, just return mockClient2 + return mockClient2; + } + }); + })) { + + Object clusterClientPool = constructor.newInstance(clusterId, mockKubeConfig, poolConfig); + + // Get the borrowObject method + Method borrowObjectMethod = clusterClientPoolClass.getDeclaredMethod("borrowObject"); + borrowObjectMethod.setAccessible(true); + + // Get all available clients (maxSize = 2) + KubernetesClient client1 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool); + KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool); Review Comment: Variable 'KubernetesClient client2' is never read. ```suggestion borrowObjectMethod.invoke(clusterClientPool); ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java: ########## @@ -186,34 +251,39 @@ private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerCon /** * get pod's log watcher * - * @param kubernetesApplicationManagerContext - * @return + * @param kubernetesApplicationManagerContext Context + * @return Watcher */ @SneakyThrows public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { - KubernetesClient client = getClient(kubernetesApplicationManagerContext); + KubernetesClient client = null; boolean podIsReady = false; Pod pod = null; - while (!podIsReady) { - FilterWatchListDeletable<Pod, PodList, PodResource> watchList = - getListenPod(kubernetesApplicationManagerContext); - List<Pod> podList = watchList == null ? 
null : watchList.list().getItems(); - if (CollectionUtils.isEmpty(podList)) { - return null; - } - pod = podList.get(0); - String phase = pod.getStatus().getPhase(); - if (phase.equals(PENDING) || phase.equals(UNKNOWN)) { - Thread.sleep(SLEEP_TIME_MILLIS); - } else { - podIsReady = true; + try { + client = getClient(kubernetesApplicationManagerContext); + while (!podIsReady) { + FilterWatchListDeletable<Pod, PodList, PodResource> watchList = + getListenPod(kubernetesApplicationManagerContext); + List<Pod> podList = watchList == null ? null : watchList.list().getItems(); + if (CollectionUtils.isEmpty(podList)) { + return null; + } + pod = podList.get(0); + String phase = pod.getStatus().getPhase(); + if (phase.equals(PENDING) || phase.equals(UNKNOWN)) { + Thread.sleep(SLEEP_TIME_MILLIS); + } else { + podIsReady = true; + } } - } - return client.pods().inNamespace(pod.getMetadata().getNamespace()) - .withName(pod.getMetadata().getName()) - .inContainer(kubernetesApplicationManagerContext.getContainerName()) - .watchLog(); + return client.pods().inNamespace(pod.getMetadata().getNamespace()) + .withName(pod.getMetadata().getName()) + .inContainer(kubernetesApplicationManagerContext.getContainerName()) + .watchLog(); + } finally { + log.debug("Log watch client is not returned immediately, will be managed by caller after watch completes"); + } } Review Comment: Similar to the issue in K8sUtils.createBatchJobWatcher, the client obtained in getPodLogWatcher (line 263) is not returned to the pool in the finally block. The debug message suggests it's intentional, but this creates a connection leak as there's no guarantee the caller will properly return the client after the watch completes. 
```suggestion LogWatch logWatch = client.pods().inNamespace(pod.getMetadata().getNamespace()) .withName(pod.getMetadata().getName()) .inContainer(kubernetesApplicationManagerContext.getContainerName()) .watchLog(); return new ClientReturningLogWatch(logWatch, client, kubernetesApplicationManagerContext); } catch (Exception e) { if (client != null) { KubernetesClientPool.returnClient(kubernetesApplicationManagerContext, client); } throw e; } } /** * Wrapper for LogWatch that returns the client to the pool when closed. */ private static class ClientReturningLogWatch implements LogWatch { private final LogWatch delegate; private final KubernetesClient client; private final KubernetesApplicationManagerContext context; private boolean closed = false; public ClientReturningLogWatch(LogWatch delegate, KubernetesClient client, KubernetesApplicationManagerContext context) { this.delegate = delegate; this.client = client; this.context = context; } @Override public void close() { if (!closed) { try { delegate.close(); } finally { KubernetesClientPool.returnClient(context, client); closed = true; } } } @Override public java.io.InputStream getOutput() { return delegate.getOutput(); } @Override public java.io.InputStream getError() { return delegate.getError(); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
