This is an automated email from the ASF dual-hosted git repository.

chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new c532fea00e [Improvement] Refactoring K8S task plugin with connections 
managed in connection center (#14977)
c532fea00e is described below

commit c532fea00ef876a920551067eb2f5c72e2b72d61
Author: chenrj <[email protected]>
AuthorDate: Tue Oct 31 17:07:24 2023 +0800

    [Improvement] Refactoring K8S task plugin with connections managed in 
connection center (#14977)
---
 .../dolphinscheduler-datasource-all/pom.xml        |   5 +
 .../dolphinscheduler-datasource-k8s/pom.xml        |  50 ++++++++
 .../plugin/datasource/k8s/K8sClientWrapper.java    |  55 +++++++++
 .../datasource/k8s/K8sDataSourceChannel.java       |  37 ++++++
 .../k8s/K8sDataSourceChannelFactory.java           |  38 +++---
 .../datasource/k8s/param/K8sConnectionParam.java   |  28 ++---
 .../k8s/param/K8sDataSourceParamDTO.java           |  31 ++---
 .../k8s/param/K8sDataSourceProcessor.java          | 128 +++++++++++++++++++++
 .../datasource/k8s/K8sDataSourceProcessorTest.java | 107 +++++++++++++++++
 .../SagemakerDataSourceProcessorTest.java          |   1 -
 dolphinscheduler-datasource-plugin/pom.xml         |   1 +
 .../e2e/pages/datasource/DataSourcePage.java       |   6 +
 .../apache/dolphinscheduler/spi/enums/DbType.java  |   4 +-
 .../plugin/task/api/K8sTaskExecutionContext.java   |  10 +-
 .../task/api/am/KubernetesApplicationManager.java  |   3 +
 .../plugin/task/api/k8s/K8sTaskMainParameters.java |   1 -
 .../plugin/task/api/k8s/impl/K8sTaskExecutor.java  |   4 +-
 .../task/api/parameters/K8sTaskParameters.java     |  28 ++++-
 .../plugin/task/api/k8s/K8sTaskExecutorTest.java   |  11 +-
 .../dolphinscheduler/plugin/task/k8s/K8sTask.java  |  36 ++++--
 .../plugin/task/k8s/K8sTaskChannel.java            |   2 +-
 .../plugin/task/k8s/K8sParametersTest.java         |   2 +-
 .../plugin/task/k8s/K8sTaskTest.java               |  82 +++++++++----
 .../plugin/task/sagemaker/SagemakerTaskTest.java   |   1 -
 .../plugin/task/zeppelin/ZeppelinTask.java         |   2 +-
 .../src/service/modules/data-source/types.ts       |   4 +
 .../src/views/datasource/list/detail.tsx           |  41 ++++++-
 .../src/views/datasource/list/use-form.ts          |  23 +++-
 .../task/components/node/fields/use-k8s.ts         |   8 +-
 .../projects/task/components/node/format-data.ts   |   3 +
 .../projects/task/components/node/tasks/use-k8s.ts |   7 +-
 31 files changed, 627 insertions(+), 132 deletions(-)

diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
index cb9c78f9d3..d1f39e1ad7 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
@@ -148,5 +148,10 @@
             <artifactId>dolphinscheduler-datasource-sagemaker</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-datasource-k8s</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml
new file mode 100644
index 0000000000..2d51b85fed
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-datasource-plugin</artifactId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dolphinscheduler-datasource-k8s</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-datasource-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-client</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java
new file mode 100644
index 0000000000..292eaf2f3d
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sClientWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s;
+
+import lombok.extern.slf4j.Slf4j;
+import io.fabric8.kubernetes.api.model.NamespaceList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+
+@Slf4j
+public class K8sClientWrapper implements AutoCloseable {
+
+    private KubernetesClient client;
+
+    public K8sClientWrapper() {
+    }
+
+    public boolean checkConnect(String kubeConfigYaml, String namespace) {
+        try {
+            Config config = Config.fromKubeconfig(kubeConfigYaml);
+            client = new KubernetesClientBuilder().withConfig(config).build();
+            NamespaceList namespaceList = client.namespaces().list();
+            if (!namespaceList.getItems().stream().anyMatch(ns -> 
ns.getMetadata().getName().equals(namespace))) {
+                log.info("failed to connect to the K8S cluster, namespace not 
found\n");
+                return false;
+            }
+            log.info("successfully connected to the K8S cluster");
+            return true;
+        } catch (Exception e) {
+            log.info("failed to connect to the K8S cluster\n");
+            return false;
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java
new file mode 100644
index 0000000000..3dac639259
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s;
+
+import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
+import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+public class K8sDataSourceChannel implements DataSourceChannel {
+
+    @Override
+    public AdHocDataSourceClient 
createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType 
dbType) {
+        throw new UnsupportedOperationException("K8S AdHocDataSourceClient is 
not supported");
+    }
+
+    @Override
+    public PooledDataSourceClient 
createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType 
dbType) {
+        throw new UnsupportedOperationException("K8S AdHocDataSourceClient is 
not supported");
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java
similarity index 53%
copy from 
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
copy to 
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java
index a6c4703f10..03ec046de8 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceChannelFactory.java
@@ -15,30 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.task.api.k8s;
+package org.apache.dolphinscheduler.plugin.datasource.k8s;
 
-import java.util.List;
-import java.util.Map;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
 
-import lombok.Data;
-import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
+import com.google.auto.service.AutoService;
 
-/**
- * k8s task parameters
- */
-@Data
-public class K8sTaskMainParameters {
+@AutoService(DataSourceChannelFactory.class)
+public class K8sDataSourceChannelFactory implements DataSourceChannelFactory {
+
+    @Override
+    public DataSourceChannel create() {
+        return new K8sDataSourceChannel();
+    }
+
+    @Override
+    public String getName() {
+        return "k8s";
+    }
 
-    private String image;
-    private String command;
-    private String args;
-    private String pullSecret;
-    private String namespaceName;
-    private String clusterName;
-    private String imagePullPolicy;
-    private double minCpuCores;
-    private double minMemorySpace;
-    private Map<String, String> paramsMap;
-    private Map<String, String> labelMap;
-    private List<NodeSelectorRequirement> nodeSelectorRequirements;
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java
similarity index 55%
copy from 
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
copy to 
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java
index a6c4703f10..09e3e706a1 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sConnectionParam.java
@@ -15,30 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.task.api.k8s;
+package org.apache.dolphinscheduler.plugin.datasource.k8s.param;
 
-import java.util.List;
-import java.util.Map;
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
 
 import lombok.Data;
-import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
 
-/**
- * k8s task parameters
- */
+import com.fasterxml.jackson.annotation.JsonInclude;
+
 @Data
-public class K8sTaskMainParameters {
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class K8sConnectionParam implements ConnectionParam {
 
-    private String image;
-    private String command;
-    private String args;
-    private String pullSecret;
-    private String namespaceName;
-    private String clusterName;
-    private String imagePullPolicy;
-    private double minCpuCores;
-    private double minMemorySpace;
-    private Map<String, String> paramsMap;
-    private Map<String, String> labelMap;
-    private List<NodeSelectorRequirement> nodeSelectorRequirements;
+    protected String kubeConfig;
+    protected String namespace;
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java
similarity index 55%
copy from 
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
copy to 
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java
index a6c4703f10..f2ea50a40b 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceParamDTO.java
@@ -15,30 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.plugin.task.api.k8s;
+package org.apache.dolphinscheduler.plugin.datasource.k8s.param;
 
-import java.util.List;
-import java.util.Map;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import lombok.Data;
-import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
 
-/**
- * k8s task parameters
- */
 @Data
-public class K8sTaskMainParameters {
+public class K8sDataSourceParamDTO extends BaseDataSourceParamDTO {
+
+    protected String kubeConfig;
+    protected String namespace;
 
-    private String image;
-    private String command;
-    private String args;
-    private String pullSecret;
-    private String namespaceName;
-    private String clusterName;
-    private String imagePullPolicy;
-    private double minCpuCores;
-    private double minMemorySpace;
-    private Map<String, String> paramsMap;
-    private Map<String, String> labelMap;
-    private List<NodeSelectorRequirement> nodeSelectorRequirements;
+    @Override
+    public DbType getType() {
+        return DbType.K8S;
+    }
 }
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java
new file mode 100644
index 0000000000..8f61e8bb45
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/main/java/org/apache/dolphinscheduler/plugin/datasource/k8s/param/K8sDataSourceProcessor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s.param;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
+import org.apache.dolphinscheduler.plugin.datasource.k8s.K8sClientWrapper;
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.MessageFormat;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(DataSourceProcessor.class)
+@Slf4j
+public class K8sDataSourceProcessor implements DataSourceProcessor {
+
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, K8sDataSourceParamDTO.class);
+
+    }
+
+    @Override
+    public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParam) {
+        K8sDataSourceParamDTO k8sDataSourceParamDTO = (K8sDataSourceParamDTO) 
datasourceParam;
+        if (StringUtils.isEmpty(k8sDataSourceParamDTO.getKubeConfig())) {
+            throw new IllegalArgumentException("sagemaker datasource param is 
not valid");
+        }
+    }
+
+    @Override
+    public String getDatasourceUniqueId(ConnectionParam connectionParam, 
DbType dbType) {
+        K8sConnectionParam baseConnectionParam = (K8sConnectionParam) 
connectionParam;
+        return MessageFormat.format("{0}@{1}@{2}", dbType.getDescp(),
+                
PasswordUtils.encodePassword(baseConnectionParam.getKubeConfig()), 
baseConnectionParam.getNamespace());
+    }
+
+    @Override
+    public BaseDataSourceParamDTO createDatasourceParamDTO(String 
connectionJson) {
+        K8sConnectionParam connectionParams = (K8sConnectionParam) 
createConnectionParams(connectionJson);
+        K8sDataSourceParamDTO k8sDataSourceParamDTO = new 
K8sDataSourceParamDTO();
+        k8sDataSourceParamDTO.setKubeConfig(connectionParams.getKubeConfig());
+        k8sDataSourceParamDTO.setNamespace(connectionParams.getNamespace());
+        return k8sDataSourceParamDTO;
+    }
+
+    @Override
+    public K8sConnectionParam createConnectionParams(BaseDataSourceParamDTO 
datasourceParam) {
+        K8sDataSourceParamDTO k8sDataSourceParam = (K8sDataSourceParamDTO) 
datasourceParam;
+        K8sConnectionParam k8sConnectionParam = new K8sConnectionParam();
+        k8sConnectionParam.setKubeConfig(k8sDataSourceParam.getKubeConfig());
+        k8sConnectionParam.setNamespace(k8sDataSourceParam.getNamespace());
+        return k8sConnectionParam;
+    }
+
+    @Override
+    public ConnectionParam createConnectionParams(String connectionJson) {
+        return JSONUtils.parseObject(connectionJson, K8sConnectionParam.class);
+    }
+
+    @Override
+    public String getDatasourceDriver() {
+        return "";
+    }
+
+    @Override
+    public String getValidationQuery() {
+        return "";
+    }
+
+    @Override
+    public String getJdbcUrl(ConnectionParam connectionParam) {
+        return "";
+    }
+
+    @Override
+    public Connection getConnection(ConnectionParam connectionParam) throws 
ClassNotFoundException, SQLException, IOException {
+        return null;
+    }
+
+    @Override
+    public boolean checkDataSourceConnectivity(ConnectionParam 
connectionParam) {
+        K8sConnectionParam baseConnectionParam = (K8sConnectionParam) 
connectionParam;
+        try (
+                K8sClientWrapper k8sClientWrapper = new K8sClientWrapper()) {
+            return 
k8sClientWrapper.checkConnect(baseConnectionParam.kubeConfig, 
baseConnectionParam.namespace);
+        } catch (Exception e) {
+            log.error("failed to connect to the K8S cluster", e);
+            return false;
+        }
+    }
+
+    @Override
+    public DbType getDbType() {
+        return DbType.K8S;
+    }
+
+    @Override
+    public DataSourceProcessor create() {
+        return new K8sDataSourceProcessor();
+    }
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java
new file mode 100644
index 0000000000..fe41f97686
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-k8s/src/test/java/org/apache/dolphinscheduler/plugin/datasource/k8s/K8sDataSourceProcessorTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.k8s;
+
+import 
org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sConnectionParam;
+import 
org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sDataSourceParamDTO;
+import 
org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sDataSourceProcessor;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class K8sDataSourceProcessorTest {
+
+    private K8sDataSourceProcessor k8sDataSourceProcessor;
+
+    private String connectJson =
+            "{\"namespace\":\"namespace\",\"kubeConfig\":\"kubeConfig\"}";
+
+    @BeforeEach
+    public void init() {
+        k8sDataSourceProcessor = new K8sDataSourceProcessor();
+    }
+
+    @Test
+    void testCheckDatasourceParam() {
+        K8sDataSourceParamDTO k8sDataSourceParamDTO = new 
K8sDataSourceParamDTO();
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> 
k8sDataSourceProcessor.checkDatasourceParam(k8sDataSourceParamDTO));
+        k8sDataSourceParamDTO.setNamespace("namespace");
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> 
k8sDataSourceProcessor.checkDatasourceParam(k8sDataSourceParamDTO));
+        k8sDataSourceParamDTO.setKubeConfig("kubeConfig");
+        Assertions
+                .assertDoesNotThrow(
+                        () -> 
k8sDataSourceProcessor.checkDatasourceParam(k8sDataSourceParamDTO));
+    }
+
+    @Test
+    void testGetDatasourceUniqueId() {
+        K8sConnectionParam k8sConnectionParam = new K8sConnectionParam();
+        k8sConnectionParam.setNamespace("namespace");
+        k8sConnectionParam.setKubeConfig("kubeConfig");
+        Assertions.assertEquals("k8s@kubeConfig@namespace",
+                
k8sDataSourceProcessor.getDatasourceUniqueId(k8sConnectionParam, DbType.K8S));
+
+    }
+
+    @Test
+    void testCreateDatasourceParamDTO() {
+        K8sDataSourceParamDTO k8sDataSourceParamDTO =
+                (K8sDataSourceParamDTO) 
k8sDataSourceProcessor.createDatasourceParamDTO(connectJson);
+        Assertions.assertEquals("namespace", 
k8sDataSourceParamDTO.getNamespace());
+        Assertions.assertEquals("kubeConfig", 
k8sDataSourceParamDTO.getKubeConfig());
+    }
+
+    @Test
+    void testCreateConnectionParams() {
+        K8sDataSourceParamDTO k8sDataSourceParamDTO =
+                (K8sDataSourceParamDTO) 
k8sDataSourceProcessor.createDatasourceParamDTO(connectJson);
+        K8sConnectionParam k8sConnectionParam =
+                
k8sDataSourceProcessor.createConnectionParams(k8sDataSourceParamDTO);
+        Assertions.assertEquals("namespace", 
k8sConnectionParam.getNamespace());
+        Assertions.assertEquals("kubeConfig", 
k8sConnectionParam.getKubeConfig());
+    }
+
+    @Test
+    void testTestConnection() {
+        K8sDataSourceParamDTO k8sDataSourceParamDTO =
+                (K8sDataSourceParamDTO) 
k8sDataSourceProcessor.createDatasourceParamDTO(connectJson);
+        K8sConnectionParam connectionParam =
+                
k8sDataSourceProcessor.createConnectionParams(k8sDataSourceParamDTO);
+        
Assertions.assertFalse(k8sDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+
+        try (
+                MockedConstruction<K8sClientWrapper> 
k8sClientWrapperMockedConstruction =
+                        Mockito.mockConstruction(K8sClientWrapper.class, 
(mock, context) -> {
+                            Mockito.when(
+                                    
mock.checkConnect(connectionParam.getKubeConfig(), 
connectionParam.getNamespace()))
+                                    .thenReturn(true);
+                        })) {
+            
Assertions.assertTrue(k8sDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+        }
+
+    }
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
index 424f478699..395e7e5d3f 100644
--- 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
@@ -31,7 +31,6 @@ import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
-
 public class SagemakerDataSourceProcessorTest {
 
     private SagemakerDataSourceProcessor sagemakerDataSourceProcessor;
diff --git a/dolphinscheduler-datasource-plugin/pom.xml 
b/dolphinscheduler-datasource-plugin/pom.xml
index fb08aa7f6d..f5c59cfab5 100644
--- a/dolphinscheduler-datasource-plugin/pom.xml
+++ b/dolphinscheduler-datasource-plugin/pom.xml
@@ -54,6 +54,7 @@
         <module>dolphinscheduler-datasource-vertica</module>
         <module>dolphinscheduler-datasource-doris</module>
         <module>dolphinscheduler-datasource-sagemaker</module>
+        <module>dolphinscheduler-datasource-k8s</module>
     </modules>
 
     <dependencyManagement>
diff --git 
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
 
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
index 3b8633443d..bd2f7e795b 100644
--- 
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
+++ 
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
@@ -193,5 +193,11 @@ public class DataSourcePage extends NavBarPage implements 
NavBarPage.NavBarItem
         })
         private WebElement inputZeppelinRestEndpoint;
 
+        @FindBys({
+                @FindBy(className = "input-kubeConfig"),
+                @FindBy(tagName = "textarea"),
+        })
+        private WebElement inputKubeConfig;
+
     }
 }
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
index 2dd8958924..e7ebbeee0a 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
@@ -53,7 +53,9 @@ public enum DbType {
     HANA(22, "hana"),
     DORIS(23, "doris"),
     ZEPPELIN(24, "zeppelin"),
-    SAGEMAKER(25, "sagemaker");
+    SAGEMAKER(25, "sagemaker"),
+
+    K8S(26, "k8s");
     private static final Map<Integer, DbType> DB_TYPE_MAP =
             Arrays.stream(DbType.values()).collect(toMap(DbType::getCode, 
Functions.identity()));
     @EnumValue
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
index aa5bf62fe2..4138e5465f 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api;
 
 import java.io.Serializable;
 
-import lombok.Value;
+import lombok.Data;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -27,13 +27,18 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 /**
  *  k8s Task ExecutionContext
  */
-@Value
+@Data
 public class K8sTaskExecutionContext implements Serializable {
 
     private String configYaml;
 
     private String namespace;
 
+    private String connectionParams;
+
+    public K8sTaskExecutionContext() {
+    }
+
     @JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
     public K8sTaskExecutionContext(
                                    @JsonProperty("configYaml") String 
configYaml,
@@ -47,6 +52,7 @@ public class K8sTaskExecutionContext implements Serializable {
         return "K8sTaskExecutionContext{"
                 + "namespace=" + namespace
                 + ", configYaml='" + configYaml + '\''
+                + ", connectionParams='" + connectionParams + '\''
                 + '}';
     }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
index ac1ce69f76..a18637a4ff 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
@@ -22,6 +22,7 @@ import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_L
 
 import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -131,6 +132,8 @@ public class KubernetesApplicationManager implements 
ApplicationManager {
     private KubernetesClient getClient(KubernetesApplicationManagerContext 
kubernetesApplicationManagerContext) {
         K8sTaskExecutionContext k8sTaskExecutionContext =
                 
kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
+        k8sTaskExecutionContext
+                
.setConfigYaml(JSONUtils.getNodeString(k8sTaskExecutionContext.getConnectionParams(),
 "kubeConfig"));
         return 
cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
                 key -> new KubernetesClientBuilder()
                         
.withConfig(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())).build());
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
index a6c4703f10..8adc912161 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
@@ -34,7 +34,6 @@ public class K8sTaskMainParameters {
     private String args;
     private String pullSecret;
     private String namespaceName;
-    private String clusterName;
     private String imagePullPolicy;
     private double minCpuCores;
     private double minMemorySpace;
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
index 2a30e4e537..66665512b6 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -295,7 +295,9 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor {
                 return result;
             }
             K8sTaskExecutionContext k8sTaskExecutionContext = 
taskRequest.getK8sTaskExecutionContext();
-            String configYaml = k8sTaskExecutionContext.getConfigYaml();
+            String connectionParams = 
k8sTaskExecutionContext.getConnectionParams();
+            String kubeConfig = JSONUtils.getNodeString(connectionParams, 
"kubeConfig");
+            String configYaml = kubeConfig;
             k8sUtils.buildClient(configYaml);
             submitJob2k8s(k8sParameterStr);
             parsePodLogOutput();
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java
index ba9f6498af..d3d5e4963b 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java
@@ -17,21 +17,28 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.parameters;
 
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
 import org.apache.dolphinscheduler.plugin.task.api.model.Label;
 import 
org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * k8s task parameters
  */
 @Data
+@Slf4j
 public class K8sTaskParameters extends AbstractParameters {
 
     private String image;
@@ -44,14 +51,29 @@ public class K8sTaskParameters extends AbstractParameters {
     private double minMemorySpace;
     private List<Label> customizedLabels;
     private List<NodeSelectorExpression> nodeSelectors;
-
+    private String kubeConfig;
+    private int datasource;
+    private String type;
     @Override
     public boolean checkParameters() {
-        return StringUtils.isNotEmpty(image) && 
StringUtils.isNotEmpty(namespace);
+        return StringUtils.isNotEmpty(image);
+    }
+    public K8sTaskExecutionContext 
generateExtendedContext(ResourceParametersHelper parametersHelper) {
+        DataSourceParameters dataSourceParameters =
+                (DataSourceParameters) 
parametersHelper.getResourceParameters(ResourceType.DATASOURCE, datasource);
+        K8sTaskExecutionContext k8sTaskExecutionContext = new 
K8sTaskExecutionContext();
+        k8sTaskExecutionContext.setConnectionParams(
+                Objects.nonNull(dataSourceParameters) ? 
dataSourceParameters.getConnectionParams() : null);
+        return k8sTaskExecutionContext;
     }
-
     @Override
     public List<ResourceInfo> getResourceFilesList() {
         return new ArrayList<>();
     }
+    @Override
+    public ResourceParametersHelper getResources() {
+        ResourceParametersHelper resources = super.getResources();
+        resources.put(ResourceType.DATASOURCE, datasource);
+        return resources;
+    }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
index 76d37c5fbf..2a793dc80a 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
@@ -17,12 +17,9 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.k8s;
 
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
 import static 
org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
 
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
@@ -50,7 +47,7 @@ public class K8sTaskExecutorTest {
     private K8sTaskMainParameters k8sTaskMainParameters = null;
     private final String image = "ds-dev";
     private final String imagePullPolicy = "IfNotPresent";
-    private final String namespace = 
"{\"name\":\"default\",\"cluster\":\"lab\"}";
+    private final String namespace = "namespace";
     private final double minCpuCores = 2;
     private final double minMemorySpace = 10;
     private final int taskInstanceId = 1000;
@@ -61,9 +58,6 @@ public class K8sTaskExecutorTest {
         TaskExecutionContext taskRequest = new TaskExecutionContext();
         taskRequest.setTaskInstanceId(taskInstanceId);
         taskRequest.setTaskName(taskName);
-        Map<String, String> namespace = JSONUtils.toMap(this.namespace);
-        String namespaceName = namespace.get(NAMESPACE_NAME);
-        String clusterName = namespace.get(CLUSTER);
         Map<String, String> labelMap = new HashMap<>();
         labelMap.put("test", "1234");
 
@@ -75,8 +69,7 @@ public class K8sTaskExecutorTest {
         k8sTaskMainParameters = new K8sTaskMainParameters();
         k8sTaskMainParameters.setImage(image);
         k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy);
-        k8sTaskMainParameters.setNamespaceName(namespaceName);
-        k8sTaskMainParameters.setClusterName(clusterName);
+        k8sTaskMainParameters.setNamespaceName(namespace);
         k8sTaskMainParameters.setMinCpuCores(minCpuCores);
         k8sTaskMainParameters.setMinMemorySpace(minMemorySpace);
         k8sTaskMainParameters.setCommand("[\"perl\" ,\"-Mbignum=bpi\", 
\"-wle\", \"print bpi(2000)\"]");
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
index 9b42481c6c..c02adbfa76 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
@@ -17,10 +17,10 @@
 
 package org.apache.dolphinscheduler.plugin.task.k8s;
 
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
-import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
-
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import 
org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTask;
@@ -31,6 +31,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.K8sTaskParameters;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -54,19 +55,37 @@ public class K8sTask extends AbstractK8sTask {
     /**
      * task parameters
      */
-    private final K8sTaskParameters k8sTaskParameters;
+    private K8sTaskParameters k8sTaskParameters;
+
+    private K8sTaskExecutionContext k8sTaskExecutionContext;
 
+    private K8sConnectionParam k8sConnectionParam;
     /**
      * @param taskRequest taskRequest
      */
     public K8sTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
         this.taskExecutionContext = taskRequest;
-        this.k8sTaskParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
K8sTaskParameters.class);
-        log.info("Initialize k8s task parameters {}", 
JSONUtils.toPrettyJsonString(k8sTaskParameters));
+    }
+
+    @Override
+    public void init() {
+        String taskParams = taskExecutionContext.getTaskParams();
+        k8sTaskParameters = JSONUtils.parseObject(taskParams, 
K8sTaskParameters.class);
         if (k8sTaskParameters == null || !k8sTaskParameters.checkParameters()) 
{
             throw new TaskException("K8S task params is not valid");
         }
+
+        k8sTaskExecutionContext =
+                
k8sTaskParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+        taskRequest.setK8sTaskExecutionContext(k8sTaskExecutionContext);
+        k8sConnectionParam =
+                (K8sConnectionParam) 
DataSourceUtils.buildConnectionParams(DbType.valueOf(k8sTaskParameters.getType()),
+                        k8sTaskExecutionContext.getConnectionParams());
+        String kubeConfig = k8sConnectionParam.getKubeConfig();
+        k8sTaskParameters.setNamespace(k8sConnectionParam.getNamespace());
+        k8sTaskParameters.setKubeConfig(kubeConfig);
+        log.info("Initialize k8s task params:{}", 
JSONUtils.toPrettyJsonString(k8sTaskParameters));
     }
 
     @Override
@@ -83,13 +102,10 @@ public class K8sTask extends AbstractK8sTask {
     protected String buildCommand() {
         K8sTaskMainParameters k8sTaskMainParameters = new 
K8sTaskMainParameters();
         Map<String, Property> paramsMap = 
taskExecutionContext.getPrepareParamsMap();
-        Map<String, String> namespace = 
JSONUtils.toMap(k8sTaskParameters.getNamespace());
-        String namespaceName = namespace.get(NAMESPACE_NAME);
-        String clusterName = namespace.get(CLUSTER);
+        String namespaceName = k8sTaskParameters.getNamespace();
         k8sTaskMainParameters.setImage(k8sTaskParameters.getImage());
         k8sTaskMainParameters.setPullSecret(k8sTaskParameters.getPullSecret());
         k8sTaskMainParameters.setNamespaceName(namespaceName);
-        k8sTaskMainParameters.setClusterName(clusterName);
         
k8sTaskMainParameters.setMinCpuCores(k8sTaskParameters.getMinCpuCores());
         
k8sTaskMainParameters.setMinMemorySpace(k8sTaskParameters.getMinMemorySpace());
         k8sTaskMainParameters.setParamsMap(ParameterUtils.convert(paramsMap));
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java
index f15ab4496e..91c0faf684 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java
@@ -40,7 +40,7 @@ public class K8sTaskChannel implements TaskChannel {
 
     @Override
     public ResourceParametersHelper getResources(String parameters) {
-        return null;
+        return JSONUtils.parseObject(parameters, 
K8sTaskParameters.class).getResources();
     }
 
     @Override
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java
index eb65ede9d0..543ec3e4bc 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java
@@ -33,7 +33,7 @@ public class K8sParametersTest {
     private K8sTaskParameters k8sTaskParameters = null;
     private final String image = "ds-dev";
     private final String imagePullPolicy = "IfNotPresent";
-    private final String namespace = 
"{\"name\":\"default\",\"cluster\":\"lab\"}";
+    private final String namespace = "namespace";
     private final double minCpuCores = 2;
     private final double minMemorySpace = 10;
     private final String command = "[\"/bin/bash\", \"-c\"]";
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
index 7699cd489b..ef21d34e8e 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
@@ -18,8 +18,13 @@
 package org.apache.dolphinscheduler.plugin.task.k8s;
 
 import static 
org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import 
org.apache.dolphinscheduler.plugin.datasource.k8s.param.K8sConnectionParam;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@@ -27,6 +32,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.model.Label;
 import 
org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.K8sTaskParameters;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,9 +40,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
 
@@ -50,7 +59,7 @@ public class K8sTaskTest {
 
     private final String pullSecret = "ds-secret";
 
-    private final String namespace = 
"{\"name\":\"default\",\"cluster\":\"lab\"}";
+    private final String namespace = "namespace";
 
     private final double minCpuCores = 2;
 
@@ -62,13 +71,53 @@ public class K8sTaskTest {
     private final String date = "20220507";
     private final String command = "[\"/bin/bash\", \"-c\"]";
     private final String args = "[\"echo hello world\"]";
+    private final String kubeConfig = "{}";
+    private final String type = "K8S";
+
+    private final Map<String, Property> prepareParamsMap = new HashMap<String, 
Property>() {
+
+        {
+            put(DAY, new Property() {
+
+                {
+                    setProp(DAY);
+                    setValue(date);
+                }
+            });
+        }
+    };
+
+    private final int datasource = 0;
     private final List<Label> labels = Arrays.asList(new Label("test", 
"1234"));
     private final List<NodeSelectorExpression> nodeSelectorExpressions =
             Arrays.asList(new NodeSelectorExpression("node-label", "In", 
"1234,12345"));
 
+    private static MockedStatic<DataSourceUtils> dataSourceUtilsStaticMock = 
null;
+
     @BeforeEach
     public void before() {
-        k8sTaskParameters = new K8sTaskParameters();
+        String k8sTaskParameters = buildK8sTaskParameters();
+        TaskExecutionContext taskExecutionContext = 
mock(TaskExecutionContext.class);
+        ResourceParametersHelper resourceParametersHelper = 
mock(ResourceParametersHelper.class);
+        K8sConnectionParam k8sConnectionParam = mock(K8sConnectionParam.class);
+        
when(taskExecutionContext.getTaskParams()).thenReturn(k8sTaskParameters);
+        when(k8sConnectionParam.getNamespace()).thenReturn(namespace);
+        when(k8sConnectionParam.getKubeConfig()).thenReturn(kubeConfig);
+        
when(taskExecutionContext.getPrepareParamsMap()).thenReturn(prepareParamsMap);
+        
when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper);
+        dataSourceUtilsStaticMock = Mockito.mockStatic(DataSourceUtils.class);
+        dataSourceUtilsStaticMock.when(() -> 
DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
+                .thenReturn(k8sConnectionParam);
+        k8sTask = spy(new K8sTask(taskExecutionContext));
+        k8sTask.init();
+    }
+
+    @AfterEach
+    public void afterEach() {
+        dataSourceUtilsStaticMock.close();
+    }
+    private String buildK8sTaskParameters() {
+        K8sTaskParameters k8sTaskParameters = new K8sTaskParameters();
         k8sTaskParameters.setImage(image);
         k8sTaskParameters.setImagePullPolicy(imagePullPolicy);
         k8sTaskParameters.setNamespace(namespace);
@@ -80,32 +129,15 @@ public class K8sTaskTest {
         k8sTaskParameters.setNodeSelectors(nodeSelectorExpressions);
         k8sTaskParameters.setLocalParams(new ArrayList<>());
         k8sTaskParameters.setPullSecret(pullSecret);
-        TaskExecutionContext taskRequest = new TaskExecutionContext();
-        taskRequest.setTaskInstanceId(taskInstanceId);
-        taskRequest.setTaskName(taskName);
-        taskRequest.setTaskParams(JSONUtils.toJsonString(k8sTaskParameters));
-        Property property = new Property();
-        property.setProp(DAY);
-        property.setDirect(Direct.IN);
-        property.setType(DataType.VARCHAR);
-        property.setValue(date);
-        Map<String, Property> paramsMap = new HashMap<>();
-        paramsMap.put(DAY, property);
-        taskRequest.setParamsMap(paramsMap);
-
-        Map<String, Property> prepareParamsMap = new HashMap<>();
-        Property property1 = new Property();
-        property1.setProp("day");
-        property1.setValue("20220507");
-        prepareParamsMap.put("day", property1);
-        taskRequest.setPrepareParamsMap(prepareParamsMap);
-        k8sTask = new K8sTask(taskRequest);
+        k8sTaskParameters.setType(type);
+        k8sTaskParameters.setKubeConfig(kubeConfig);
+        k8sTaskParameters.setDatasource(datasource);
+        return JSONUtils.toJsonString(k8sTaskParameters);
     }
-
     @Test
     public void testBuildCommandNormal() {
         String expectedStr =
-                "{\"image\":\"ds-dev\",\"command\":\"[\\\"/bin/bash\\\", 
\\\"-c\\\"]\",\"args\":\"[\\\"echo hello 
world\\\"]\",\"pullSecret\":\"ds-secret\",\"namespaceName\":\"default\",\"clusterName\":\"lab\",\"imagePullPolicy\":\"IfNotPresent\",\"minCpuCores\":2.0,\"minMemorySpace\":10.0,\"paramsMap\":{\"day\":\"20220507\"},\"labelMap\":{\"test\":\"1234\"},\"nodeSelectorRequirements\":[{\"key\":\"node-label\",\"operator\":\"In\",\"values\":[\"1234\",\"12345\"]}]}";
+                "{\"image\":\"ds-dev\",\"command\":\"[\\\"/bin/bash\\\", 
\\\"-c\\\"]\",\"args\":\"[\\\"echo hello 
world\\\"]\",\"pullSecret\":\"ds-secret\",\"namespaceName\":\"namespace\",\"imagePullPolicy\":\"IfNotPresent\",\"minCpuCores\":2.0,\"minMemorySpace\":10.0,\"paramsMap\":{\"day\":\"20220507\"},\"labelMap\":{\"test\":\"1234\"},\"nodeSelectorRequirements\":[{\"key\":\"node-label\",\"operator\":\"In\",\"values\":[\"1234\",\"12345\"]}]}";
         String commandStr = k8sTask.buildCommand();
         Assertions.assertEquals(expectedStr, commandStr);
     }
@@ -113,7 +145,7 @@ public class K8sTaskTest {
     @Test
     public void testGetParametersNormal() {
         String expectedStr =
-                "K8sTaskParameters(image=ds-dev, 
namespace={\"name\":\"default\",\"cluster\":\"lab\"}, command=[\"/bin/bash\", 
\"-c\"], args=[\"echo hello world\"], pullSecret=ds-secret, 
imagePullPolicy=IfNotPresent, minCpuCores=2.0, minMemorySpace=10.0, 
customizedLabels=[Label(label=test, value=1234)], 
nodeSelectors=[NodeSelectorExpression(key=node-label, operator=In, 
values=1234,12345)])";
+                "K8sTaskParameters(image=ds-dev, namespace=namespace, 
command=[\"/bin/bash\", \"-c\"], args=[\"echo hello world\"], 
pullSecret=ds-secret, imagePullPolicy=IfNotPresent, minCpuCores=2.0, 
minMemorySpace=10.0, customizedLabels=[Label(label=test, value=1234)], 
nodeSelectors=[NodeSelectorExpression(key=node-label, operator=In, 
values=1234,12345)], kubeConfig={}, datasource=0, type=K8S)";
         String result = k8sTask.getParameters().toString();
         Assertions.assertEquals(expectedStr, result);
     }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
index bbe3136dbf..0c8dd6cf6b 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
@@ -66,7 +66,6 @@ public class SagemakerTaskTest {
     @BeforeEach
     public void before() {
         String parameters = buildParameters();
-        System.out.println(parameters);
         TaskExecutionContext taskExecutionContext = 
Mockito.mock(TaskExecutionContext.class);
         ResourceParametersHelper resourceParametersHelper = 
Mockito.mock(ResourceParametersHelper.class);
         SagemakerConnectionParam sagemakerConnectionParam = 
Mockito.mock(SagemakerConnectionParam.class);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index f6fa8e89ff..01459111b5 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -91,7 +91,7 @@ public class ZeppelinTask extends AbstractRemoteTask {
         zeppelinParameters.setUsername(zeppelinConnectionParam.getUsername());
         zeppelinParameters.setPassword(zeppelinConnectionParam.getPassword());
         
zeppelinParameters.setRestEndpoint(zeppelinConnectionParam.getRestEndpoint());
-        log.info("Initialize zeppelin task params:{}", 
JSONUtils.toPrettyJsonString(taskParams));
+        log.info("Initialize zeppelin task params:{}", 
JSONUtils.toPrettyJsonString(zeppelinParameters));
         this.zClient = getZeppelinClient();
     }
 
diff --git a/dolphinscheduler-ui/src/service/modules/data-source/types.ts 
b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
index a5fbb1a8a5..444f5293dd 100644
--- a/dolphinscheduler-ui/src/service/modules/data-source/types.ts
+++ b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
@@ -41,6 +41,7 @@ type IDataBase =
   | 'KYUUBI'
   | 'ZEPPELIN'
   | 'SAGEMAKER'
+  | 'K8S'
 
 type IDataBaseLabel =
   | 'MYSQL'
@@ -63,6 +64,7 @@ type IDataBaseLabel =
   | 'KYUUBI'
   | 'ZEPPELIN'
   | 'SAGEMAKER'
+  | 'K8S'
 
 interface IDataSource {
   id?: number
@@ -85,6 +87,8 @@ interface IDataSource {
   other?: object
   endpoint?: string
   restEndpoint?: string
+  kubeConfig?: string
+  namespace?: string
   MSIClientId?: string
   dbUser?: string
   compatibleMode?: string
diff --git a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx 
b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
index 3a7c1e6074..4842651290 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
+++ b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
@@ -163,6 +163,8 @@ const DetailModal = defineComponent({
       showDataBaseName,
       showJDBCConnectParameters,
       showPublicKey,
+      showNamespace,
+      showKubeConfig,
       modeOptions,
       redShiftModeOptions,
       sagemakerModeOption,
@@ -542,7 +544,10 @@ const DetailModal = defineComponent({
                   />
                 </NFormItem>
                 <NFormItem
-                  v-show={!showMode || detailForm.mode === 'password'}
+                  v-show={
+                    (!showMode || detailForm.mode === 'password') &&
+                    detailForm.type != 'K8S'
+                  }
                   label={t('datasource.user_name')}
                   path='userName'
                   show-require-mark
@@ -557,7 +562,10 @@ const DetailModal = defineComponent({
                   />
                 </NFormItem>
                 <NFormItem
-                  v-show={!showMode || detailForm.mode === 'password'}
+                  v-show={
+                    (!showMode || detailForm.mode === 'password') &&
+                    detailForm.type != 'K8S'
+                  }
                   label={t('datasource.user_password')}
                   path='password'
                 >
@@ -678,6 +686,35 @@ const DetailModal = defineComponent({
                     }}
                   />
                 </NFormItem>
+                <NFormItem
+                  v-show={showKubeConfig}
+                  label={t('datasource.kubeConfig')}
+                  path='kubeConfig'
+                  show-require-mark
+                >
+                  <NInput
+                    allowInput={this.trim}
+                    class='input-kubeConfig'
+                    v-model={[detailForm.kubeConfig, 'value']}
+                    type='textarea'
+                    autosize={{
+                      minRows: 14
+                    }}
+                    placeholder={t('datasource.kubeConfig_tips')}
+                  />
+                </NFormItem>
+                <NFormItem
+                  v-show={showNamespace}
+                  label={t('datasource.namespace')}
+                  path='namespace'
+                  show-require-mark
+                >
+                  <NInput
+                    allowInput={this.trim}
+                    v-model={[detailForm.namespace, 'value']}
+                    placeholder={t('datasource.namespace_tips')}
+                  />
+                </NFormItem>
               </NForm>
             </NSpin>
           ),
diff --git a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts 
b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
index ed383c3720..21916667d2 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
+++ b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
@@ -69,6 +69,8 @@ export function useForm(id?: number) {
     showDataBaseName: true,
     showJDBCConnectParameters: true,
     showPublicKey: false,
+    showNamespace: false,
+    showKubeConfig: false,
     rules: {
       name: {
         trigger: ['input'],
@@ -118,7 +120,8 @@ export function useForm(id?: number) {
         validator() {
           if (
             !state.detailForm.userName &&
-            state.detailForm.type !== 'AZURESQL'
+            state.detailForm.type !== 'AZURESQL' &&
+            state.detailForm.type !== 'K8S'
           ) {
             return new Error(t('datasource.user_name_tips'))
           }
@@ -261,7 +264,12 @@ export function useForm(id?: number) {
     } else {
       state.showPrincipal = false
     }
-    if (type === 'SSH' || type === 'ZEPPELIN' || type === 'SAGEMAKER') {
+    if (
+      type === 'SSH' ||
+      type === 'ZEPPELIN' ||
+      type === 'SAGEMAKER' ||
+      type === 'K8S'
+    ) {
       state.showDataBaseName = false
       state.requiredDataBase = false
       state.showJDBCConnectParameters = false
@@ -274,10 +282,14 @@ export function useForm(id?: number) {
         state.showPort = false
         state.showRestEndpoint = true
       }
-      if (type === 'SAGEMAKER') {
+      if (type === 'SAGEMAKER' || type === 'K8S') {
         state.showHost = false
         state.showPort = false
       }
+      if (type === 'K8S') {
+        state.showNamespace = true
+        state.showKubeConfig = true
+      }
     } else {
       state.showDataBaseName = true
       state.requiredDataBase = true
@@ -441,6 +453,11 @@ export const datasourceType: IDataBaseOptionKeys = {
     value: 'SAGEMAKER',
     label: 'SAGEMAKER',
     defaultPort: 0
+  },
+  K8S: {
+    value: 'K8S',
+    label: 'K8S',
+    defaultPort: 6443
   }
 }
 
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts
index b545d15463..ed9d83a6ac 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts
@@ -14,12 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import {
-  useCustomParams,
-  useNamespace,
-  useCustomLabels,
-  useNodeSelectors
-} from '.'
+import { useCustomParams, useCustomLabels, useNodeSelectors } from '.'
 import type { IJsonItem } from '../types'
 import { useI18n } from 'vue-i18n'
 
@@ -27,7 +22,6 @@ export function useK8s(model: { [field: string]: any }): 
IJsonItem[] {
   const { t } = useI18n()
 
   return [
-    useNamespace(),
     {
       type: 'input-number',
       field: 'minCpuCores',
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index a5598cd8df..c00f9be782 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -361,6 +361,9 @@ export function formatParams(data: INodeData): {
     taskParams.args = data.args
     taskParams.customizedLabels = data.customizedLabels
     taskParams.nodeSelectors = data.nodeSelectors
+    taskParams.datasource = data.datasource
+    taskParams.type = data.type
+    taskParams.kubeConfig = data.kubeConfig
     taskParams.pullSecret = data.pullSecret
   }
 
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
index 8cce04b568..c72443c218 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
@@ -46,7 +46,11 @@ export function useK8s({
     workerGroup: 'default',
     delayTime: 0,
     timeout: 30,
-    timeoutNotifyStrategy: ['WARN']
+    type: 'K8S',
+    displayRows: 10,
+    timeoutNotifyStrategy: ['WARN'],
+    kubeConfig: '',
+    namespace: ''
   } as INodeData)
 
   return {
@@ -63,6 +67,7 @@ export function useK8s({
       ...Fields.useFailed(),
       Fields.useDelayTime(model),
       ...Fields.useTimeoutAlarm(model),
+      ...Fields.useDatasource(model),
       ...Fields.useK8s(model),
       Fields.usePreTasks()
     ] as IJsonItem[],

Reply via email to