SbloodyS commented on code in PR #17510: URL: https://github.com/apache/dolphinscheduler/pull/17510#discussion_r2467633221
########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +/** + * KubernetesClientPool is used to manage the Kubernetes client connection pool + * Maintain an independent connection pool for each K8s cluster to implement connection 
creation, acquisition, return, and closure + */ +@Slf4j +public class KubernetesClientPool { + + /** + * Connection pool instance + */ + private static final KubernetesClientPool INSTANCE = new KubernetesClientPool(); + + /** + * Cluster connection pool mapping, with the key being the cluster identifier + */ + private final ConcurrentMap<String, ClusterClientPool> clusterClientPools = new ConcurrentHashMap<>(); + + /** + * Connection pool configuration + */ + private final PoolConfig poolConfig; + + private KubernetesClientPool() { + this.poolConfig = new PoolConfig( + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 5), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000)); + log.info("KubernetesClientPool initialized with config: {}", poolConfig); + + // clean connection thread + startCleanupThread(); + } + + public static KubernetesClientPool getInstance() { + return INSTANCE; + } + + /** + * Generate cluster identifier based on kubeconfig + * @param kubeConfig kubeconfig Configuration + * @return Cluster identification + */ + public String getClusterId(String kubeConfig) { + try { + + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8)); + + String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes); + return base64Hash.replace("=", ""); Review Comment: It's better to use constants here. ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +/** + * KubernetesClientPool is used to manage the Kubernetes client connection pool + * Maintain an independent connection pool for each K8s cluster to implement connection creation, acquisition, return, and closure + */ +@Slf4j +public class KubernetesClientPool { + + /** + * Connection pool instance + */ + private static final KubernetesClientPool INSTANCE = new KubernetesClientPool(); + + /** + * Cluster connection pool mapping, with the key being the cluster 
identifier + */ + private final ConcurrentMap<String, ClusterClientPool> clusterClientPools = new ConcurrentHashMap<>(); + + /** + * Connection pool configuration + */ + private final PoolConfig poolConfig; + + private KubernetesClientPool() { + this.poolConfig = new PoolConfig( + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 5), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000)); + log.info("KubernetesClientPool initialized with config: {}", poolConfig); + + // clean connection thread + startCleanupThread(); + } + + public static KubernetesClientPool getInstance() { + return INSTANCE; + } + + /** + * Generate cluster identifier based on kubeconfig + * @param kubeConfig kubeconfig Configuration + * @return Cluster identification + */ + public String getClusterId(String kubeConfig) { + try { + + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8)); + + String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes); + return base64Hash.replace("=", ""); + } catch (Exception e) { + log.error("Failed to generate cluster ID", e); + return Integer.toString(kubeConfig.hashCode()); + } + } + + /** + * Obtain the Kubernetes client for the specified cluster + * @param clusterId Cluster Identifier + * @param kubeConfig kubeconfig Configuration + * @return Kubernetes Client + */ + public KubernetesClient getClient(String clusterId, String kubeConfig) { + ClusterClientPool pool = clusterClientPools.computeIfAbsent(clusterId, + k -> new ClusterClientPool(k, kubeConfig, poolConfig)); + try { + return pool.borrowObject(); + } catch (Exception e) { + log.error("Failed to get Kubernetes client", e); + return null; + } + } + + /** + * Return 
the Kubernetes client to the connection pool + * @param clusterId Cluster Identifier + * @param client Kubernetes Client will be returned + */ + public void returnClient(String clusterId, KubernetesClient client) { + ClusterClientPool pool = clusterClientPools.get(clusterId); + if (pool != null) { + pool.returnObject(client); + } + } + + /** + * Close the connection pool of the specified cluster + * @param clusterId Cluster Identifier + */ + public void closePool(String clusterId) { + ClusterClientPool pool = clusterClientPools.remove(clusterId); + if (pool != null) { + pool.close(); + } + } + + /** + * Start the cleanup thread to regularly clean up idle connections + */ + private void startCleanupThread() { + Thread cleanupThread = new Thread(() -> { + while (true) { + try { + // every 30s + Thread.sleep(30000); + cleanupIdleClients(); + } catch (InterruptedException e) { + log.warn("Cleanup thread interrupted", e); + Thread.currentThread().interrupt(); + break; + } + } + }, "k8s-client-cleanup-thread"); + cleanupThread.setDaemon(true); + cleanupThread.start(); + } + + /** + * clean free connections + */ + private void cleanupIdleClients() { + for (ClusterClientPool pool : clusterClientPools.values()) { + pool.cleanupIdle(); + } + } + + /** + * Configuration Class + */ + public static class PoolConfig { + + private final int maxSize; // max connection num + private final int minIdle; // min free connection num + private final int maxIdle; // max free connection num + private final long maxWaitMs; // max waiting time + private final long idleTimeoutMs; // free timeout + + public PoolConfig(int maxSize, int minIdle, int maxIdle, long maxWaitMs, long idleTimeoutMs) { + this.maxSize = maxSize; + this.minIdle = minIdle; + this.maxIdle = maxIdle; + this.maxWaitMs = maxWaitMs; + this.idleTimeoutMs = idleTimeoutMs; + } + + public int getMaxSize() { + return maxSize; + } + public int getMinIdle() { + return minIdle; + } + public int getMaxIdle() { + return maxIdle; + } + 
public long getMaxWaitMs() { + return maxWaitMs; + } + public long getIdleTimeoutMs() { + return idleTimeoutMs; + } Review Comment: Use Lombok's `@Getter` here instead of the hand-written getters. ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +/** + * KubernetesClientPool is used to manage the Kubernetes client connection pool + * Maintain an independent connection pool for each K8s cluster to implement connection creation, acquisition, return, and closure + */ +@Slf4j +public class KubernetesClientPool { + + /** + * Connection pool instance + */ + private static final KubernetesClientPool INSTANCE = new KubernetesClientPool(); + + /** + * Cluster connection pool mapping, with the key being the cluster identifier + */ + private final ConcurrentMap<String, ClusterClientPool> clusterClientPools = new ConcurrentHashMap<>(); + + /** + * Connection pool configuration + */ + private final PoolConfig poolConfig; + + private KubernetesClientPool() { + this.poolConfig = new PoolConfig( + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 5), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000)); + log.info("KubernetesClientPool initialized with config: 
{}", poolConfig); + + // clean connection thread + startCleanupThread(); + } + + public static KubernetesClientPool getInstance() { + return INSTANCE; + } + + /** + * Generate cluster identifier based on kubeconfig + * @param kubeConfig kubeconfig Configuration + * @return Cluster identification + */ + public String getClusterId(String kubeConfig) { + try { + + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8)); + + String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes); + return base64Hash.replace("=", ""); + } catch (Exception e) { + log.error("Failed to generate cluster ID", e); + return Integer.toString(kubeConfig.hashCode()); + } + } + + /** + * Obtain the Kubernetes client for the specified cluster + * @param clusterId Cluster Identifier + * @param kubeConfig kubeconfig Configuration + * @return Kubernetes Client + */ + public KubernetesClient getClient(String clusterId, String kubeConfig) { + ClusterClientPool pool = clusterClientPools.computeIfAbsent(clusterId, + k -> new ClusterClientPool(k, kubeConfig, poolConfig)); + try { + return pool.borrowObject(); + } catch (Exception e) { + log.error("Failed to get Kubernetes client", e); + return null; + } + } + + /** + * Return the Kubernetes client to the connection pool + * @param clusterId Cluster Identifier + * @param client Kubernetes Client will be returned + */ + public void returnClient(String clusterId, KubernetesClient client) { + ClusterClientPool pool = clusterClientPools.get(clusterId); + if (pool != null) { + pool.returnObject(client); + } + } + + /** + * Close the connection pool of the specified cluster + * @param clusterId Cluster Identifier + */ + public void closePool(String clusterId) { + ClusterClientPool pool = clusterClientPools.remove(clusterId); + if (pool != null) { + pool.close(); + } + } + + /** + * Start the cleanup thread to regularly clean up idle connections + */ + private void 
startCleanupThread() { + Thread cleanupThread = new Thread(() -> { + while (true) { + try { + // every 30s + Thread.sleep(30000); Review Comment: It's better to extract the hard-coded 30000 ms sleep interval into a named constant. ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +/** + * KubernetesClientPool is used to manage the Kubernetes client connection pool + * Maintain an independent connection pool for each K8s cluster to implement connection creation, acquisition, return, and closure + */ +@Slf4j +public class KubernetesClientPool { + + /** + * Connection pool instance + */ + private static final KubernetesClientPool INSTANCE = new KubernetesClientPool(); + + /** + * Cluster connection pool mapping, with the key being the cluster identifier + */ + private final ConcurrentMap<String, ClusterClientPool> clusterClientPools = new ConcurrentHashMap<>(); + + /** + * Connection pool configuration + */ + private final PoolConfig poolConfig; + + private KubernetesClientPool() { + this.poolConfig = new PoolConfig( + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 5), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000)); + log.info("KubernetesClientPool initialized with config: 
{}", poolConfig); + + // clean connection thread + startCleanupThread(); + } + + public static KubernetesClientPool getInstance() { + return INSTANCE; + } + + /** + * Generate cluster identifier based on kubeconfig + * @param kubeConfig kubeconfig Configuration + * @return Cluster identification + */ + public String getClusterId(String kubeConfig) { + try { + + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8)); + + String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes); + return base64Hash.replace("=", ""); + } catch (Exception e) { + log.error("Failed to generate cluster ID", e); + return Integer.toString(kubeConfig.hashCode()); + } + } + + /** + * Obtain the Kubernetes client for the specified cluster + * @param clusterId Cluster Identifier + * @param kubeConfig kubeconfig Configuration + * @return Kubernetes Client + */ + public KubernetesClient getClient(String clusterId, String kubeConfig) { + ClusterClientPool pool = clusterClientPools.computeIfAbsent(clusterId, + k -> new ClusterClientPool(k, kubeConfig, poolConfig)); + try { + return pool.borrowObject(); + } catch (Exception e) { + log.error("Failed to get Kubernetes client", e); + return null; + } + } + + /** + * Return the Kubernetes client to the connection pool + * @param clusterId Cluster Identifier + * @param client Kubernetes Client will be returned + */ + public void returnClient(String clusterId, KubernetesClient client) { + ClusterClientPool pool = clusterClientPools.get(clusterId); + if (pool != null) { + pool.returnObject(client); + } + } + + /** + * Close the connection pool of the specified cluster + * @param clusterId Cluster Identifier + */ + public void closePool(String clusterId) { + ClusterClientPool pool = clusterClientPools.remove(clusterId); + if (pool != null) { + pool.close(); + } + } + + /** + * Start the cleanup thread to regularly clean up idle connections + */ + private void 
startCleanupThread() { + Thread cleanupThread = new Thread(() -> { + while (true) { + try { + // every 30s Review Comment: ```suggestion ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java: ########## @@ -315,12 +322,11 @@ public TaskResponse run(String k8sParameterStr) throws Exception { } } } catch (Exception e) { + // if Exception happen cancelApplication(k8sParameterStr); Thread.currentThread().interrupt(); result.setExitStatusCode(EXIT_CODE_FAILURE); - throw e; - } finally { - ProcessUtils.removeK8sClientCache(taskRequest.getTaskAppId()); + log.error(e.getMessage()); Review Comment: ```suggestion log.error("k8s task executor running error: ", e); ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java: ########## @@ -254,6 +263,7 @@ public void onClose(WatcherException e) { } catch (Exception e) { log.error("job failed in k8s: {}", e.getMessage(), e); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); + log.error(e.getMessage()); Review Comment: ```suggestion ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java: ########## @@ -315,12 +322,11 @@ public TaskResponse run(String k8sParameterStr) throws Exception { } } } catch (Exception e) { + // if Exception happen Review Comment: ```suggestion ``` ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +/** + * KubernetesClientPool is used to manage the Kubernetes client connection pool + * Maintain an independent connection pool for each K8s cluster to implement connection creation, acquisition, return, and closure + */ +@Slf4j +public class KubernetesClientPool { + + /** + * Connection pool instance + */ + private static final KubernetesClientPool INSTANCE = new KubernetesClientPool(); + + /** + * Cluster connection pool mapping, with the key being the cluster identifier + */ + private final ConcurrentMap<String, ClusterClientPool> clusterClientPools = new 
ConcurrentHashMap<>(); + + /** + * Connection pool configuration + */ + private final PoolConfig poolConfig; + + private KubernetesClientPool() { + this.poolConfig = new PoolConfig( + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 5), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000)); + log.info("KubernetesClientPool initialized with config: {}", poolConfig); + + // clean connection thread + startCleanupThread(); + } + + public static KubernetesClientPool getInstance() { + return INSTANCE; + } + + /** + * Generate cluster identifier based on kubeconfig + * @param kubeConfig kubeconfig Configuration + * @return Cluster identification + */ + public String getClusterId(String kubeConfig) { + try { + + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8)); + + String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes); + return base64Hash.replace("=", ""); + } catch (Exception e) { + log.error("Failed to generate cluster ID", e); + return Integer.toString(kubeConfig.hashCode()); + } + } + + /** + * Obtain the Kubernetes client for the specified cluster + * @param clusterId Cluster Identifier + * @param kubeConfig kubeconfig Configuration + * @return Kubernetes Client + */ + public KubernetesClient getClient(String clusterId, String kubeConfig) { + ClusterClientPool pool = clusterClientPools.computeIfAbsent(clusterId, + k -> new ClusterClientPool(k, kubeConfig, poolConfig)); + try { + return pool.borrowObject(); + } catch (Exception e) { + log.error("Failed to get Kubernetes client", e); + return null; + } + } + + /** + * Return the Kubernetes client to the connection pool + * @param clusterId Cluster Identifier + * @param 
client Kubernetes Client will be returned + */ + public void returnClient(String clusterId, KubernetesClient client) { + ClusterClientPool pool = clusterClientPools.get(clusterId); + if (pool != null) { + pool.returnObject(client); + } + } + + /** + * Close the connection pool of the specified cluster + * @param clusterId Cluster Identifier + */ + public void closePool(String clusterId) { + ClusterClientPool pool = clusterClientPools.remove(clusterId); + if (pool != null) { + pool.close(); + } + } + + /** + * Start the cleanup thread to regularly clean up idle connections + */ + private void startCleanupThread() { + Thread cleanupThread = new Thread(() -> { + while (true) { + try { + // every 30s + Thread.sleep(30000); + cleanupIdleClients(); + } catch (InterruptedException e) { + log.warn("Cleanup thread interrupted", e); + Thread.currentThread().interrupt(); + break; + } + } + }, "k8s-client-cleanup-thread"); + cleanupThread.setDaemon(true); + cleanupThread.start(); + } + + /** + * clean free connections + */ + private void cleanupIdleClients() { + for (ClusterClientPool pool : clusterClientPools.values()) { + pool.cleanupIdle(); + } + } + + /** + * Configuration Class + */ + public static class PoolConfig { + + private final int maxSize; // max connection num + private final int minIdle; // min free connection num + private final int maxIdle; // max free connection num + private final long maxWaitMs; // max waiting time + private final long idleTimeoutMs; // free timeout + + public PoolConfig(int maxSize, int minIdle, int maxIdle, long maxWaitMs, long idleTimeoutMs) { + this.maxSize = maxSize; + this.minIdle = minIdle; + this.maxIdle = maxIdle; + this.maxWaitMs = maxWaitMs; + this.idleTimeoutMs = idleTimeoutMs; + } + + public int getMaxSize() { + return maxSize; + } + public int getMinIdle() { + return minIdle; + } + public int getMaxIdle() { + return maxIdle; + } + public long getMaxWaitMs() { + return maxWaitMs; + } + public long getIdleTimeoutMs() { + return 
idleTimeoutMs; + } + + @Override + public String toString() { + return "PoolConfig{" + + "maxSize=" + maxSize + + ", minIdle=" + minIdle + + ", maxIdle=" + maxIdle + + ", maxWaitMs=" + maxWaitMs + + ", idleTimeoutMs=" + idleTimeoutMs + + '}'; + } + } + + /** + * Cluster Connection Pool Class + */ + public static class ClusterClientPool { + + private final String clusterId; + private final String kubeConfig; + private final PoolConfig config; + + private final BlockingQueue<PooledClient> idleClients; + private final Set<PooledClient> activeClients; + private final AtomicInteger createdCount = new AtomicInteger(0); + + public ClusterClientPool(String clusterId, String kubeConfig, PoolConfig config) { + this.clusterId = clusterId; + this.kubeConfig = kubeConfig; + this.config = config; + this.idleClients = new LinkedBlockingQueue<>(); + this.activeClients = new HashSet<>(); + + // initial + initializeMinIdleConnections(); + } + + private void initializeMinIdleConnections() { + for (int i = 0; i < config.getMinIdle(); i++) { + try { + createIdleConnection(); + } catch (Exception e) { + log.error("Failed to initialize idle connection for cluster {}", clusterId, e); Review Comment: It's better to throw an exception here instead of only logging the failure.
########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java: ########## @@ -96,56 +106,90 @@ public ResourceManagerType getResourceManagerType() { /** * get driver pod * - * @param kubernetesApplicationManagerContext - * @return + * @param kubernetesApplicationManagerContext Context + * @return pods */ @SneakyThrows - private FilterWatchListDeletable<Pod, PodList, PodResource> getListenPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { - KubernetesClient client = getClient(kubernetesApplicationManagerContext); + public FilterWatchListDeletable<Pod, PodList, PodResource> getListenPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + String clusterId = getClusterId(kubernetesApplicationManagerContext.getK8sTaskExecutionContext()); + KubernetesClient client = null; String labelValue = kubernetesApplicationManagerContext.getLabelValue(); List<Pod> podList = null; FilterWatchListDeletable<Pod, PodList, PodResource> watchList = null; int retryTimes = 0; - while (CollectionUtils.isEmpty(podList) && retryTimes < MAX_RETRY_TIMES) { - watchList = client.pods() - .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) - .withLabel(UNIQUE_LABEL_NAME, labelValue); - podList = watchList.list().getItems(); - if (!CollectionUtils.isEmpty(podList)) { - break; + try { + client = getClient(kubernetesApplicationManagerContext); + while (CollectionUtils.isEmpty(podList) && retryTimes < MAX_RETRY_TIMES) { + watchList = client.pods() + .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) + .withLabel(UNIQUE_LABEL_NAME, labelValue); + podList = watchList.list().getItems(); + if (!CollectionUtils.isEmpty(podList)) { + break; + } + Thread.sleep(SLEEP_TIME_MILLIS); + retryTimes += 1; + } + return watchList; + } finally { + if (client != null) { + 
returnClient(clusterId, client); } - Thread.sleep(SLEEP_TIME_MILLIS); - retryTimes += 1; } - - return watchList; } /** - * create client or get from cache map + * Retrieve Kubernetes clients from the connection pool * - * @param kubernetesApplicationManagerContext - * @return + * @param kubernetesApplicationManagerContext Context parameters + * @return Kubernetes Client */ - private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + public KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { K8sTaskExecutionContext k8sTaskExecutionContext = kubernetesApplicationManagerContext.getK8sTaskExecutionContext(); - return cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(), - key -> new KubernetesClientBuilder() - .withConfig(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())).build()); + + // Using k8s configuration as cluster identifier + String clusterId = getClusterId(k8sTaskExecutionContext); + String kubeConfig = k8sTaskExecutionContext.getConfigYaml(); + + try { + return clientPool.getClient(clusterId, kubeConfig); + } catch (Exception e) { + log.error("Failed to get Kubernetes client from pool", e); + throw new RuntimeException("Failed to get Kubernetes client", e); + } + } + + /** + * Get Cluster Identifier + */ + public String getClusterId(K8sTaskExecutionContext k8sTaskExecutionContext) { + String kubeConfig = k8sTaskExecutionContext.getConfigYaml(); + int hashCode = kubeConfig.hashCode(); + int nonNegativeHash = hashCode & 0x7FFFFFFF; + return "k8s-cluster-" + nonNegativeHash; Review Comment: It's better to use constants here. ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +/** + * KubernetesClientPool is used to manage the Kubernetes client connection pool + * Maintain an independent connection pool for each K8s cluster to implement connection creation, acquisition, return, and closure + */ +@Slf4j +public class KubernetesClientPool { + + /** + * Connection pool instance + */ + private static final KubernetesClientPool INSTANCE = new KubernetesClientPool(); + + /** + * Cluster connection pool mapping, with the key being the cluster 
identifier + */ + private final ConcurrentMap<String, ClusterClientPool> clusterClientPools = new ConcurrentHashMap<>(); + + /** + * Connection pool configuration + */ + private final PoolConfig poolConfig; + + private KubernetesClientPool() { + this.poolConfig = new PoolConfig( + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 5), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000)); + log.info("KubernetesClientPool initialized with config: {}", poolConfig); + + // clean connection thread + startCleanupThread(); + } + + public static KubernetesClientPool getInstance() { + return INSTANCE; + } + + /** + * Generate cluster identifier based on kubeconfig + * @param kubeConfig kubeconfig Configuration + * @return Cluster identification + */ + public String getClusterId(String kubeConfig) { + try { + + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8)); + + String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes); + return base64Hash.replace("=", ""); + } catch (Exception e) { + log.error("Failed to generate cluster ID", e); + return Integer.toString(kubeConfig.hashCode()); + } + } + + /** + * Obtain the Kubernetes client for the specified cluster + * @param clusterId Cluster Identifier + * @param kubeConfig kubeconfig Configuration + * @return Kubernetes Client + */ + public KubernetesClient getClient(String clusterId, String kubeConfig) { + ClusterClientPool pool = clusterClientPools.computeIfAbsent(clusterId, + k -> new ClusterClientPool(k, kubeConfig, poolConfig)); + try { + return pool.borrowObject(); + } catch (Exception e) { + log.error("Failed to get Kubernetes client", e); + return null; + } + } + + /** + * Return 
the Kubernetes client to the connection pool + * @param clusterId Cluster Identifier + * @param client Kubernetes Client will be returned + */ + public void returnClient(String clusterId, KubernetesClient client) { + ClusterClientPool pool = clusterClientPools.get(clusterId); + if (pool != null) { + pool.returnObject(client); + } + } + + /** + * Close the connection pool of the specified cluster + * @param clusterId Cluster Identifier + */ + public void closePool(String clusterId) { + ClusterClientPool pool = clusterClientPools.remove(clusterId); + if (pool != null) { + pool.close(); + } + } + + /** + * Start the cleanup thread to regularly clean up idle connections + */ + private void startCleanupThread() { + Thread cleanupThread = new Thread(() -> { + while (true) { + try { + // every 30s + Thread.sleep(30000); + cleanupIdleClients(); + } catch (InterruptedException e) { + log.warn("Cleanup thread interrupted", e); + Thread.currentThread().interrupt(); + break; + } + } + }, "k8s-client-cleanup-thread"); + cleanupThread.setDaemon(true); + cleanupThread.start(); + } + + /** + * clean free connections + */ + private void cleanupIdleClients() { + for (ClusterClientPool pool : clusterClientPools.values()) { + pool.cleanupIdle(); + } + } + + /** + * Configuration Class + */ + public static class PoolConfig { + + private final int maxSize; // max connection num + private final int minIdle; // min free connection num + private final int maxIdle; // max free connection num + private final long maxWaitMs; // max waiting time + private final long idleTimeoutMs; // free timeout + + public PoolConfig(int maxSize, int minIdle, int maxIdle, long maxWaitMs, long idleTimeoutMs) { + this.maxSize = maxSize; + this.minIdle = minIdle; + this.maxIdle = maxIdle; + this.maxWaitMs = maxWaitMs; + this.idleTimeoutMs = idleTimeoutMs; + } + + public int getMaxSize() { + return maxSize; + } + public int getMinIdle() { + return minIdle; + } + public int getMaxIdle() { + return maxIdle; + } + 
+ public long getMaxWaitMs() { + return maxWaitMs; + } + public long getIdleTimeoutMs() { + return idleTimeoutMs; + } + + @Override + public String toString() { + return "PoolConfig{" + + "maxSize=" + maxSize + + ", minIdle=" + minIdle + + ", maxIdle=" + maxIdle + + ", maxWaitMs=" + maxWaitMs + + ", idleTimeoutMs=" + idleTimeoutMs + + '}'; + } Review Comment: Use Lombok's `@ToString` here instead of the hand-written `toString()`. ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.dolphinscheduler.plugin.task.api.k8s; + +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +/** + * KubernetesClientPool is used to manage the Kubernetes client connection pool + * Maintain an independent connection pool for each K8s cluster to implement connection creation, acquisition, return, and closure + */ +@Slf4j +public class KubernetesClientPool { + + /** + * Connection pool instance + */ + private static final KubernetesClientPool INSTANCE = new KubernetesClientPool(); + + /** + * Cluster connection pool mapping, with the key being the cluster identifier + */ + private final ConcurrentMap<String, ClusterClientPool> clusterClientPools = new ConcurrentHashMap<>(); + + /** + * Connection pool configuration + */ + private final PoolConfig poolConfig; + + private KubernetesClientPool() { + this.poolConfig = new PoolConfig( + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 5), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000), + PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000)); Review Comment: These parameters should be added to the docs, along with guidance on
how users should configure them. ########## dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtilsTest.java: ########## @@ -103,15 +111,41 @@ public void testGetPidList() throws Exception { Assertions.assertEquals(exceptPidList3, actualPidList3); } + /** + * 测试K8s客户端是否被正确归还到连接池 + */ Review Comment: Please avoid using Chinese in code comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
