This is an automated email from the ASF dual-hosted git repository.

chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e99c5950b9 [DSIP-19] Support zeppelin connections in the connection 
center, as well as external connections to the connection center in zeppelin 
tasks (#14434)
e99c5950b9 is described below

commit e99c5950b9a745878bf24e784b11fd0cbf68c5ba
Author: chenrj <[email protected]>
AuthorDate: Mon Oct 30 16:44:54 2023 +0800

    [DSIP-19] Support zeppelin connections in the connection center, as well as 
external connections to the connection center in zeppelin tasks (#14434)
    
    * Refactoring zeppelin task plugin with connections managed in connection 
center
    
    ---------
    
    Co-authored-by: Eric Gao <[email protected]>
---
 dolphinscheduler-bom/pom.xml                       |   7 ++
 .../dolphinscheduler-datasource-all/pom.xml        |   5 +
 .../dolphinscheduler-datasource-zeppelin}/pom.xml  |  12 +-
 .../datasource/zeppelin/ZeppelinClientWrapper.java |  55 +++++++++
 .../zeppelin/ZeppelinDataSourceChannel.java        |  37 ++++++
 .../zeppelin/ZeppelinDataSourceChannelFactory.java |  38 ++++++
 .../plugin/datasource/zeppelin/ZeppelinUtils.java  |  36 ++++++
 .../zeppelin/param/ZeppelinConnectionParam.java    |  35 ++++++
 .../zeppelin/param/ZeppelinDataSourceParamDTO.java |  34 ++++++
 .../param/ZeppelinDataSourceProcessor.java         | 131 +++++++++++++++++++++
 .../zeppelin/ZeppelinDataSourceProcessorTest.java  | 107 +++++++++++++++++
 dolphinscheduler-datasource-plugin/pom.xml         |   1 +
 .../e2e/pages/datasource/DataSourcePage.java       |   7 ++
 .../apache/dolphinscheduler/spi/enums/DbType.java  |   3 +-
 .../dolphinscheduler-task-zeppelin/pom.xml         |   5 +
 .../plugin/task/zeppelin/ZeppelinParameters.java   |  23 +++-
 .../plugin/task/zeppelin/ZeppelinTask.java         |  43 +++----
 .../plugin/task/zeppelin/ZeppelinTaskChannel.java  |   2 +-
 .../zeppelin/ZeppelinTaskExecutionContext.java     |  46 ++++++++
 .../plugin/task/zeppelin/ZeppelinTaskTest.java     |  33 +++++-
 .../src/locales/zh_CN/datasource.ts                |   4 +-
 .../src/service/modules/data-source/types.ts       |   3 +
 .../src/views/datasource/list/detail.tsx           |  16 +++
 .../src/views/datasource/list/use-form.ts          |  18 ++-
 .../task/components/node/fields/use-datasource.ts  |   5 +
 .../task/components/node/fields/use-zeppelin.ts    |  33 ------
 .../projects/task/components/node/format-data.ts   |   2 +
 .../task/components/node/tasks/use-zeppelin.ts     |   8 +-
 28 files changed, 681 insertions(+), 68 deletions(-)

diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml
index 6e4770d60a..ccd6b3a4f7 100644
--- a/dolphinscheduler-bom/pom.xml
+++ b/dolphinscheduler-bom/pom.xml
@@ -117,6 +117,7 @@
         <protobuf.version>3.17.2</protobuf.version>
         <esdk-obs.version>3.23.3</esdk-obs.version>
         <system-lambda.version>1.2.1</system-lambda.version>
+        <zeppelin-client.version>0.10.1</zeppelin-client.version>
         <testcontainer.version>1.17.6</testcontainer.version>
         <checker-qual.version>3.19.0</checker-qual.version>
     </properties>
@@ -894,6 +895,12 @@
                 <version>${snowflake-jdbc.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.zeppelin</groupId>
+                <artifactId>zeppelin-client</artifactId>
+                <version>${zeppelin-client.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.google.protobuf</groupId>
                 <artifactId>protobuf-java</artifactId>
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
index 586948cab6..71905dea45 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
@@ -132,6 +132,11 @@
             <artifactId>dolphinscheduler-datasource-vertica</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-datasource-zeppelin</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml
similarity index 88%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
copy to 
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml
index 69a5a66994..90c7f1ece5 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml
@@ -20,11 +20,13 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.dolphinscheduler</groupId>
-        <artifactId>dolphinscheduler-task-plugin</artifactId>
+        <artifactId>dolphinscheduler-datasource-plugin</artifactId>
         <version>dev-SNAPSHOT</version>
     </parent>
-    <artifactId>dolphinscheduler-task-zeppelin</artifactId>
+
+    <artifactId>dolphinscheduler-datasource-zeppelin</artifactId>
     <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
 
     <dependencies>
         <dependency>
@@ -32,15 +34,17 @@
             <artifactId>dolphinscheduler-spi</artifactId>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-task-api</artifactId>
+            <artifactId>dolphinscheduler-datasource-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.zeppelin</groupId>
             <artifactId>zeppelin-client</artifactId>
-            <version>0.10.1</version>
         </dependency>
     </dependencies>
+
 </project>
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java
new file mode 100644
index 0000000000..4a6c840e40
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ZeppelinClient;
+
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
+public class ZeppelinClientWrapper implements AutoCloseable {
+
+    private ZeppelinClient zeppelinClient;
+
+    public ZeppelinClientWrapper(String restEndpoint)
+                                                      throws Exception {
+        checkNotNull(restEndpoint);
+        ClientConfig clientConfig = new ClientConfig(restEndpoint);
+        zeppelinClient = new ZeppelinClient(clientConfig);
+    }
+
+    public boolean checkConnect(String username, String password) {
+        try {
+            // If the login fails, an exception will be thrown directly
+            zeppelinClient.login(username, password);
+            String version = zeppelinClient.getVersion();
+            log.info("zeppelin client connects to server successfully, version 
is {}", version);
+            return true;
+        } catch (Exception e) {
+            log.info("zeppelin client failed to connect to the server");
+            return false;
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java
new file mode 100644
index 0000000000..c8e33611e7
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
+import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+public class ZeppelinDataSourceChannel implements DataSourceChannel {
+
+    @Override
+    public AdHocDataSourceClient 
createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType 
dbType) {
+        throw new UnsupportedOperationException("Zeppelin 
AdHocDataSourceClient is not supported");
+    }
+
+    @Override
+    public PooledDataSourceClient 
createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType 
dbType) {
+        throw new UnsupportedOperationException("Zeppelin 
AdHocDataSourceClient is not supported");
+    }
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java
new file mode 100644
index 0000000000..692819cf78
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(DataSourceChannelFactory.class)
+public class ZeppelinDataSourceChannelFactory implements 
DataSourceChannelFactory {
+
+    @Override
+    public DataSourceChannel create() {
+        return new ZeppelinDataSourceChannel();
+    }
+
+    @Override
+    public String getName() {
+        return "zeppelin";
+    }
+
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java
new file mode 100644
index 0000000000..308af03d8f
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import 
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ZeppelinClient;
+
+public class ZeppelinUtils {
+
+    private ZeppelinUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    public static ZeppelinClient getZeppelinClient(ZeppelinConnectionParam 
connectionParam) throws Exception {
+        ClientConfig clientConfig = new 
ClientConfig(connectionParam.getRestEndpoint());
+        return new ZeppelinClient(clientConfig);
+    }
+
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java
new file mode 100644
index 0000000000..2c716a7e71
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin.param;
+
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+
+import lombok.Data;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+@Data
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ZeppelinConnectionParam implements ConnectionParam {
+
+    protected String username;
+
+    protected String password;
+
+    protected String restEndpoint;
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java
new file mode 100644
index 0000000000..ae5a1d7025
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin.param;
+
+import 
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import lombok.Data;
+
+@Data
+public class ZeppelinDataSourceParamDTO extends BaseDataSourceParamDTO {
+
+    protected String restEndpoint;
+
+    @Override
+    public DbType getType() {
+        return DbType.ZEPPELIN;
+    }
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java
new file mode 100644
index 0000000000..bf2795959e
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin.param;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import 
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
+import 
org.apache.dolphinscheduler.plugin.datasource.zeppelin.ZeppelinClientWrapper;
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.text.MessageFormat;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(DataSourceProcessor.class)
+@Slf4j
+public class ZeppelinDataSourceProcessor implements DataSourceProcessor {
+
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, 
ZeppelinDataSourceParamDTO.class);
+    }
+
+    @Override
+    public void checkDatasourceParam(BaseDataSourceParamDTO 
datasourceParamDTO) {
+        ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = 
(ZeppelinDataSourceParamDTO) datasourceParamDTO;
+        if (StringUtils.isEmpty(zeppelinDataSourceParamDTO.getRestEndpoint())
+                || 
StringUtils.isEmpty(zeppelinDataSourceParamDTO.getUserName())) {
+            throw new IllegalArgumentException("zeppelin datasource param is 
not valid");
+        }
+    }
+
+    @Override
+    public String getDatasourceUniqueId(ConnectionParam connectionParam, 
DbType dbType) {
+        ZeppelinConnectionParam baseConnectionParam = 
(ZeppelinConnectionParam) connectionParam;
+        return MessageFormat.format("{0}@{1}@{2}@{3}", dbType.getDescp(), 
baseConnectionParam.getRestEndpoint(),
+                baseConnectionParam.getUsername(), 
PasswordUtils.encodePassword(baseConnectionParam.getPassword()));
+    }
+
+    @Override
+    public BaseDataSourceParamDTO createDatasourceParamDTO(String 
connectionJson) {
+        ZeppelinConnectionParam connectionParams = (ZeppelinConnectionParam) 
createConnectionParams(connectionJson);
+        ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = new 
ZeppelinDataSourceParamDTO();
+
+        zeppelinDataSourceParamDTO.setUserName(connectionParams.getUsername());
+        zeppelinDataSourceParamDTO.setPassword(connectionParams.getPassword());
+        
zeppelinDataSourceParamDTO.setRestEndpoint(connectionParams.getRestEndpoint());
+        return zeppelinDataSourceParamDTO;
+    }
+
+    @Override
+    public ZeppelinConnectionParam 
createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
+        ZeppelinDataSourceParamDTO zeppelinDataSourceParam = 
(ZeppelinDataSourceParamDTO) datasourceParam;
+        ZeppelinConnectionParam zeppelinConnectionParam = new 
ZeppelinConnectionParam();
+        
zeppelinConnectionParam.setUsername(zeppelinDataSourceParam.getUserName());
+        
zeppelinConnectionParam.setPassword(zeppelinDataSourceParam.getPassword());
+        
zeppelinConnectionParam.setRestEndpoint(zeppelinDataSourceParam.getRestEndpoint());
+
+        return zeppelinConnectionParam;
+    }
+
+    @Override
+    public ConnectionParam createConnectionParams(String connectionJson) {
+        return JSONUtils.parseObject(connectionJson, 
ZeppelinConnectionParam.class);
+    }
+
+    @Override
+    public String getDatasourceDriver() {
+        return "";
+    }
+
+    @Override
+    public String getValidationQuery() {
+        return "";
+    }
+
+    @Override
+    public String getJdbcUrl(ConnectionParam connectionParam) {
+        return "";
+    }
+
+    @Override
+    public Connection getConnection(ConnectionParam connectionParam) {
+        return null;
+    }
+
+    @Override
+    public boolean checkDataSourceConnectivity(ConnectionParam 
connectionParam) {
+        ZeppelinConnectionParam baseConnectionParam = 
(ZeppelinConnectionParam) connectionParam;
+        try (
+                ZeppelinClientWrapper zeppelinClientWrapper =
+                        new 
ZeppelinClientWrapper(baseConnectionParam.getRestEndpoint())) {
+            return 
zeppelinClientWrapper.checkConnect(baseConnectionParam.username, 
baseConnectionParam.password);
+        } catch (Exception e) {
+            log.error("zeppelin client failed to connect to the server", e);
+            return false;
+        }
+    }
+
+    @Override
+    public DbType getDbType() {
+        return DbType.ZEPPELIN;
+    }
+
+    @Override
+    public DataSourceProcessor create() {
+        return new ZeppelinDataSourceProcessor();
+    }
+}
diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java
 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java
new file mode 100644
index 0000000000..05be02e722
--- /dev/null
+++ 
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import 
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam;
+import 
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinDataSourceParamDTO;
+import 
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinDataSourceProcessor;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class ZeppelinDataSourceProcessorTest {
+
+    private ZeppelinDataSourceProcessor zeppelinDataSourceProcessor;
+
+    private String connectJson =
+            
"{\"username\":\"lucky\",\"password\":\"123456\",\"restEndpoint\":\"https://dolphinscheduler.com:8080\"}";;
+
+    @BeforeEach
+    public void init() {
+        zeppelinDataSourceProcessor = new ZeppelinDataSourceProcessor();
+    }
+
+    @Test
+    void testCheckDatasourceParam() {
+        ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = new 
ZeppelinDataSourceParamDTO();
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> 
zeppelinDataSourceProcessor.checkDatasourceParam(zeppelinDataSourceParamDTO));
+        
zeppelinDataSourceParamDTO.setRestEndpoint("http://dolphinscheduler.com:8080";);
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> 
zeppelinDataSourceProcessor.checkDatasourceParam(zeppelinDataSourceParamDTO));
+        zeppelinDataSourceParamDTO.setUserName("root");
+        Assertions
+                .assertDoesNotThrow(() -> 
zeppelinDataSourceProcessor.checkDatasourceParam(zeppelinDataSourceParamDTO));
+    }
+
+    @Test
+    void testGetDatasourceUniqueId() {
+        ZeppelinConnectionParam zeppelinConnectionParam = new 
ZeppelinConnectionParam();
+        
zeppelinConnectionParam.setRestEndpoint("https://dolphinscheduler.com:8080";);
+        zeppelinConnectionParam.setUsername("root");
+        zeppelinConnectionParam.setPassword("123456");
+        
Assertions.assertEquals("zeppelin@https://dolphinscheduler.com:8080@root@123456";,
+                
zeppelinDataSourceProcessor.getDatasourceUniqueId(zeppelinConnectionParam, 
DbType.ZEPPELIN));
+
+    }
+
+    @Test
+    void testCreateDatasourceParamDTO() {
+        ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO =
+                (ZeppelinDataSourceParamDTO) 
zeppelinDataSourceProcessor.createDatasourceParamDTO(connectJson);
+        Assertions.assertEquals("lucky", 
zeppelinDataSourceParamDTO.getUserName());
+        Assertions.assertEquals("123456", 
zeppelinDataSourceParamDTO.getPassword());
+        Assertions.assertEquals("https://dolphinscheduler.com:8080";, 
zeppelinDataSourceParamDTO.getRestEndpoint());
+    }
+
+    @Test
+    void testCreateConnectionParams() {
+        ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO =
+                (ZeppelinDataSourceParamDTO) 
zeppelinDataSourceProcessor.createDatasourceParamDTO(connectJson);
+        ZeppelinConnectionParam zeppelinConnectionParam =
+                
zeppelinDataSourceProcessor.createConnectionParams(zeppelinDataSourceParamDTO);
+        Assertions.assertEquals("lucky", 
zeppelinConnectionParam.getUsername());
+        Assertions.assertEquals("123456", 
zeppelinConnectionParam.getPassword());
+        Assertions.assertEquals("https://dolphinscheduler.com:8080";, 
zeppelinConnectionParam.getRestEndpoint());
+    }
+
+    @Test
+    void testTestConnection() {
+        ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO =
+                (ZeppelinDataSourceParamDTO) 
zeppelinDataSourceProcessor.createDatasourceParamDTO(connectJson);
+        ZeppelinConnectionParam connectionParam =
+                
zeppelinDataSourceProcessor.createConnectionParams(zeppelinDataSourceParamDTO);
+        
Assertions.assertFalse(zeppelinDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+        try (
+                MockedConstruction<ZeppelinClientWrapper> 
sshClientWrapperMockedConstruction =
+                        Mockito.mockConstruction(ZeppelinClientWrapper.class, 
(mock, context) -> {
+                            Mockito.when(
+                                    
mock.checkConnect(connectionParam.getUsername(), connectionParam.getPassword()))
+                                    .thenReturn(true);
+                        })) {
+            
Assertions.assertTrue(zeppelinDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+        }
+    }
+}
diff --git a/dolphinscheduler-datasource-plugin/pom.xml 
b/dolphinscheduler-datasource-plugin/pom.xml
index 79261be779..91882bb0b6 100644
--- a/dolphinscheduler-datasource-plugin/pom.xml
+++ b/dolphinscheduler-datasource-plugin/pom.xml
@@ -48,6 +48,7 @@
         <module>dolphinscheduler-datasource-azure-sql</module>
         <module>dolphinscheduler-datasource-dameng</module>
         <module>dolphinscheduler-datasource-ssh</module>
+        <module>dolphinscheduler-datasource-zeppelin</module>
         <module>dolphinscheduler-datasource-databend</module>
         <module>dolphinscheduler-datasource-snowflake</module>
         <module>dolphinscheduler-datasource-vertica</module>
diff --git 
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
 
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
index 1f6c76fd44..3b8633443d 100644
--- 
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
+++ 
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
@@ -186,5 +186,12 @@ public class DataSourcePage extends NavBarPage implements 
NavBarPage.NavBarItem
 
         @FindBy(className = "btn-test-connection")
         private WebElement btnTestConnection;
+
+        @FindBys({
+                @FindBy(className = "input-zeppelin_rest_endpoint"),
+                @FindBy(tagName = "input"),
+        })
+        private WebElement inputZeppelinRestEndpoint;
+
     }
 }
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
index 169f0ad9bf..967eec3b86 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
@@ -51,7 +51,8 @@ public enum DbType {
     SNOWFLAKE(20, "snowflake"),
     VERTICA(21, "vertica"),
     HANA(22, "hana"),
-    DORIS(23, "doris");
+    DORIS(23, "doris"),
+    ZEPPELIN(24, "zeppelin");
 
     private static final Map<Integer, DbType> DB_TYPE_MAP =
             Arrays.stream(DbType.values()).collect(toMap(DbType::getCode, 
Functions.identity()));
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
index 69a5a66994..d136977526 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
@@ -37,6 +37,11 @@
             <artifactId>dolphinscheduler-task-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-datasource-all</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.zeppelin</groupId>
             <artifactId>zeppelin-client</artifactId>
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index 8b1c1a341d..b3ddcbc791 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -17,13 +17,17 @@
 
 package org.apache.dolphinscheduler.plugin.task.zeppelin;
 
+import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -45,10 +49,12 @@ public class ZeppelinParameters extends AbstractParameters {
     private String parameters;
     private String username;
     private String password;
+    private int datasource;
+    private String type;
 
     @Override
     public boolean checkParameters() {
-        return StringUtils.isNotEmpty(this.noteId) && 
StringUtils.isNotEmpty(this.restEndpoint);
+        return StringUtils.isNotEmpty(this.noteId);
     }
 
     @Override
@@ -56,4 +62,19 @@ public class ZeppelinParameters extends AbstractParameters {
         return Collections.emptyList();
     }
 
+    public ZeppelinTaskExecutionContext 
generateExtendedContext(ResourceParametersHelper parametersHelper) {
+        DataSourceParameters dataSourceParameters =
+                (DataSourceParameters) 
parametersHelper.getResourceParameters(ResourceType.DATASOURCE, datasource);
+        ZeppelinTaskExecutionContext zeppelinTaskExecutionContext = new 
ZeppelinTaskExecutionContext();
+        zeppelinTaskExecutionContext.setConnectionParams(
+                Objects.nonNull(dataSourceParameters) ? 
dataSourceParameters.getConnectionParams() : null);
+        return zeppelinTaskExecutionContext;
+    }
+
+    @Override
+    public ResourceParametersHelper getResources() {
+        ResourceParametersHelper resources = super.getResources();
+        resources.put(ResourceType.DATASOURCE, datasource);
+        return resources;
+    }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index 0f25527c08..f6fa8e89ff 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin;
 
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import 
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.client.ClientConfig;
@@ -59,6 +62,10 @@ public class ZeppelinTask extends AbstractRemoteTask {
      */
     private ZeppelinClient zClient;
 
+    private ZeppelinConnectionParam zeppelinConnectionParam;
+
+    private ZeppelinTaskExecutionContext zeppelinTaskExecutionContext;
+
     /**
      * constructor
      *
@@ -76,6 +83,14 @@ public class ZeppelinTask extends AbstractRemoteTask {
         if (this.zeppelinParameters == null || 
!this.zeppelinParameters.checkParameters()) {
             throw new ZeppelinTaskException("zeppelin task params is not 
valid");
         }
+        zeppelinTaskExecutionContext =
+                
zeppelinParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+        zeppelinConnectionParam = (ZeppelinConnectionParam) DataSourceUtils
+                
.buildConnectionParams(DbType.valueOf(zeppelinParameters.getType()),
+                        zeppelinTaskExecutionContext.getConnectionParams());
+        zeppelinParameters.setUsername(zeppelinConnectionParam.getUsername());
+        zeppelinParameters.setPassword(zeppelinConnectionParam.getPassword());
+        
zeppelinParameters.setRestEndpoint(zeppelinConnectionParam.getRestEndpoint());
         log.info("Initialize zeppelin task params:{}", 
JSONUtils.toPrettyJsonString(taskParams));
         this.zClient = getZeppelinClient();
     }
@@ -111,11 +126,8 @@ public class ZeppelinTask extends AbstractRemoteTask {
             Status status = Status.FINISHED;
             // If in production, clone the note and run the cloned one for 
stability
             if (productionNoteDirectory != null) {
-                final String cloneNotePath = String.format(
-                        "%s%s_%s",
-                        productionNoteDirectory,
-                        noteId,
-                        DateUtils.getTimestampString());
+                final String cloneNotePath =
+                        String.format("%s%s_%s", productionNoteDirectory, 
noteId, DateUtils.getTimestampString());
                 noteId = this.zClient.cloneNote(noteId, cloneNotePath);
             }
 
@@ -124,11 +136,8 @@ public class ZeppelinTask extends AbstractRemoteTask {
                 final List<ParagraphResult> paragraphResultList = 
noteResult.getParagraphResultList();
                 StringBuilder resultContentBuilder = new StringBuilder();
                 for (ParagraphResult paragraphResult : paragraphResultList) {
-                    resultContentBuilder.append(
-                            String.format(
-                                    "paragraph_id: %s, paragraph_result: %s\n",
-                                    paragraphResult.getParagraphId(),
-                                    paragraphResult.getResultInText()));
+                    resultContentBuilder.append(String.format("paragraph_id: 
%s, paragraph_result: %s\n",
+                            paragraphResult.getParagraphId(), 
paragraphResult.getResultInText()));
                     status = paragraphResult.getStatus();
                     // we treat note execution as failure if any paragraph in 
the note fails
                     // status will be further processed in method 
mapStatusToExitCode below
@@ -221,27 +230,21 @@ public class ZeppelinTask extends AbstractRemoteTask {
         final String paragraphId = this.zeppelinParameters.getParagraphId();
         if (paragraphId == null) {
             log.info("trying terminate zeppelin task, taskId: {}, noteId: {}",
-                    this.taskExecutionContext.getTaskInstanceId(),
-                    noteId);
+                    this.taskExecutionContext.getTaskInstanceId(), noteId);
             Unirest.config().defaultBaseUrl(restEndpoint + "/api");
             Unirest.delete("/notebook/job/{noteId}").routeParam("noteId", 
noteId).asJson();
-            log.info("zeppelin task terminated, taskId: {}, noteId: {}",
-                    this.taskExecutionContext.getTaskInstanceId(),
+            log.info("zeppelin task terminated, taskId: {}, noteId: {}", 
this.taskExecutionContext.getTaskInstanceId(),
                     noteId);
         } else {
             log.info("trying terminate zeppelin task, taskId: {}, noteId: {}, 
paragraphId: {}",
-                    this.taskExecutionContext.getTaskInstanceId(),
-                    noteId,
-                    paragraphId);
+                    this.taskExecutionContext.getTaskInstanceId(), noteId, 
paragraphId);
             try {
                 this.zClient.cancelParagraph(noteId, paragraphId);
             } catch (Exception e) {
                 throw new TaskException("cancel paragraph error", e);
             }
             log.info("zeppelin task terminated, taskId: {}, noteId: {}, 
paragraphId: {}",
-                    this.taskExecutionContext.getTaskInstanceId(),
-                    noteId,
-                    paragraphId);
+                    this.taskExecutionContext.getTaskInstanceId(), noteId, 
paragraphId);
         }
 
     }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java
index c2f63f3cf6..d9e7318dc4 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java
@@ -44,6 +44,6 @@ public class ZeppelinTaskChannel implements TaskChannel {
 
     @Override
     public ResourceParametersHelper getResources(String parameters) {
-        return null;
+        return JSONUtils.parseObject(parameters, 
ZeppelinParameters.class).getResources();
     }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java
new file mode 100644
index 0000000000..4cc09e5f4f
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.zeppelin;
+
+import java.io.Serializable;
+
+/**
+ *  master/worker task transport
+ */
+public class ZeppelinTaskExecutionContext implements Serializable {
+
+    /**
+     * connectionParams
+     */
+    private String connectionParams;
+
+    public String getConnectionParams() {
+        return connectionParams;
+    }
+
+    public void setConnectionParams(String connectionParams) {
+        this.connectionParams = connectionParams;
+    }
+
+    @Override
+    public String toString() {
+        return "ZeppelinTaskExecutionContext{"
+                + "connectionParams='" + connectionParams + '\''
+                + '}';
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 8c5cb11377..ffaf85b207 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -28,10 +28,13 @@ import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import 
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 
 import org.apache.zeppelin.client.NoteResult;
 import org.apache.zeppelin.client.ParagraphResult;
@@ -40,6 +43,7 @@ import org.apache.zeppelin.client.ZeppelinClient;
 
 import java.util.Map;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -59,6 +63,8 @@ public class ZeppelinTaskTest {
     private static final String MOCK_REST_ENDPOINT = "localhost:8080";
     private static final String MOCK_CLONE_NOTE_ID = "3GYJR92R8";
     private static final String MOCK_PRODUCTION_DIRECTORY = "/prod/";
+    private static final String MOCK_TYPE = "ZEPPELIN";
+    private static MockedStatic<DataSourceUtils> dataSourceUtilsStaticMock = 
null;
     private final ObjectMapper mapper = new ObjectMapper();
 
     private ZeppelinClient zClient;
@@ -80,9 +86,15 @@ public class ZeppelinTaskTest {
 
     @BeforeEach
     public void before() throws Exception {
-        String zeppelinParameters = buildZeppelinTaskParameters();
+        String zeppelinTaskParameters = buildZeppelinTaskParameters();
         TaskExecutionContext taskExecutionContext = 
mock(TaskExecutionContext.class);
-        
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParameters);
+        ResourceParametersHelper resourceParametersHelper = 
mock(ResourceParametersHelper.class);
+        ZeppelinConnectionParam zeppelinConnectionParam = 
mock(ZeppelinConnectionParam.class);
+        
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinTaskParameters);
+        
when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper);
+        dataSourceUtilsStaticMock = Mockito.mockStatic(DataSourceUtils.class);
+        dataSourceUtilsStaticMock.when(() -> 
DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
+                .thenReturn(zeppelinConnectionParam);
         this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
 
         this.zClient = mock(ZeppelinClient.class);
@@ -93,6 +105,11 @@ public class ZeppelinTaskTest {
         this.zeppelinTask.init();
     }
 
+    @AfterEach
+    public void afterEach() {
+        dataSourceUtilsStaticMock.close();
+    }
+
     @Test
     public void testHandleWithParagraphExecutionSuccess() throws Exception {
         when(this.zClient.executeParagraph(any(), any(), 
any(Map.class))).thenReturn(this.paragraphResult);
@@ -158,9 +175,11 @@ public class ZeppelinTaskTest {
 
     @Test
     public void testHandleWithNoteExecutionSuccess() throws Exception {
-        String zeppelinParametersWithNoParagraphId = 
buildZeppelinTaskParametersWithNoParagraphId();
+        String zeppelinTaskParametersWithNoParagraphId = 
buildZeppelinTaskParametersWithNoParagraphId();
         TaskExecutionContext taskExecutionContext = 
mock(TaskExecutionContext.class);
-        
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
+        ResourceParametersHelper resourceParametersHelper = 
mock(ResourceParametersHelper.class);
+        
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinTaskParametersWithNoParagraphId);
+        
when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper);
         this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
         this.zClient = mock(ZeppelinClient.class);
         this.noteResult = mock(NoteResult.class);
@@ -183,6 +202,9 @@ public class ZeppelinTaskTest {
 
         try (MockedStatic<DateUtils> mockedStaticDateUtils = 
Mockito.mockStatic(DateUtils.class)) {
             
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
+            ResourceParametersHelper resourceParametersHelper = 
mock(ResourceParametersHelper.class);
+            
when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper);
+
             this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
 
             this.zClient = mock(ZeppelinClient.class);
@@ -211,6 +233,7 @@ public class ZeppelinTaskTest {
         zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID);
         zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
         zeppelinParameters.setParameters(MOCK_PARAMETERS);
+        zeppelinParameters.setType(MOCK_TYPE);
 
         return JSONUtils.toJsonString(zeppelinParameters);
     }
@@ -220,6 +243,7 @@ public class ZeppelinTaskTest {
         zeppelinParameters.setNoteId(MOCK_NOTE_ID);
         zeppelinParameters.setParameters(MOCK_PARAMETERS);
         zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
+        zeppelinParameters.setType(MOCK_TYPE);
 
         return JSONUtils.toJsonString(zeppelinParameters);
     }
@@ -230,6 +254,7 @@ public class ZeppelinTaskTest {
         zeppelinParameters.setParameters(MOCK_PARAMETERS);
         zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
         
zeppelinParameters.setProductionNoteDirectory(MOCK_PRODUCTION_DIRECTORY);
+        zeppelinParameters.setType(MOCK_TYPE);
 
         return JSONUtils.toJsonString(zeppelinParameters);
     }
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts 
b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
index 6edc3befea..434f339dce 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
@@ -84,5 +84,7 @@ export default {
   SecretAccessKey: 'SecretAccessKey',
   SecretAccessKey_tips: '请输入SecretAccessKey',
   dbUser: 'DbUser',
-  dbUser_tips: '请输入DbUser'
+  dbUser_tips: '请输入DbUser',
+  zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
+  zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint'
 }
diff --git a/dolphinscheduler-ui/src/service/modules/data-source/types.ts 
b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
index 59173151af..8e84c7887b 100644
--- a/dolphinscheduler-ui/src/service/modules/data-source/types.ts
+++ b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
@@ -39,6 +39,7 @@ type IDataBase =
   | 'HANA'
   | 'DORIS'
   | 'KYUUBI'
+  | 'ZEPPELIN'
 
 type IDataBaseLabel =
   | 'MYSQL'
@@ -59,6 +60,7 @@ type IDataBaseLabel =
   | 'OCEANBASE'
   | 'SSH'
   | 'KYUUBI'
+  | 'ZEPPELIN'
 
 interface IDataSource {
   id?: number
@@ -80,6 +82,7 @@ interface IDataSource {
   connectType?: string
   other?: object
   endpoint?: string
+  restEndpoint?: string
   MSIClientId?: string
   dbUser?: string
   compatibleMode?: string
diff --git a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx 
b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
index f44ca4704e..599977accf 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
+++ b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
@@ -154,6 +154,7 @@ const DetailModal = defineComponent({
       requiredDataBase,
       showHost,
       showPort,
+      showRestEndpoint,
       showAwsRegion,
       showCompatibleMode,
       showConnectType,
@@ -251,6 +252,21 @@ const DetailModal = defineComponent({
                     placeholder={t('datasource.ip_tips')}
                   />
                 </NFormItem>
+                <NFormItem
+                  v-show={showRestEndpoint}
+                  label={t('datasource.zeppelin_rest_endpoint')}
+                  path='restEndPoint'
+                  show-require-mark
+                >
+                  <NInput
+                    allowInput={this.trim}
+                    class='input-zeppelin_rest_endpoint'
+                    v-model={[detailForm.restEndpoint, 'value']}
+                    type='text'
+                    maxlength={255}
+                    placeholder={t('datasource.zeppelin_rest_endpoint_tips')}
+                  />
+                </NFormItem>
                 <NFormItem
                   v-show={showPort}
                   label={t('datasource.port')}
diff --git a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts 
b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
index b0585e415b..8561af779f 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
+++ b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
@@ -61,6 +61,7 @@ export function useForm(id?: number) {
     showHost: true,
     showPort: true,
     showAwsRegion: false,
+    showRestEndpoint: false,
     showCompatibleMode: false,
     showConnectType: false,
     showPrincipal: false,
@@ -253,11 +254,19 @@ export function useForm(id?: number) {
     } else {
       state.showPrincipal = false
     }
-    if (type === 'SSH') {
+    if (type === 'SSH' || type === 'ZEPPELIN') {
       state.showDataBaseName = false
       state.requiredDataBase = false
       state.showJDBCConnectParameters = false
-      state.showPublicKey = true
+      state.showPublicKey = false
+      if (type === 'SSH') {
+        state.showPublicKey = true
+      }
+      if (type === 'ZEPPELIN') {
+        state.showHost = false
+        state.showPort = false
+        state.showRestEndpoint = true
+      }
     } else {
       state.showDataBaseName = true
       state.requiredDataBase = true
@@ -407,6 +416,11 @@ export const datasourceType: IDataBaseOptionKeys = {
     label: 'HANA',
     defaultPort: 30015
   },
+  ZEPPELIN: {
+    value: 'ZEPPELIN',
+    label: 'ZEPPELIN',
+    defaultPort: 8080
+  },
   DORIS: {
     value: 'DORIS',
     label: 'DORIS',
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
index 25640583d7..3b2756b5e7 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
@@ -133,6 +133,11 @@ export function useDatasource(
       code: 'HANA',
       disabled: false
     },
+    {
+      id: 23,
+      code: 'ZEPPELIN',
+      disabled: false
+    },
     {
       id: 23,
       code: 'DORIS',
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index a2069b3bd5..eff4777215 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -47,23 +47,6 @@ export function useZeppelin(model: { [field: string]: any 
}): IJsonItem[] {
         placeholder: t('project.node.zeppelin_paragraph_id_tips')
       }
     },
-    {
-      type: 'input',
-      field: 'restEndpoint',
-      name: t('project.node.zeppelin_rest_endpoint'),
-      props: {
-        placeholder: t('project.node.zeppelin_rest_endpoint_tips')
-      },
-      validate: {
-        trigger: ['input', 'blur'],
-        required: true,
-        validator(validate: any, value: string) {
-          if (!value) {
-            return new Error(t('project.node.zeppelin_rest_endpoint_tips'))
-          }
-        }
-      }
-    },
     {
       type: 'input',
       field: 'productionNoteDirectory',
@@ -72,22 +55,6 @@ export function useZeppelin(model: { [field: string]: any 
}): IJsonItem[] {
         placeholder: t('project.node.zeppelin_production_note_directory_tips')
       }
     },
-    {
-      type: 'input',
-      field: 'username',
-      name: t('project.node.zeppelin_username'),
-      props: {
-        placeholder: t('project.node.zeppelin_username_tips')
-      }
-    },
-    {
-      type: 'input',
-      field: 'password',
-      name: t('project.node.zeppelin_password'),
-      props: {
-        placeholder: t('project.node.zeppelin_password_tips')
-      }
-    },
     {
       type: 'input',
       field: 'parameters',
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 31398f98e6..f9801708e6 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -347,6 +347,8 @@ export function formatParams(data: INodeData): {
     taskParams.password = data.password
     taskParams.productionNoteDirectory = data.productionNoteDirectory
     taskParams.parameters = data.parameters
+    taskParams.datasource = data.datasource
+    taskParams.type = data.type
   }
 
   if (data.taskType === 'K8S') {
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
index a3419f683e..6516d013cf 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
@@ -43,7 +43,12 @@ export function useZeppelin({
     workerGroup: 'default',
     delayTime: 0,
     timeout: 30,
-    timeoutNotifyStrategy: ['WARN']
+    type: 'ZEPPELIN',
+    displayRows: 10,
+    timeoutNotifyStrategy: ['WARN'],
+    restEndpoint: '',
+    username: '',
+    password: ''
   } as INodeData)
 
   return {
@@ -60,6 +65,7 @@ export function useZeppelin({
       ...Fields.useFailed(),
       Fields.useDelayTime(model),
       ...Fields.useTimeoutAlarm(model),
+      ...Fields.useDatasource(model),
       ...Fields.useZeppelin(model),
       Fields.usePreTasks()
     ] as IJsonItem[],

Reply via email to