Copilot commented on code in PR #17510:
URL: 
https://github.com/apache/dolphinscheduler/pull/17510#discussion_r2476463566


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtilsTest.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.StatusDetails;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.BatchAPIGroupDSL;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
+import io.fabric8.kubernetes.client.dsl.PodResource;
+import io.fabric8.kubernetes.client.dsl.PrettyLoggable;
+import io.fabric8.kubernetes.client.dsl.ScalableResource;
+import io.fabric8.kubernetes.client.dsl.V1BatchAPIGroupDSL;
+
+public class K8sUtilsTest {
+
+    private K8sUtils k8sUtils;
+    private MockedStatic<KubernetesClientPool> mockedKubernetesClientPool;
+    private KubernetesClient mockClient;
+    private final String mockClusterId = "mock-cluster-id";
+    private final String mockKubeConfig =
+            "apiVersion: v1\nclusters:\n- cluster:\n    server: 
https://kubernetes.default.svc\n  name: mock-cluster\ncontexts:\n- context:\n   
 cluster: mock-cluster\n    namespace: default\n    user: mock-user\n  name: 
mock-context\ncurrent-context: mock-context\nkind: Config\npreferences: 
{}\nusers:\n- name: mock-user\n  user: {}\n";
+    private final String mockNamespace = "default";
+    private final String mockJobName = "test-job-123";
+
+    private BatchAPIGroupDSL mockBatch;
+    private V1BatchAPIGroupDSL mockV1;
+    private MixedOperation<Job, JobList, ScalableResource<Job>> mockJobs;
+    private MixedOperation<Job, JobList, ScalableResource<Job>> 
mockInNamespace;
+    private ScalableResource<Job> mockWithName;
+    private ScalableResource<Job> mockResource;
+    private Job mockJob;
+    private Watch mockWatch;
+    private Watcher<Job> mockWatcher;
+
+    private final String expectedLog = "Pod log content";
+
+    @SuppressWarnings("unchecked")
+    @BeforeEach
+    public void setUp() {
+        k8sUtils = new K8sUtils();
+        mockedKubernetesClientPool = 
Mockito.mockStatic(KubernetesClientPool.class); // 拦截所有使用静态方法的请求

Review Comment:
   The Chinese comment should be removed or translated to English. The comment 
'拦截所有使用静态方法的请求' ("intercept all requests using static methods") is 
inconsistent with the rest of the codebase, which uses English comments.
   ```suggestion
           mockedKubernetesClientPool = 
Mockito.mockStatic(KubernetesClientPool.class); // Intercept all requests using 
static methods
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java:
##########
@@ -292,16 +301,13 @@ private void parsePodLogOutput() {
     }
 
     @Override
-    public TaskResponse run(String k8sParameterStr) throws Exception {
+    public TaskResponse run(String k8sParameterStr) {

Review Comment:
   The method signature changed from `throws Exception` to declaring no 
exceptions, but the method body still contains try-catch blocks that catch 
`Exception`. This is a breaking API change that could affect callers expecting 
to handle exceptions. The `throws` clause should either be kept in the 
signature, or every exception should be properly handled and logged on all 
code paths.
   ```suggestion
       public TaskResponse run(String k8sParameterStr) throws Exception {
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java:
##########
@@ -59,33 +67,58 @@ public void deleteJob(String jobName, String namespace) {
                     .delete();
         } catch (Exception e) {
             throw new TaskException("fail to delete job", e);
+        } finally {
+            if (client != null) {
+                KubernetesClientPool.getInstance().returnClient(clusterId, 
client);
+            }
         }
     }
 
-    public Boolean jobExist(String jobName, String namespace) {
+    public Boolean jobExist(String configYaml, String jobName, String 
namespace) {
+        String clusterId = 
KubernetesClientPool.getInstance().getClusterId(configYaml);
+        KubernetesClient client = null;
         try {
+            client = KubernetesClientPool.getInstance().getClient(clusterId, 
configYaml);
             Job job = 
client.batch().v1().jobs().inNamespace(namespace).withName(jobName).get();
             return job != null;
         } catch (Exception e) {
-            throw new TaskException("fail to check job: ", e);
+            throw new TaskException("fail to check job", e);
+        } finally {
+            if (client != null) {
+                KubernetesClientPool.getInstance().returnClient(clusterId, 
client);
+            }
         }
     }
 
-    public Watch createBatchJobWatcher(String jobName, Watcher<Job> watcher) {
+    public Watch createBatchJobWatcher(String configYaml, String jobName, 
Watcher<Job> watcher) {
+        String clusterId = 
KubernetesClientPool.getInstance().getClusterId(configYaml);
+        KubernetesClient client = null;
         try {
+            client = KubernetesClientPool.getInstance().getClient(clusterId, 
configYaml);
             return client.batch()
                     .v1()
                     .jobs()
                     .withName(jobName)
                     .watch(watcher);
         } catch (Exception e) {
             throw new TaskException("fail to register batch job watcher", e);
+        } finally {
+            if (client != null) {
+                log.debug(
+                        "createBatchJobWatcher does not return client 
immediately, caller should manage client lifecycle");

Review Comment:
   The log message says the client is not returned immediately and that the 
caller should manage its lifecycle, but the client is obtained in the try block 
(line 97) and the finally block (lines 106-109) only logs this message without 
returning the client to the pool. Because the method returns only the `Watch`, 
the caller never receives a reference to the client and cannot return it. This 
will cause a client connection leak, as the client remains borrowed and is 
never returned.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java:
##########
@@ -96,56 +107,90 @@ public ResourceManagerType getResourceManagerType() {
     /**
      * get driver pod
      *
-     * @param kubernetesApplicationManagerContext
-     * @return
+     * @param kubernetesApplicationManagerContext Context
+     * @return pods
      */
     @SneakyThrows
-    private FilterWatchListDeletable<Pod, PodList, PodResource> 
getListenPod(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
-        KubernetesClient client = 
getClient(kubernetesApplicationManagerContext);
+    public FilterWatchListDeletable<Pod, PodList, PodResource> 
getListenPod(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+        String clusterId = 
getClusterId(kubernetesApplicationManagerContext.getK8sTaskExecutionContext());
+        KubernetesClient client = null;
         String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
         List<Pod> podList = null;
         FilterWatchListDeletable<Pod, PodList, PodResource> watchList = null;
         int retryTimes = 0;
-        while (CollectionUtils.isEmpty(podList) && retryTimes < 
MAX_RETRY_TIMES) {
-            watchList = client.pods()
-                    
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
-                    .withLabel(UNIQUE_LABEL_NAME, labelValue);
-            podList = watchList.list().getItems();
-            if (!CollectionUtils.isEmpty(podList)) {
-                break;
+        try {
+            client = getClient(kubernetesApplicationManagerContext);
+            while (CollectionUtils.isEmpty(podList) && retryTimes < 
MAX_RETRY_TIMES) {
+                watchList = client.pods()
+                        
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
+                        .withLabel(UNIQUE_LABEL_NAME, labelValue);
+                podList = watchList.list().getItems();
+                if (!CollectionUtils.isEmpty(podList)) {
+                    break;
+                }
+                Thread.sleep(SLEEP_TIME_MILLIS);
+                retryTimes += 1;
+            }
+            return watchList;
+        } finally {
+            if (client != null) {
+                returnClient(clusterId, client);
             }
-            Thread.sleep(SLEEP_TIME_MILLIS);
-            retryTimes += 1;
         }
-
-        return watchList;
     }
 
     /**
-     * create client or get from cache map
+     * Retrieve Kubernetes clients from the connection pool
      *
-     * @param kubernetesApplicationManagerContext
-     * @return
+     * @param kubernetesApplicationManagerContext Context parameters
+     * @return Kubernetes Client
      */
-    private KubernetesClient getClient(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
+    public KubernetesClient getClient(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
         K8sTaskExecutionContext k8sTaskExecutionContext =
                 
kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
-        return 
cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
-                key -> new KubernetesClientBuilder()
-                        
.withConfig(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())).build());
+
+        // Using k8s configuration as cluster identifier
+        String clusterId = getClusterId(k8sTaskExecutionContext);
+        String kubeConfig = k8sTaskExecutionContext.getConfigYaml();
+
+        try {
+            return clientPool.getClient(clusterId, kubeConfig);
+        } catch (Exception e) {
+            log.error("Failed to get Kubernetes client from pool", e);
+            throw new RuntimeException("Failed to get Kubernetes client", e);
+        }
+    }
+
+    /**
+     * Get Cluster Identifier
+     */
+    public String getClusterId(K8sTaskExecutionContext 
k8sTaskExecutionContext) {
+        String kubeConfig = k8sTaskExecutionContext.getConfigYaml();
+        int hashCode = kubeConfig.hashCode();
+        int nonNegativeHash = hashCode & 0x7FFFFFFF;
+        return K8S_CLUSTER_PREFIX + nonNegativeHash;

Review Comment:
   The cluster ID generation logic is duplicated between 
`KubernetesApplicationManager.getClusterId()` and 
`KubernetesClientPool.getClusterId()`. These methods use different algorithms 
(SHA-256 in the pool vs. `hashCode` in the manager), so the same kubeConfig 
yields a different cluster ID depending on which class computes it. This 
inconsistency will prevent proper client pooling and cause connection leaks.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPoolTest.java:
##########
@@ -0,0 +1,1262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.api.model.NamespaceList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+public class KubernetesClientPoolTest {
+
+    private KubernetesClientPool mockPool;
+    private final String mockKubeConfig =
+            "apiVersion: v1\nclusters:\n- cluster:\n    server: 
https://kubernetes.default.svc\n  name: mock-cluster\ncontexts:\n- context:\n   
 cluster: mock-cluster\n    namespace: default\n    user: mock-user\n  name: 
mock-context\ncurrent-context: mock-context\nkind: Config\npreferences: 
{}\nusers:\n- name: mock-user\n  user: {}";
+    private final String clusterId = "mock-cluster-id";
+    private MockedStatic<KubernetesClientPool> mockedKubernetesClientPool;
+
+    @BeforeEach
+    public void before() throws Exception {
+        mockedKubernetesClientPool = 
Mockito.mockStatic(KubernetesClientPool.class);
+
+        mockPool = Mockito.mock(KubernetesClientPool.class);
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(mockPool);
+        Mockito.when(mockPool.getClient(Mockito.anyString(), 
Mockito.anyString())).thenReturn(mockClient);
+        
Mockito.when(mockPool.getClusterId(mockKubeConfig)).thenReturn(clusterId);
+    }
+
+    @AfterEach
+    public void after() {
+        if (mockedKubernetesClientPool != null) {
+            mockedKubernetesClientPool.close();
+        }
+    }
+
+    /**
+     * test: getClusterId,getClient,closePool,returnClient
+     */
+    @Test
+    public void testKubernetesClientPoolBasicFunction() {
+        KubernetesClientPool mockPool = KubernetesClientPool.getInstance();
+
+        String clusterId = "mock-cluster-id";
+        
Mockito.when(mockPool.getClusterId(Mockito.anyString())).thenReturn(clusterId);
+
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+        Mockito.when(mockPool.getClient(clusterId, 
mockKubeConfig)).thenReturn(mockClient);
+
+        String actualClusterId = mockPool.getClusterId(mockKubeConfig);
+        Assertions.assertEquals(clusterId, actualClusterId);
+
+        KubernetesClient client1 = mockPool.getClient(clusterId, 
mockKubeConfig);
+        Assertions.assertNotNull(client1);
+        Assertions.assertEquals(mockClient, client1);
+
+        mockPool.returnClient(clusterId, client1);
+        Mockito.verify(mockPool).returnClient(clusterId, client1);
+
+        KubernetesClient client2 = mockPool.getClient(clusterId, 
mockKubeConfig);
+        Assertions.assertNotNull(client2);
+
+        mockPool.closePool(clusterId);
+        Mockito.verify(mockPool).closePool(clusterId);
+    }
+
+    /**
+     * test:PoolConfig,
+     */
+    @Test
+    public void testKubernetesClientPoolConfig() {
+        try {
+            KubernetesClientPool.PoolConfig expectedConfig = new 
KubernetesClientPool.PoolConfig(
+                    10, // maxSize
+                    2, // minIdle
+                    5, // maxIdle
+                    10000, // maxWaitMs
+                    600000 // idleTimeoutMs
+            );
+
+            KubernetesClientPool mockPool = 
Mockito.mock(KubernetesClientPool.class);
+
+            java.lang.reflect.Field configField = 
KubernetesClientPool.class.getDeclaredField("poolConfig");
+            configField.setAccessible(true);
+            configField.set(mockPool, expectedConfig);
+
+            Assertions.assertEquals(10, expectedConfig.getMaxSize());
+            Assertions.assertEquals(2, expectedConfig.getMinIdle());
+            Assertions.assertEquals(5, expectedConfig.getMaxIdle());
+            Assertions.assertEquals(10000, expectedConfig.getMaxWaitMs());
+        } catch (Exception e) {
+            Assertions.fail("Failed to test connection pool config: " + 
e.getMessage());
+        }
+    }
+
+    /**
+     * Test handling of invalid kubeConfig
+     */
+    @Test
+    public void testInvalidKubeConfig() {
+        String invalidKubeConfig = "invalid config";
+        // Mock KubernetesClientPool.getInstance() to return our mockPool
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class); // spy on actual method
+        String clusterId = realPool.getClusterId(invalidKubeConfig);
+        Assertions.assertNotNull(clusterId, "Cluster ID should not be null 
even for invalid kubeConfig");
+
+        // Mock getClient to throw exception for invalid kubeConfig
+        Mockito.when(mockPool.getClient(clusterId, invalidKubeConfig))
+                .thenThrow(new RuntimeException("Invalid kubeconfig"));
+
+        // Test getClient with invalid kubeConfig (should throw exception)
+        Assertions.assertThrows(Exception.class, () -> 
mockPool.getClient(clusterId, invalidKubeConfig),
+                "getClient should throw exception for invalid kubeConfig");
+    }
+    @Test
+    public void testClusterIdGeneration() {
+
+        KubernetesClientPool mockPool = KubernetesClientPool.getInstance();
+
+        String mockClusterId = "mock-cluster-id-1";
+        
Mockito.when(mockPool.getClusterId(mockKubeConfig)).thenReturn(mockClusterId);
+
+        String differentKubeConfig = mockKubeConfig + "#different";
+        String mockDifferentClusterId = "mock-cluster-id-2";
+        
Mockito.when(mockPool.getClusterId(differentKubeConfig)).thenReturn(mockDifferentClusterId);
+
+        String clusterId1 = mockPool.getClusterId(mockKubeConfig);
+        String clusterId2 = mockPool.getClusterId(mockKubeConfig);
+        Assertions.assertEquals(clusterId1, clusterId2);
+        Assertions.assertEquals(mockClusterId, clusterId1);
+
+        String clusterId3 = mockPool.getClusterId(differentKubeConfig);
+        Assertions.assertNotEquals(clusterId1, clusterId3);
+        Assertions.assertEquals(mockDifferentClusterId, clusterId3);
+    }
+
+    /**
+     * Test singleton pattern of KubernetesClientPool
+     */
+    @Test
+    public void testSingletonPattern() {
+        KubernetesClientPool instance1 = KubernetesClientPool.getInstance();
+        KubernetesClientPool instance2 = KubernetesClientPool.getInstance();
+        Assertions.assertSame(instance1, instance2, "KubernetesClientPool 
should be a singleton");
+    }
+
+    /**
+     * Test getClusterId method with valid kubeConfig
+     */
+    @Test
+    public void testGetClusterId() {
+        String clusterId = mockPool.getClusterId(mockKubeConfig);
+        Assertions.assertNotNull(clusterId, "Cluster ID should not be null");
+        // Verify that the same kubeConfig returns the same clusterId
+        String sameClusterId = mockPool.getClusterId(mockKubeConfig);
+        Assertions.assertEquals(clusterId, sameClusterId, "Same kubeConfig 
should return same cluster ID");
+    }
+
+    /**
+     * Test getClusterId method with different kubeConfigs
+     */
+    @Test
+    public void testGetClusterIdDifferentConfigs() {
+        String clusterId1 = mockPool.getClusterId(mockKubeConfig);
+        String differentKubeConfig = mockKubeConfig + "#different";
+        String clusterId2 = mockPool.getClusterId(differentKubeConfig);
+        Assertions.assertNotEquals(clusterId1, clusterId2, "Different 
kubeConfigs should return different cluster IDs");
+    }
+
+    /**
+     * Test getClient method with valid parameters
+     */
+    @Test
+    public void testGetClient() throws Exception {
+        // Mock a KubernetesClient and ClusterClientPool
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field to return a mock ClusterClientPool
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create a mock ClusterClientPool
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+        // Mock the borrowObject method
+        
Mockito.doReturn(mockClient).when(mockClusterClientPool).borrowObject();
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        // Test getting a client
+        KubernetesClient client = realPool.getClient(clusterId, 
mockKubeConfig);
+
+        // Verify the client is not null
+        Assertions.assertNotNull(client, "Client should not be null");
+        Assertions.assertEquals(mockClient, client, "Returned client should 
match the mock client");
+    }
+
+    /**
+     * Test returnClient method
+     */
+    @Test
+    public void testReturnClient() throws Exception {
+        // Mock a KubernetesClient and ClusterClientPool
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field to return a mock ClusterClientPool
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create a mock ClusterClientPool
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+        // Mock the borrowObject method to return our mock client
+        
Mockito.doReturn(mockClient).when(mockClusterClientPool).borrowObject();
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        // Get a client
+        KubernetesClient client = realPool.getClient(clusterId, 
mockKubeConfig);
+        Assertions.assertNotNull(client, "Client should not be null");
+
+        // Return the client to the pool
+        realPool.returnClient(clusterId, client);
+
+        // Verify that returnObject was called on the mockClusterClientPool
+        Mockito.verify(mockClusterClientPool).returnObject(client);
+
+        // Verify that getClient calls borrowObject again
+        realPool.getClient(clusterId, mockKubeConfig);
+        Mockito.verify(mockClusterClientPool, Mockito.times(2)).borrowObject();
+    }
+
+    /**
+     * Test closePool method
+     */
+    @Test
+    public void testClosePool() throws Exception {
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field to return a mock ClusterClientPool
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create a mock ClusterClientPool
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        // Close the pool
+        realPool.closePool(clusterId);
+
+        // Verify that the ClusterClientPool's close method was called
+        Mockito.verify(mockClusterClientPool).close();
+
+        // Verify that the clusterClientPools no longer contains the clusterId
+        Assertions.assertFalse(mockClusterClientPools.containsKey(clusterId),
+                "Cluster client pool should be removed after close");
+    }
+
+    /**
+     * Test close method (closes all pools)
+     */
+    @Test
+    public void testClose() throws Exception {
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create multiple mock ClusterClientPools for different clusters
+        String clusterId1 = clusterId;
+        String clusterId2 = clusterId + "_2";
+
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool1 =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool2 =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+
+        mockClusterClientPools.put(clusterId1, mockClusterClientPool1);
+        mockClusterClientPools.put(clusterId2, mockClusterClientPool2);
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        // Close all pools
+        realPool.closePool(clusterId1);
+        realPool.closePool(clusterId2);
+
+        // Verify that both ClusterClientPools' close methods were called
+        Mockito.verify(mockClusterClientPool1).close();
+        Mockito.verify(mockClusterClientPool2).close();
+
+        // Verify that the clusterClientPools is now empty
+        Assertions.assertTrue(mockClusterClientPools.isEmpty(),
+                "All cluster client pools should be removed after close");
+    }
+
+    @Test
+    public void testConcurrentAccess() throws Exception {
+        final int threadCount = 10;
+        final int operationsPerThread = 5;
+        final CountDownLatch latch = new CountDownLatch(threadCount);
+        final ExecutorService executorService = 
Executors.newFixedThreadPool(threadCount);
+        final List<Exception> exceptions = new ArrayList<>();
+
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create a mock ClusterClientPool
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+        // Create a mock KubernetesClient
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+
+        // Mock the borrowObject method to return our mock client
+        
Mockito.when(mockClusterClientPool.borrowObject()).thenReturn(mockClient);
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        try {
+            // Start multiple threads that concurrently access the client pool
+            for (int i = 0; i < threadCount; i++) {
+                executorService.submit(() -> {
+                    try {
+                        for (int j = 0; j < operationsPerThread; j++) {
+                            // Get a client
+                            KubernetesClient client = 
realPool.getClient(clusterId, mockKubeConfig);
+                            Assertions.assertNotNull(client, "Client should 
not be null");
+
+                            // Simulate some work with the client
+                            Awaitility.await().atMost(Duration.ofMillis(10));
+
+                            // Return the client to the pool
+                            realPool.returnClient(clusterId, client);
+                        }
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+            }
+
+            // Wait for all threads to complete
+            boolean completed = latch.await(60, TimeUnit.SECONDS);
+            Assertions.assertTrue(completed, "All threads should complete 
within timeout");
+
+            // Verify no exceptions occurred during concurrent access
+            Assertions.assertTrue(exceptions.isEmpty(), "No exceptions should 
occur during concurrent access");
+
+            // Verify that borrowObject and returnObject were called the 
expected number of times
+            int expectedCalls = threadCount * operationsPerThread;
+            Mockito.verify(mockClusterClientPool, 
Mockito.times(expectedCalls)).borrowObject();
+            Mockito.verify(mockClusterClientPool, 
Mockito.times(expectedCalls)).returnObject(Mockito.any());
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Test handling of closed client
+     */
+    @Test
+    public void testHandlingClosedClient() {
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field to return a mock ClusterClientPool
+        try {
+            java.lang.reflect.Field clusterClientPoolsField =
+                    
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+            clusterClientPoolsField.setAccessible(true);
+            ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                    new ConcurrentHashMap<>();
+
+            // Create a mock ClusterClientPool
+            KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                    Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+            mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+            // Mock the borrowObject method to throw an exception, simulating 
a closed client scenario
+            Mockito.doThrow(new RuntimeException("Client is 
closed")).when(mockClusterClientPool).borrowObject();
+
+            // Set the mock clusterClientPools into the realPool
+            clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+            // Test getClient with a closed client scenario
+            try {
+                // This should handle the exception gracefully
+                KubernetesClient client = realPool.getClient(clusterId, 
mockKubeConfig);
+                // If no exception is thrown, verify the client is handled 
properly
+                Assertions.assertNull(client, "Client should be null when 
borrowObject fails");
+            } catch (Exception e) {
+                // If exception propagates, this is also acceptable behavior
+                System.out.println("Expected exception when getting client in 
closed scenario: " + e.getMessage());
+            }
+
+            // Verify that borrowObject was called
+            Mockito.verify(mockClusterClientPool).borrowObject();
+        } catch (Exception e) {
+            Assertions.fail("Failed to setup test for closed client scenario: 
" + e.getMessage());
+        }
+    }
+
+    /**
+     * Test that PooledClient constructor correctly initializes the client and 
lastUsedTime.
+     */
+    @Test
+    public void testPooledClientConstructor() throws Exception {
+        // Mock a KubernetesClient
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+        Class<?> pooledClientClass = getPooledClientClass();
+        long currentTime = System.currentTimeMillis();
+
+        // Create a PooledClient instance using reflection
+        Constructor<?> constructor = 
pooledClientClass.getDeclaredConstructor(KubernetesClient.class);
+        constructor.setAccessible(true);
+        Object pooledClient = constructor.newInstance(mockClient);
+
+        // Verify that the client was set correctly
+        Field clientField = pooledClientClass.getDeclaredField("client");
+        Field lastUsedTimeField = 
pooledClientClass.getDeclaredField("lastUsedTime");
+        lastUsedTimeField.setAccessible(true);
+        clientField.setAccessible(true);
+
+        Object clientValue = clientField.get(pooledClient);
+        // Verify the client is accessible and correct
+        Assertions.assertNotNull(clientValue, "Client should not be null");
+        Assertions.assertSame(mockClient, clientValue, "Client reference 
should match the original client");
+
+        // Verify that lastUsedTime was initialized correctly
+        long lastUsedTimeValue = lastUsedTimeField.getLong(pooledClient);
+
+        // Allow for a small time difference (up to 1 second) between test 
execution and constructor call
+        Assertions.assertEquals(currentTime, lastUsedTimeValue, 1000,
+                "lastUsedTime should be initialized to current time");
+
+        // Test that we can modify the lastUsedTime field.
+        long newTime = System.currentTimeMillis() + 10000; // 10 seconds in 
the future
+        lastUsedTimeField.setLong(pooledClient, newTime);
+        // Verify the modification
+        long updatedTime = lastUsedTimeField.getLong(pooledClient);
+        Assertions.assertEquals(newTime, updatedTime, "lastUsedTime should be 
successfully updated");
+    }
+
+    /**
+     * Helper method to get the PooledClient class using reflection.
+     */
+    private Class<?> getPooledClientClass() throws ClassNotFoundException {
+        Class<?> clusterClientPoolClass =
+                
Class.forName("org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool$ClusterClientPool");
+        Class<?>[] nestedClasses = clusterClientPoolClass.getDeclaredClasses();
+        Class<?> targetClass = 
KubernetesClientPool.ClusterClientPool.PooledClient.class;
+
+        for (Class<?> nestedClass : nestedClasses) {
+            if (nestedClass == targetClass) {
+                return nestedClass;
+            }
+        }
+
+        throw new ClassNotFoundException("Could not find PooledClient class");
+    }
+
+    @Test
+    public void testClusterClientPoolBorrowObjectTimeout() throws Exception {
+        // Mock a KubernetesClient and create a PoolConfig with small timeout
+        KubernetesClientPool.PoolConfig poolConfig = new 
KubernetesClientPool.PoolConfig(
+                2, // maxSize
+                0, // minIdle
+                2, // maxIdle
+                500, // maxWaitMs (500ms timeout)
+                600000); // idleTimeoutMs
+
+        // Create a real ClusterClientPool instance using reflection
+        Class<?> clusterClientPoolClass =
+                
Class.forName("org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool$ClusterClientPool");
+        Constructor<?> constructor = 
clusterClientPoolClass.getDeclaredConstructor(String.class, String.class,
+                KubernetesClientPool.PoolConfig.class);
+        constructor.setAccessible(true);
+
+        // Create mock clients
+        KubernetesClient mockClient1 = Mockito.mock(KubernetesClient.class);
+        KubernetesClient mockClient2 = Mockito.mock(KubernetesClient.class);
+
+        // Use a class-level AtomicInteger to track builder calls
+        AtomicInteger builderCounter = new AtomicInteger(0);
+
+        // Mock KubernetesClientBuilder to control client creation
+        try (
+                MockedConstruction<KubernetesClientBuilder> mockedConstruction 
=
+                        Mockito.mockConstruction(KubernetesClientBuilder.class,
+                                (mock, context) -> {
+                                    // Configure the mock behavior
+                                    Mockito.when(mock.withConfig((Config) 
Mockito.any())).thenReturn(mock);
+                                    // Use the class-level counter to 
determine which mock client to return
+                                    
Mockito.when(mock.build()).thenAnswer(invocation -> {
+                                        int count = 
builderCounter.getAndIncrement();
+                                        if (count == 0) {
+                                            return mockClient1;
+                                        } else if (count == 1) {
+                                            return mockClient2;
+                                        } else {
+                                            // For any additional calls, just 
return mockClient2
+                                            return mockClient2;
+                                        }
+                                    });
+                                })) {
+
+            Object clusterClientPool = constructor.newInstance(clusterId, 
mockKubeConfig, poolConfig);
+
+            // Get the borrowObject method
+            Method borrowObjectMethod = 
clusterClientPoolClass.getDeclaredMethod("borrowObject");
+            borrowObjectMethod.setAccessible(true);
+
+            // Get all available clients (maxSize = 2)
+            KubernetesClient client1 = (KubernetesClient) 
borrowObjectMethod.invoke(clusterClientPool);
+            KubernetesClient client2 = (KubernetesClient) 
borrowObjectMethod.invoke(clusterClientPool);

Review Comment:
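   Variable 'KubernetesClient client1' is never read. The suggestion below 
also drops 'client2', so confirm that it is unused as well before applying it.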
   ```suggestion
               borrowObjectMethod.invoke(clusterClientPool);
               borrowObjectMethod.invoke(clusterClientPool);
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java:
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+
+/**
+ * KubernetesClientPool is used to manage the Kubernetes client connection pool
+ * Maintain an independent connection pool for each K8s cluster to implement 
connection creation, acquisition, return, and closure
+ */
+@Slf4j
+public class KubernetesClientPool {
+
+    private static final String BASE64_PADDING_CHARACTER = "=";
+    private static final String EMPTY_STRING = "";
+    private final static int CLEANUP_THREAD_REGULAR = 30000;
+
+    /**
+     * Connection pool instance
+     */
+    private static final KubernetesClientPool INSTANCE = new 
KubernetesClientPool();
+
+    /**
+     * Cluster connection pool mapping, with the key being the cluster 
identifier
+     */
+    private final ConcurrentMap<String, ClusterClientPool> clusterClientPools 
= new ConcurrentHashMap<>();
+
+    /**
+     * Connection pool configuration
+     */
+    private final PoolConfig poolConfig;
+
+    private KubernetesClientPool() {
+        this.poolConfig = new PoolConfig(
+                PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 
10),
+                PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 
2),
+                PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 
5),
+                
PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000),
+                
PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000));
+        log.info("KubernetesClientPool initialized with config: {}", 
poolConfig);
+
+        // clean connection thread
+        startCleanupThread();
+    }
+
+    public static KubernetesClientPool getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Generate cluster identifier based on kubeconfig
+     * @param kubeConfig kubeconfig Configuration
+     * @return Cluster identification
+     */
+    public String getClusterId(String kubeConfig) {
+        try {
+            MessageDigest digest = MessageDigest.getInstance("SHA-256");
+            byte[] hashBytes = 
digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8));
+            String base64Hash = 
Base64.getUrlEncoder().encodeToString(hashBytes);
+            return base64Hash.replace(BASE64_PADDING_CHARACTER, EMPTY_STRING);
+        } catch (Exception e) {
+            log.error("Failed to generate cluster ID", e);
+            return Integer.toString(kubeConfig.hashCode());

Review Comment:
   The fallback cluster ID is built from `hashCode()`, which can be negative, 
so the generated ID may begin with a `-` sign. Although line 170 in 
KubernetesApplicationManager uses `hashCode & 0x7FFFFFFF` to ensure 
non-negative values, this fallback in KubernetesClientPool doesn't apply the 
same mask, so the two code paths generate cluster IDs inconsistently.
   ```suggestion
               return Integer.toString(kubeConfig.hashCode() & 0x7FFFFFFF);
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java:
##########
@@ -56,33 +53,47 @@ public class KubernetesApplicationManager implements 
ApplicationManager<Kubernet
     private static final String FAILED = "Failed";
     private static final String UNKNOWN = "Unknown";
 
+    private static final String K8S_CLUSTER_PREFIX = "k8s-cluster-";
     private static final int MAX_RETRY_TIMES = 10;
 
     /**
-     * cache k8s client for same task
+     * Get Kubernetes client connection pool instance
      */
-    private final Map<String, KubernetesClient> cacheClientMap = new 
ConcurrentHashMap<>();
+    public final KubernetesClientPool clientPool = 
KubernetesClientPool.getInstance();
 
     @Override
     public boolean killApplication(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) throws TaskException {
 
         boolean isKill;
         String labelValue = 
kubernetesApplicationManagerContext.getLabelValue();
+
         FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
                 getListenPod(kubernetesApplicationManagerContext);
         try {
             if (getApplicationStatus(kubernetesApplicationManagerContext, 
watchList).isFailure()) {
                 log.error("Driver pod is in FAILED or UNKNOWN status.");
                 isKill = false;
             } else {
-                watchList.delete();
-                isKill = true;
+                String clusterId = 
getClusterId(kubernetesApplicationManagerContext.getK8sTaskExecutionContext());
+                KubernetesClient client = null;
+                try {
+                    client = getClient(kubernetesApplicationManagerContext);
+                    // Retrieve watchList again, as the previous instance of 
tes client connection pool may have expired

Review Comment:
   Typo in comment: 'tes client' should be 'the client'.
   ```suggestion
                       // Retrieve watchList again, as the previous instance of 
the client connection pool may have expired
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPoolTest.java:
##########
@@ -0,0 +1,1262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.api.model.NamespaceList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+public class KubernetesClientPoolTest {
+
+    private KubernetesClientPool mockPool;
+    private final String mockKubeConfig =
+            "apiVersion: v1\nclusters:\n- cluster:\n    server: 
https://kubernetes.default.svc\n  name: mock-cluster\ncontexts:\n- context:\n   
 cluster: mock-cluster\n    namespace: default\n    user: mock-user\n  name: 
mock-context\ncurrent-context: mock-context\nkind: Config\npreferences: 
{}\nusers:\n- name: mock-user\n  user: {}";
+    private final String clusterId = "mock-cluster-id";
+    private MockedStatic<KubernetesClientPool> mockedKubernetesClientPool;
+
+    @BeforeEach
+    public void before() throws Exception {
+        mockedKubernetesClientPool = 
Mockito.mockStatic(KubernetesClientPool.class);
+
+        mockPool = Mockito.mock(KubernetesClientPool.class);
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(mockPool);
+        Mockito.when(mockPool.getClient(Mockito.anyString(), 
Mockito.anyString())).thenReturn(mockClient);
+        
Mockito.when(mockPool.getClusterId(mockKubeConfig)).thenReturn(clusterId);
+    }
+
+    @AfterEach
+    public void after() {
+        if (mockedKubernetesClientPool != null) {
+            mockedKubernetesClientPool.close();
+        }
+    }
+
+    /**
+     * test: getClusterId,getClient,closePool,returnClient
+     */
+    @Test
+    public void testKubernetesClientPoolBasicFunction() {
+        KubernetesClientPool mockPool = KubernetesClientPool.getInstance();
+
+        String clusterId = "mock-cluster-id";
+        
Mockito.when(mockPool.getClusterId(Mockito.anyString())).thenReturn(clusterId);
+
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+        Mockito.when(mockPool.getClient(clusterId, 
mockKubeConfig)).thenReturn(mockClient);
+
+        String actualClusterId = mockPool.getClusterId(mockKubeConfig);
+        Assertions.assertEquals(clusterId, actualClusterId);
+
+        KubernetesClient client1 = mockPool.getClient(clusterId, 
mockKubeConfig);
+        Assertions.assertNotNull(client1);
+        Assertions.assertEquals(mockClient, client1);
+
+        mockPool.returnClient(clusterId, client1);
+        Mockito.verify(mockPool).returnClient(clusterId, client1);
+
+        KubernetesClient client2 = mockPool.getClient(clusterId, 
mockKubeConfig);
+        Assertions.assertNotNull(client2);
+
+        mockPool.closePool(clusterId);
+        Mockito.verify(mockPool).closePool(clusterId);
+    }
+
+    /**
+     * test:PoolConfig,
+     */
+    @Test
+    public void testKubernetesClientPoolConfig() {
+        try {
+            KubernetesClientPool.PoolConfig expectedConfig = new 
KubernetesClientPool.PoolConfig(
+                    10, // maxSize
+                    2, // minIdle
+                    5, // maxIdle
+                    10000, // maxWaitMs
+                    600000 // idleTimeoutMs
+            );
+
+            KubernetesClientPool mockPool = 
Mockito.mock(KubernetesClientPool.class);
+
+            java.lang.reflect.Field configField = 
KubernetesClientPool.class.getDeclaredField("poolConfig");
+            configField.setAccessible(true);
+            configField.set(mockPool, expectedConfig);
+
+            Assertions.assertEquals(10, expectedConfig.getMaxSize());
+            Assertions.assertEquals(2, expectedConfig.getMinIdle());
+            Assertions.assertEquals(5, expectedConfig.getMaxIdle());
+            Assertions.assertEquals(10000, expectedConfig.getMaxWaitMs());
+        } catch (Exception e) {
+            Assertions.fail("Failed to test connection pool config: " + 
e.getMessage());
+        }
+    }
+
+    /**
+     * Test handling of invalid kubeConfig
+     */
+    @Test
+    public void testInvalidKubeConfig() {
+        String invalidKubeConfig = "invalid config";
+        // Mock KubernetesClientPool.getInstance() to return our mockPool
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class); // spy on actual method
+        String clusterId = realPool.getClusterId(invalidKubeConfig);
+        Assertions.assertNotNull(clusterId, "Cluster ID should not be null 
even for invalid kubeConfig");
+
+        // Mock getClient to throw exception for invalid kubeConfig
+        Mockito.when(mockPool.getClient(clusterId, invalidKubeConfig))
+                .thenThrow(new RuntimeException("Invalid kubeconfig"));
+
+        // Test getClient with invalid kubeConfig (should throw exception)
+        Assertions.assertThrows(Exception.class, () -> 
mockPool.getClient(clusterId, invalidKubeConfig),
+                "getClient should throw exception for invalid kubeConfig");
+    }
+    @Test
+    public void testClusterIdGeneration() {
+
+        KubernetesClientPool mockPool = KubernetesClientPool.getInstance();
+
+        String mockClusterId = "mock-cluster-id-1";
+        
Mockito.when(mockPool.getClusterId(mockKubeConfig)).thenReturn(mockClusterId);
+
+        String differentKubeConfig = mockKubeConfig + "#different";
+        String mockDifferentClusterId = "mock-cluster-id-2";
+        
Mockito.when(mockPool.getClusterId(differentKubeConfig)).thenReturn(mockDifferentClusterId);
+
+        String clusterId1 = mockPool.getClusterId(mockKubeConfig);
+        String clusterId2 = mockPool.getClusterId(mockKubeConfig);
+        Assertions.assertEquals(clusterId1, clusterId2);
+        Assertions.assertEquals(mockClusterId, clusterId1);
+
+        String clusterId3 = mockPool.getClusterId(differentKubeConfig);
+        Assertions.assertNotEquals(clusterId1, clusterId3);
+        Assertions.assertEquals(mockDifferentClusterId, clusterId3);
+    }
+
+    /**
+     * Test singleton pattern of KubernetesClientPool
+     */
+    @Test
+    public void testSingletonPattern() {
+        KubernetesClientPool instance1 = KubernetesClientPool.getInstance();
+        KubernetesClientPool instance2 = KubernetesClientPool.getInstance();
+        Assertions.assertSame(instance1, instance2, "KubernetesClientPool 
should be a singleton");
+    }
+
+    /**
+     * Test getClusterId method with valid kubeConfig
+     */
+    @Test
+    public void testGetClusterId() {
+        String clusterId = mockPool.getClusterId(mockKubeConfig);
+        Assertions.assertNotNull(clusterId, "Cluster ID should not be null");
+        // Verify that the same kubeConfig returns the same clusterId
+        String sameClusterId = mockPool.getClusterId(mockKubeConfig);
+        Assertions.assertEquals(clusterId, sameClusterId, "Same kubeConfig 
should return same cluster ID");
+    }
+
+    /**
+     * Test getClusterId method with different kubeConfigs
+     */
+    @Test
+    public void testGetClusterIdDifferentConfigs() {
+        String clusterId1 = mockPool.getClusterId(mockKubeConfig);
+        String differentKubeConfig = mockKubeConfig + "#different";
+        String clusterId2 = mockPool.getClusterId(differentKubeConfig);
+        Assertions.assertNotEquals(clusterId1, clusterId2, "Different 
kubeConfigs should return different cluster IDs");
+    }
+
+    /**
+     * Test getClient method with valid parameters
+     */
+    @Test
+    public void testGetClient() throws Exception {
+        // Mock a KubernetesClient and ClusterClientPool
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field to return a mock ClusterClientPool
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create a mock ClusterClientPool
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+        // Mock the borrowObject method
+        
Mockito.doReturn(mockClient).when(mockClusterClientPool).borrowObject();
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        // Test getting a client
+        KubernetesClient client = realPool.getClient(clusterId, 
mockKubeConfig);
+
+        // Verify the client is not null
+        Assertions.assertNotNull(client, "Client should not be null");
+        Assertions.assertEquals(mockClient, client, "Returned client should 
match the mock client");
+    }
+
+    /**
+     * Test returnClient method
+     */
+    @Test
+    public void testReturnClient() throws Exception {
+        // Mock a KubernetesClient and ClusterClientPool
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field to return a mock ClusterClientPool
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create a mock ClusterClientPool
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+        // Mock the borrowObject method to return our mock client
+        
Mockito.doReturn(mockClient).when(mockClusterClientPool).borrowObject();
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        // Get a client
+        KubernetesClient client = realPool.getClient(clusterId, 
mockKubeConfig);
+        Assertions.assertNotNull(client, "Client should not be null");
+
+        // Return the client to the pool
+        realPool.returnClient(clusterId, client);
+
+        // Verify that returnObject was called on the mockClusterClientPool
+        Mockito.verify(mockClusterClientPool).returnObject(client);
+
+        // Verify that getClient calls borrowObject again
+        realPool.getClient(clusterId, mockKubeConfig);
+        Mockito.verify(mockClusterClientPool, Mockito.times(2)).borrowObject();
+    }
+
+    /**
+     * Test closePool method
+     */
+    @Test
+    public void testClosePool() throws Exception {
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field to return a mock ClusterClientPool
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create a mock ClusterClientPool
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        // Close the pool
+        realPool.closePool(clusterId);
+
+        // Verify that the ClusterClientPool's close method was called
+        Mockito.verify(mockClusterClientPool).close();
+
+        // Verify that the clusterClientPools no longer contains the clusterId
+        Assertions.assertFalse(mockClusterClientPools.containsKey(clusterId),
+                "Cluster client pool should be removed after close");
+    }
+
+    /**
+     * Test close method (closes all pools)
+     */
+    @Test
+    public void testClose() throws Exception {
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create multiple mock ClusterClientPools for different clusters
+        String clusterId1 = clusterId;
+        String clusterId2 = clusterId + "_2";
+
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool1 =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool2 =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+
+        mockClusterClientPools.put(clusterId1, mockClusterClientPool1);
+        mockClusterClientPools.put(clusterId2, mockClusterClientPool2);
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        // Close all pools
+        realPool.closePool(clusterId1);
+        realPool.closePool(clusterId2);
+
+        // Verify that both ClusterClientPools' close methods were called
+        Mockito.verify(mockClusterClientPool1).close();
+        Mockito.verify(mockClusterClientPool2).close();
+
+        // Verify that the clusterClientPools is now empty
+        Assertions.assertTrue(mockClusterClientPools.isEmpty(),
+                "All cluster client pools should be removed after close");
+    }
+
+    @Test
+    public void testConcurrentAccess() throws Exception {
+        final int threadCount = 10;
+        final int operationsPerThread = 5;
+        final CountDownLatch latch = new CountDownLatch(threadCount);
+        final ExecutorService executorService = 
Executors.newFixedThreadPool(threadCount);
+        final List<Exception> exceptions = new ArrayList<>();
+
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field
+        java.lang.reflect.Field clusterClientPoolsField =
+                
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+        clusterClientPoolsField.setAccessible(true);
+        ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                new ConcurrentHashMap<>();
+
+        // Create a mock ClusterClientPool
+        KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+        mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+        // Create a mock KubernetesClient
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+
+        // Mock the borrowObject method to return our mock client
+        
Mockito.when(mockClusterClientPool.borrowObject()).thenReturn(mockClient);
+
+        // Set the mock clusterClientPools into the realPool
+        clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+        // Mock the getInstance method to return our spy
+        Mockito.when(KubernetesClientPool.getInstance()).thenReturn(realPool);
+
+        try {
+            // Start multiple threads that concurrently access the client pool
+            for (int i = 0; i < threadCount; i++) {
+                executorService.submit(() -> {
+                    try {
+                        for (int j = 0; j < operationsPerThread; j++) {
+                            // Get a client
+                            KubernetesClient client = 
realPool.getClient(clusterId, mockKubeConfig);
+                            Assertions.assertNotNull(client, "Client should 
not be null");
+
+                            // Simulate some work with the client
+                            Awaitility.await().atMost(Duration.ofMillis(10));
+
+                            // Return the client to the pool
+                            realPool.returnClient(clusterId, client);
+                        }
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+            }
+
+            // Wait for all threads to complete
+            boolean completed = latch.await(60, TimeUnit.SECONDS);
+            Assertions.assertTrue(completed, "All threads should complete 
within timeout");
+
+            // Verify no exceptions occurred during concurrent access
+            Assertions.assertTrue(exceptions.isEmpty(), "No exceptions should 
occur during concurrent access");
+
+            // Verify that borrowObject and returnObject were called the 
expected number of times
+            int expectedCalls = threadCount * operationsPerThread;
+            Mockito.verify(mockClusterClientPool, 
Mockito.times(expectedCalls)).borrowObject();
+            Mockito.verify(mockClusterClientPool, 
Mockito.times(expectedCalls)).returnObject(Mockito.any());
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Test handling of closed client
+     */
+    @Test
+    public void testHandlingClosedClient() {
+        // Get the real instance for testing
+        KubernetesClientPool realPool = 
Mockito.spy(KubernetesClientPool.class);
+
+        // Mock the clusterClientPools field to return a mock ClusterClientPool
+        try {
+            java.lang.reflect.Field clusterClientPoolsField =
+                    
KubernetesClientPool.class.getDeclaredField("clusterClientPools");
+            clusterClientPoolsField.setAccessible(true);
+            ConcurrentHashMap<String, KubernetesClientPool.ClusterClientPool> 
mockClusterClientPools =
+                    new ConcurrentHashMap<>();
+
+            // Create a mock ClusterClientPool
+            KubernetesClientPool.ClusterClientPool mockClusterClientPool =
+                    Mockito.mock(KubernetesClientPool.ClusterClientPool.class);
+            mockClusterClientPools.put(clusterId, mockClusterClientPool);
+
+            // Mock the borrowObject method to throw an exception, simulating 
a closed client scenario
+            Mockito.doThrow(new RuntimeException("Client is 
closed")).when(mockClusterClientPool).borrowObject();
+
+            // Set the mock clusterClientPools into the realPool
+            clusterClientPoolsField.set(realPool, mockClusterClientPools);
+
+            // Test getClient with a closed client scenario
+            try {
+                // This should handle the exception gracefully
+                KubernetesClient client = realPool.getClient(clusterId, 
mockKubeConfig);
+                // If no exception is thrown, verify the client is handled 
properly
+                Assertions.assertNull(client, "Client should be null when 
borrowObject fails");
+            } catch (Exception e) {
+                // If exception propagates, this is also acceptable behavior
+                System.out.println("Expected exception when getting client in 
closed scenario: " + e.getMessage());
+            }
+
+            // Verify that borrowObject was called
+            Mockito.verify(mockClusterClientPool).borrowObject();
+        } catch (Exception e) {
+            Assertions.fail("Failed to setup test for closed client scenario: 
" + e.getMessage());
+        }
+    }
+
+    /**
+     * Test that PooledClient constructor correctly initializes the client and 
lastUsedTime.
+     */
+    @Test
+    public void testPooledClientConstructor() throws Exception {
+        // Mock a KubernetesClient
+        KubernetesClient mockClient = Mockito.mock(KubernetesClient.class);
+        Class<?> pooledClientClass = getPooledClientClass();
+        long currentTime = System.currentTimeMillis();
+
+        // Create a PooledClient instance using reflection
+        Constructor<?> constructor = 
pooledClientClass.getDeclaredConstructor(KubernetesClient.class);
+        constructor.setAccessible(true);
+        Object pooledClient = constructor.newInstance(mockClient);
+
+        // Verify that the client was set correctly
+        Field clientField = pooledClientClass.getDeclaredField("client");
+        Field lastUsedTimeField = 
pooledClientClass.getDeclaredField("lastUsedTime");
+        lastUsedTimeField.setAccessible(true);
+        clientField.setAccessible(true);
+
+        Object clientValue = clientField.get(pooledClient);
+        // Verify the client is accessible and correct
+        Assertions.assertNotNull(clientValue, "Client should not be null");
+        Assertions.assertSame(mockClient, clientValue, "Client reference 
should match the original client");
+
+        // Verify that lastUsedTime was initialized correctly
+        long lastUsedTimeValue = lastUsedTimeField.getLong(pooledClient);
+
+        // Allow for a small time difference (up to 1 second) between test 
execution and constructor call
+        Assertions.assertEquals(currentTime, lastUsedTimeValue, 1000,
+                "lastUsedTime should be initialized to current time");
+
+        // Test that we can modify the lastUsedTime field.
+        long newTime = System.currentTimeMillis() + 10000; // 10 seconds in 
the future
+        lastUsedTimeField.setLong(pooledClient, newTime);
+        // Verify the modification
+        long updatedTime = lastUsedTimeField.getLong(pooledClient);
+        Assertions.assertEquals(newTime, updatedTime, "lastUsedTime should be 
successfully updated");
+    }
+
+    /**
+     * Helper method to get the PooledClient class using reflection.
+     */
+    private Class<?> getPooledClientClass() throws ClassNotFoundException {
+        Class<?> clusterClientPoolClass =
+                
Class.forName("org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool$ClusterClientPool");
+        Class<?>[] nestedClasses = clusterClientPoolClass.getDeclaredClasses();
+        Class<?> targetClass = 
KubernetesClientPool.ClusterClientPool.PooledClient.class;
+
+        for (Class<?> nestedClass : nestedClasses) {
+            if (nestedClass == targetClass) {
+                return nestedClass;
+            }
+        }
+
+        throw new ClassNotFoundException("Could not find PooledClient class");
+    }
+
+    @Test
+    public void testClusterClientPoolBorrowObjectTimeout() throws Exception {
+        // Mock a KubernetesClient and create a PoolConfig with small timeout
+        KubernetesClientPool.PoolConfig poolConfig = new 
KubernetesClientPool.PoolConfig(
+                2, // maxSize
+                0, // minIdle
+                2, // maxIdle
+                500, // maxWaitMs (500ms timeout)
+                600000); // idleTimeoutMs
+
+        // Create a real ClusterClientPool instance using reflection
+        Class<?> clusterClientPoolClass =
+                
Class.forName("org.apache.dolphinscheduler.plugin.task.api.k8s.KubernetesClientPool$ClusterClientPool");
+        Constructor<?> constructor = 
clusterClientPoolClass.getDeclaredConstructor(String.class, String.class,
+                KubernetesClientPool.PoolConfig.class);
+        constructor.setAccessible(true);
+
+        // Create mock clients
+        KubernetesClient mockClient1 = Mockito.mock(KubernetesClient.class);
+        KubernetesClient mockClient2 = Mockito.mock(KubernetesClient.class);
+
+        // Use a class-level AtomicInteger to track builder calls
+        AtomicInteger builderCounter = new AtomicInteger(0);
+
+        // Mock KubernetesClientBuilder to control client creation
+        try (
+                MockedConstruction<KubernetesClientBuilder> mockedConstruction 
=
+                        Mockito.mockConstruction(KubernetesClientBuilder.class,
+                                (mock, context) -> {
+                                    // Configure the mock behavior
+                                    Mockito.when(mock.withConfig((Config) 
Mockito.any())).thenReturn(mock);
+                                    // Use the class-level counter to 
determine which mock client to return
+                                    
Mockito.when(mock.build()).thenAnswer(invocation -> {
+                                        int count = 
builderCounter.getAndIncrement();
+                                        if (count == 0) {
+                                            return mockClient1;
+                                        } else if (count == 1) {
+                                            return mockClient2;
+                                        } else {
+                                            // For any additional calls, just 
return mockClient2
+                                            return mockClient2;
+                                        }
+                                    });
+                                })) {
+
+            Object clusterClientPool = constructor.newInstance(clusterId, 
mockKubeConfig, poolConfig);
+
+            // Get the borrowObject method
+            Method borrowObjectMethod = 
clusterClientPoolClass.getDeclaredMethod("borrowObject");
+            borrowObjectMethod.setAccessible(true);
+
+            // Get all available clients (maxSize = 2)
+            KubernetesClient client1 = (KubernetesClient) 
borrowObjectMethod.invoke(clusterClientPool);
+            KubernetesClient client2 = (KubernetesClient) 
borrowObjectMethod.invoke(clusterClientPool);

Review Comment:
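   Variable 'KubernetesClient client2' is never read. The second borrow is
   still required to exhaust the pool (maxSize = 2), so keep the call and drop
   only the unused assignment.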
   ```suggestion
               borrowObjectMethod.invoke(clusterClientPool);
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java:
##########
@@ -186,34 +251,39 @@ private TaskExecutionStatus 
getApplicationStatus(KubernetesApplicationManagerCon
     /**
      * get pod's log watcher
      *
-     * @param kubernetesApplicationManagerContext
-     * @return
+     * @param kubernetesApplicationManagerContext Context
+     * @return Watcher
      */
     @SneakyThrows
     public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
-        KubernetesClient client = 
getClient(kubernetesApplicationManagerContext);
+        KubernetesClient client = null;
         boolean podIsReady = false;
         Pod pod = null;
-        while (!podIsReady) {
-            FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
-                    getListenPod(kubernetesApplicationManagerContext);
-            List<Pod> podList = watchList == null ? null : 
watchList.list().getItems();
-            if (CollectionUtils.isEmpty(podList)) {
-                return null;
-            }
-            pod = podList.get(0);
-            String phase = pod.getStatus().getPhase();
-            if (phase.equals(PENDING) || phase.equals(UNKNOWN)) {
-                Thread.sleep(SLEEP_TIME_MILLIS);
-            } else {
-                podIsReady = true;
+        try {
+            client = getClient(kubernetesApplicationManagerContext);
+            while (!podIsReady) {
+                FilterWatchListDeletable<Pod, PodList, PodResource> watchList =
+                        getListenPod(kubernetesApplicationManagerContext);
+                List<Pod> podList = watchList == null ? null : 
watchList.list().getItems();
+                if (CollectionUtils.isEmpty(podList)) {
+                    return null;
+                }
+                pod = podList.get(0);
+                String phase = pod.getStatus().getPhase();
+                if (phase.equals(PENDING) || phase.equals(UNKNOWN)) {
+                    Thread.sleep(SLEEP_TIME_MILLIS);
+                } else {
+                    podIsReady = true;
+                }
             }
-        }
 
-        return client.pods().inNamespace(pod.getMetadata().getNamespace())
-                .withName(pod.getMetadata().getName())
-                
.inContainer(kubernetesApplicationManagerContext.getContainerName())
-                .watchLog();
+            return client.pods().inNamespace(pod.getMetadata().getNamespace())
+                    .withName(pod.getMetadata().getName())
+                    
.inContainer(kubernetesApplicationManagerContext.getContainerName())
+                    .watchLog();
+        } finally {
+            log.debug("Log watch client is not returned immediately, will be 
managed by caller after watch completes");
+        }
     }
 

Review Comment:
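   Similar to the issue in K8sUtils.createBatchJobWatcher, the client obtained
   in getPodLogWatcher (line 263) is never returned to the pool: the finally
   block only logs a debug message. The message suggests this is intentional,
   but the method returns only the LogWatch, so the caller has no handle on the
   borrowed client and there is no guarantee it is ever returned after the
   watch completes. This creates a connection leak. Note that the early
   `return null` path (taken when no pod is found) also leaves the client
   borrowed even though no watch is created, so that path needs to return the
   client as well; the wrapper below only covers the case where a LogWatch is
   actually handed back to the caller.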
   ```suggestion
               LogWatch logWatch = 
client.pods().inNamespace(pod.getMetadata().getNamespace())
                       .withName(pod.getMetadata().getName())
                       
.inContainer(kubernetesApplicationManagerContext.getContainerName())
                       .watchLog();
               return new ClientReturningLogWatch(logWatch, client, 
kubernetesApplicationManagerContext);
           } catch (Exception e) {
               if (client != null) {
                   
KubernetesClientPool.returnClient(kubernetesApplicationManagerContext, client);
               }
               throw e;
           }
       }
   
       /**
        * Wrapper for LogWatch that returns the client to the pool when closed.
        */
       private static class ClientReturningLogWatch implements LogWatch {
           private final LogWatch delegate;
           private final KubernetesClient client;
           private final KubernetesApplicationManagerContext context;
           private boolean closed = false;
   
           public ClientReturningLogWatch(LogWatch delegate, KubernetesClient 
client, KubernetesApplicationManagerContext context) {
               this.delegate = delegate;
               this.client = client;
               this.context = context;
           }
   
           @Override
           public void close() {
               if (!closed) {
                   try {
                       delegate.close();
                   } finally {
                       KubernetesClientPool.returnClient(context, client);
                       closed = true;
                   }
               }
           }
   
           @Override
           public java.io.InputStream getOutput() {
               return delegate.getOutput();
           }
   
           @Override
           public java.io.InputStream getError() {
               return delegate.getError();
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to