This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new f7da8f39db [Feature-16127] Support emr serverless spark (#16126)
f7da8f39db is described below
commit f7da8f39db96f95da4471d78aa01b98827165657
Author: Eric Gao <[email protected]>
AuthorDate: Fri Aug 2 12:10:32 2024 +0800
[Feature-16127] Support emr serverless spark (#16126)
* support emr serverless spark task
Signed-off-by: EricGao888 <[email protected]>
---------
Signed-off-by: EricGao888 <[email protected]>
---
docs/docs/en/guide/task/aliyun-serverless-spark.md | 111 ++++++++++
docs/docs/zh/guide/task/aliyun-serverless-spark.md | 111 ++++++++++
docs/img/tasks/demo/aliyun_serverless_spark_1.png | Bin 0 -> 780243 bytes
docs/img/tasks/demo/aliyun_serverless_spark_2.png | Bin 0 -> 967682 bytes
docs/img/tasks/demo/aliyun_serverless_spark_3.png | Bin 0 -> 940847 bytes
docs/img/tasks/demo/aliyun_serverless_spark_4.png | Bin 0 -> 1016310 bytes
.../src/main/resources/task-type-config.yaml | 1 +
.../pom.xml | 57 +++++
.../AliyunServerlessSparkClientWrapper.java | 69 ++++++
.../AliyunServerlessSparkConstants.java | 41 ++++
.../AliyunServerlessSparkDataSourceChannel.java | 37 ++++
...yunServerlessSparkDataSourceChannelFactory.java | 39 ++++
.../AliyunServerlessSparkUtils.java | 41 ++++
.../AliyunServerlessSparkConnectionParam.java | 37 ++++
.../AliyunServerlessSparkDataSourceParamDTO.java | 40 ++++
.../AliyunServerlessSparkDataSourceProcessor.java | 154 +++++++++++++
...iyunServerlessSparkDataSourceProcessorTest.java | 117 ++++++++++
.../dolphinscheduler-datasource-all/pom.xml | 5 +
dolphinscheduler-datasource-plugin/pom.xml | 1 +
.../apache/dolphinscheduler/spi/enums/DbType.java | 5 +-
.../pom.xml | 68 ++++++
.../AliyunServerlessSparkParameters.java | 67 ++++++
.../AliyunServerlessSparkTask.java | 246 +++++++++++++++++++++
.../AliyunServerlessSparkTaskChannel.java | 37 ++++
.../AliyunServerlessSparkTaskChannelFactory.java | 36 +++
.../AliyunServerlessSparkTaskException.java | 33 +++
.../task/aliyunserverlessspark/RunState.java | 51 +++++
.../AliyunServerlessSparkTaskTest.java | 208 +++++++++++++++++
.../dolphinscheduler-task-all/pom.xml | 6 +
dolphinscheduler-task-plugin/pom.xml | 1 +
.../images/task-icons/aliyun_serverless_spark.png | Bin 0 -> 18362 bytes
.../task-icons/aliyun_serverless_spark_hover.png | Bin 0 -> 18583 bytes
.../src/locales/en_US/datasource.ts | 12 +-
dolphinscheduler-ui/src/locales/en_US/project.ts | 28 ++-
.../src/locales/zh_CN/datasource.ts | 12 +-
dolphinscheduler-ui/src/locales/zh_CN/project.ts | 28 ++-
.../src/service/modules/data-source/types.ts | 7 +-
dolphinscheduler-ui/src/store/project/task-type.ts | 4 +
dolphinscheduler-ui/src/store/project/types.ts | 1 +
.../src/views/datasource/list/detail.tsx | 69 +++++-
.../src/views/datasource/list/use-form.ts | 27 ++-
.../projects/task/components/node/fields/index.ts | 1 +
.../node/fields/use-aliyun-serverless-spark.ts | 173 +++++++++++++++
.../task/components/node/fields/use-datasource.ts | 5 +
.../projects/task/components/node/format-data.ts | 14 ++
.../projects/task/components/node/tasks/index.ts | 4 +-
.../node/tasks/use-aliyun-serverless-spark.ts | 74 +++++++
.../views/projects/task/components/node/types.ts | 11 +
.../src/views/projects/task/constants/task-type.ts | 5 +
.../workflow/components/dag/dag.module.scss | 6 +
tools/dependencies/known-dependencies.txt | 8 +
51 files changed, 2094 insertions(+), 14 deletions(-)
diff --git a/docs/docs/en/guide/task/aliyun-serverless-spark.md
b/docs/docs/en/guide/task/aliyun-serverless-spark.md
new file mode 100644
index 0000000000..6980401bbd
--- /dev/null
+++ b/docs/docs/en/guide/task/aliyun-serverless-spark.md
@@ -0,0 +1,111 @@
+# Aliyun EMR Serverless Spark
+
+## Introduction
+
+`Aliyun EMR Serverless Spark` task plugin submits spark job to
+[`Aliyun EMR Serverless
Spark`](https://help.aliyun.com/zh/emr/emr-serverless-spark/product-overview/what-is-emr-serverless-spark)
service.
+
+## Create Connections
+
+- Click `Datasource -> Create Datasource -> ALIYUN_SERVERLESS_SPARK` to create
a connection.
+
+
+
+- Fill in `Datasource Name`, `Access Key Id`, `Access Key Secret`, `Region Id`
and click `Confirm`.
+
+
+
+## Create Tasks
+
+- Click `Porject -> Workflow Definition -> Create Workflow` and drag the
`ALIYUN_SERVERLESS_SPARK` task to the canvas.
+
+
+
+- Fill in the task parameters and click `Confirm` to create the task node.
+
+
+
+## Task Parameters
+
+- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md)
`Default Task Parameters` section for default parameters.
+
+| **Parameters** |
**Description** |
+|-------------------------|-----------------------------------------------------------------------------------------------------|
+| Datasource types | The type of datasource the task uses, should be
`ALIYUN_SERVERLESS_SPARK`. |
+| Datasource instances | The instance of `ALIYUN_SERVERLESS_SPARK`
datasource. |
+| workspace id | `Aliyun Serverless Spark` workspace id.
|
+| resource queue id | `Aliyun Serverless Spark` resource queue the task
uses to submit spark job. |
+| code type | `Aliyun Serverless Spark` code type, could be
`JAR`, `PYTHON` or `SQL`. |
+| job name | `Aliyun Serverless Spark` job name.
|
+| entry point | The location of the job code such as jar package,
python file, or sql file. OSS location supported. |
+| entry point arguments | Arguments of the job main program.
|
+| spark submit parameters | Spark-submit related parameters.
|
+| engine release version | Spark engine release version.
|
+| is production | Whether the spark job runs in production or
development environment. |
+
+## Examples
+
+### Submit Jar tasks
+
+| **Parameters** |
**Example Values / Operations**
|
+|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| region id | cn-hangzhou
|
+| access key id | <your-access-key-id>
|
+| access key secret | <your-access-key-secret>
|
+| resource queue id | root_queue
|
+| code type | JAR
|
+| job name | ds-emr-spark-jar
|
+| entry point |
oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar
|
+| entry point arguments | 100
|
+| spark submit parameters | --class org.apache.spark.examples.SparkPi --conf
spark.executor.cores=4 --conf spark.executor.memory=20g --conf
spark.driver.cores=4 --conf spark.driver.memory=8g --conf
spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native
Runtime)
|
+| is production | Please open the switch
|
+
+### Submit SQL tasks
+
+| **Parameters** |
**Example Values / Operations**
|
+|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| region id | cn-hangzhou
|
+| access key id | <your-access-key-id>
|
+| access key secret | <your-access-key-secret>
|
+| resource queue id | root_queue
|
+| code type | SQL
|
+| job name | ds-emr-spark-sql-1
|
+| entry point | Any non-empty string
|
+| entry point arguments | -e#show tables;show tables;
|
+| spark submit parameters | --class
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf
spark.executor.cores=4 --conf spark.executor.memory=20g --conf
spark.driver.cores=4 --conf spark.driver.memory=8g --conf
spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native
Runtime)
|
+| is production | Please open the switch
|
+
+### Submit SQL tasks located in OSS
+
+| **Parameters** |
**Example Values / Operations**
|
+|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| region id | cn-hangzhou
|
+| access key id | <your-access-key-id>
|
+| access key secret | <your-access-key-secret>
|
+| resource queue id | root_queue
|
+| code type | SQL
|
+| job name | ds-emr-spark-sql-2
|
+| entry point | Any non-empty string
|
+| entry point arguments |
-f#oss://datadev-oss-hdfs-test/spark-resource/examples/sql/show_db.sql
|
+| spark submit parameters | --class
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf
spark.executor.cores=4 --conf spark.executor.memory=20g --conf
spark.driver.cores=4 --conf spark.driver.memory=8g --conf
spark.executor.instances=1" |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native
Runtime)
|
+| is production | Please open the switch
|
+
+### Submit PySpark Tasks
+
+| **Parameters** |
**Example Values / Operations**
|
+|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| region id | cn-hangzhou
|
+| access key id | <your-access-key-id>
|
+| access key secret | <your-access-key-secret>
|
+| resource queue id | root_queue
|
+| code type | PYTHON
|
+| job name | ds-emr-spark-python
|
+| entry point |
oss://datadev-oss-hdfs-test/spark-resource/examples/src/main/python/pi.py
|
+| entry point arguments | 100
|
+| spark submit parameters | --conf spark.executor.cores=4 --conf
spark.executor.memory=20g --conf spark.driver.cores=4 --conf
spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native
Runtime)
|
+| is production | Please open the switch
|
+
diff --git a/docs/docs/zh/guide/task/aliyun-serverless-spark.md
b/docs/docs/zh/guide/task/aliyun-serverless-spark.md
new file mode 100644
index 0000000000..2a10ecf6f2
--- /dev/null
+++ b/docs/docs/zh/guide/task/aliyun-serverless-spark.md
@@ -0,0 +1,111 @@
+# Aliyun EMR Serverless Spark
+
+## 简介
+
+`Aliyun EMR Serverless Spark` 任务插件用于向
+[`阿里云EMR Serverless
Spark`](https://help.aliyun.com/zh/emr/emr-serverless-spark/product-overview/what-is-emr-serverless-spark)
服务提交作业。
+
+## 创建链接
+
+- 点击 `数据源 -> 创建数据源 -> ALIYUN_SERVERLESS_SPARK` 创建链接。
+
+
+
+- 填入 `Datasource Name`, `Access Key Id`, `Access Key Secret`, `Region Id`
参数并且点击 `确认`.
+
+
+
+## 创建任务节点
+
+- 点击 `项目 -> 工作流定义 -> 创建工作流` 并且将 `ALIYUN_SERVERLESS_SPARK` 任务拖到画板中。
+
+
+
+- 填入相关任务参数并且点击 `确认` 创建任务节点。
+
+
+
+## 任务参数
+
+- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
+
+| **任务参数** | **描述**
|
+|-------------------------|----------------------------------------------------------|
+| Datasource types | 链接类型,应该选择 `ALIYUN_SERVERLESS_SPARK`。
|
+| Datasource instances | `ALIYUN_SERVERLESS_SPARK` 链接实例。
|
+| workspace id | `Aliyun Serverless Spark` 工作空间id。
|
+| resource queue id | `Aliyun Serverless Spark` 任务队列id。
|
+| code type | `Aliyun Serverless Spark`
任务类型,可以是`JAR`、`PYTHON`或者`SQL`。 |
+| job name | `Aliyun Serverless Spark` 任务名。
|
+| entry point | 任务代码(JAR包、PYTHON / SQL脚本)的位置,支持OSS中的文件。
|
+| entry point arguments | 主程序入口参数。
|
+| spark submit parameters | Spark-submit相关参数。
|
+| engine release version | Spark引擎版本。
|
+| is production | Spark任务是否运行在生产环境中。
|
+
+## 示例
+
+### 提交jar类型任务
+
+| **参数名** |
**参数值 / 按钮操作**
|
+|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| region id | cn-hangzhou
|
+| access key id | <your-access-key-id>
|
+| access key secret | <your-access-key-secret>
|
+| resource queue id | root_queue
|
+| code type | JAR
|
+| job name | ds-emr-spark-jar
|
+| entry point |
oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar
|
+| entry point arguments | 100
|
+| spark submit parameters | --class org.apache.spark.examples.SparkPi --conf
spark.executor.cores=4 --conf spark.executor.memory=20g --conf
spark.driver.cores=4 --conf spark.driver.memory=8g --conf
spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native
Runtime)
|
+| is production | 请您将按钮打开
|
+
+### 提交sql类型任务
+
+| **参数名** |
**参数值 / 按钮操作**
|
+|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| region id | cn-hangzhou
|
+| access key id | <your-access-key-id>
|
+| access key secret | <your-access-key-secret>
|
+| resource queue id | root_queue
|
+| code type | SQL
|
+| job name | ds-emr-spark-sql-1
|
+| entry point | 任意非空值
|
+| entry point arguments | -e#show tables;show tables;
|
+| spark submit parameters | --class
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf
spark.executor.cores=4 --conf spark.executor.memory=20g --conf
spark.driver.cores=4 --conf spark.driver.memory=8g --conf
spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native
Runtime)
|
+| is production | 请您将按钮打开
|
+
+### 提交oss中的sql脚本任务
+
+| **参数名** |
**参数值 / 按钮操作**
|
+|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| region id | cn-hangzhou
|
+| access key id | <your-access-key-id>
|
+| access key secret | <your-access-key-secret>
|
+| resource queue id | root_queue
|
+| code type | SQL
|
+| job name | ds-emr-spark-sql-2
|
+| entry point | 任意非空值
|
+| entry point arguments |
-f#oss://datadev-oss-hdfs-test/spark-resource/examples/sql/show_db.sql
|
+| spark submit parameters | --class
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf
spark.executor.cores=4 --conf spark.executor.memory=20g --conf
spark.driver.cores=4 --conf spark.driver.memory=8g --conf
spark.executor.instances=1" |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native
Runtime)
|
+| is production | 请您将按钮打开
|
+
+### 提交pyspark任务
+
+| **参数名** |
**参数值 / 按钮操作**
|
+|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| region id | cn-hangzhou
|
+| access key id | <your-access-key-id>
|
+| access key secret | <your-access-key-secret>
|
+| resource queue id | root_queue
|
+| code type | PYTHON
|
+| job name | ds-emr-spark-python
|
+| entry point |
oss://datadev-oss-hdfs-test/spark-resource/examples/src/main/python/pi.py
|
+| entry point arguments | 100
|
+| spark submit parameters | --conf spark.executor.cores=4 --conf
spark.executor.memory=20g --conf spark.driver.cores=4 --conf
spark.driver.memory=8g --conf spark.executor.instances=1 |
+| engine release version | esr-2.1-native (Spark 3.3.1, Scala 2.12, Native
Runtime)
|
+| is production | 请您将按钮打开
|
+
diff --git a/docs/img/tasks/demo/aliyun_serverless_spark_1.png
b/docs/img/tasks/demo/aliyun_serverless_spark_1.png
new file mode 100644
index 0000000000..04a793d6ba
Binary files /dev/null and b/docs/img/tasks/demo/aliyun_serverless_spark_1.png
differ
diff --git a/docs/img/tasks/demo/aliyun_serverless_spark_2.png
b/docs/img/tasks/demo/aliyun_serverless_spark_2.png
new file mode 100644
index 0000000000..3d096a89fe
Binary files /dev/null and b/docs/img/tasks/demo/aliyun_serverless_spark_2.png
differ
diff --git a/docs/img/tasks/demo/aliyun_serverless_spark_3.png
b/docs/img/tasks/demo/aliyun_serverless_spark_3.png
new file mode 100644
index 0000000000..b7c96133f6
Binary files /dev/null and b/docs/img/tasks/demo/aliyun_serverless_spark_3.png
differ
diff --git a/docs/img/tasks/demo/aliyun_serverless_spark_4.png
b/docs/img/tasks/demo/aliyun_serverless_spark_4.png
new file mode 100644
index 0000000000..61d8370a22
Binary files /dev/null and b/docs/img/tasks/demo/aliyun_serverless_spark_4.png
differ
diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
index 05d1e6290a..d92f41f3c5 100644
--- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
+++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
@@ -35,6 +35,7 @@ task:
- 'K8S'
- 'DMS'
- 'DATA_FACTORY'
+ - 'ALIYUN_SERVERLESS_SPARK'
logic:
- 'SUB_PROCESS'
- 'DEPENDENT'
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/pom.xml
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/pom.xml
new file mode 100644
index 0000000000..80226fe367
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-datasource-plugin</artifactId>
+ <version>dev-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>dolphinscheduler-datasource-aliyunserverlessspark</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-spi</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-datasource-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.aliyun</groupId>
+ <artifactId>emr_serverless_spark20230808</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.aliyun</groupId>
+ <artifactId>credentials-java</artifactId>
+ <version>0.3.0</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkClientWrapper.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkClientWrapper.java
new file mode 100644
index 0000000000..55c21972ef
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkClientWrapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.aliyun.emr_serverless_spark20230808.Client;
+import com.aliyun.teaopenapi.models.Config;
+
+@Slf4j
+public class AliyunServerlessSparkClientWrapper implements AutoCloseable {
+
+ private Client aliyunServerlessSparkClient;
+
+ public AliyunServerlessSparkClientWrapper(
+ String accessKeyId,
+ String accessKeySecret,
+ String regionId,
+ String endpoint)
+ throws
Exception {
+
+ checkNotNull(accessKeyId, accessKeySecret, regionId);
+
+ if (StringUtils.isEmpty(endpoint)) {
+ endpoint =
String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE, regionId);
+ }
+
+ Config config = new Config()
+ .setEndpoint(endpoint)
+ .setAccessKeyId(accessKeyId)
+ .setAccessKeySecret(accessKeySecret);
+ aliyunServerlessSparkClient = new Client(config);
+ }
+
+ // TODO: update checkConnect when aliyun serverless spark service support
the service connection check
+ public boolean checkConnect(String accessKeyId, String accessKeySecret,
String regionId) {
+ try {
+ // If the login fails, an exception will be thrown directly
+ return true;
+ } catch (Exception e) {
+ log.info("spark client failed to connect to the server", e);
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkConstants.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkConstants.java
new file mode 100644
index 0000000000..d28bd3e7ee
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;
+
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class AliyunServerlessSparkConstants {
+
+ public String ENDPOINT_TEMPLATE = "emr-serverless-spark.%s.aliyuncs.com";
+
+ public String DEFAULT_ENGINE = "esr-2.1-native (Spark 3.3.1, Scala 2.12,
Native Runtime)";
+
+ public String ENV_PROD = "production";
+
+ public String ENV_DEV = "dev";
+
+ public String ENTRY_POINT_ARGUMENTS_DELIMITER = "#";
+
+ public String ENV_KEY = "environment";
+
+ public String WORKFLOW_KEY = "workflow";
+
+ public String WORKFLOW_VALUE = "true";
+
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannel.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannel.java
new file mode 100644
index 0000000000..3821a1fdbe
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;
+
+import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
+import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+public class AliyunServerlessSparkDataSourceChannel implements
DataSourceChannel {
+
+ @Override
+ public AdHocDataSourceClient
createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType
dbType) {
+ throw new UnsupportedOperationException("Aliyun Serverless Spark
AdHocDataSourceClient is not supported");
+ }
+
+ @Override
+ public PooledDataSourceClient
createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType
dbType) {
+ throw new UnsupportedOperationException("Aliyun Serverless Spark
AdHocDataSourceClient is not supported");
+ }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannelFactory.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannelFactory.java
new file mode 100644
index 0000000000..851110aeab
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceChannelFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;
+
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(DataSourceChannelFactory.class)
+public class AliyunServerlessSparkDataSourceChannelFactory implements
DataSourceChannelFactory {
+
+ @Override
+ public DataSourceChannel create() {
+ return new AliyunServerlessSparkDataSourceChannel();
+ }
+
+ @Override
+ public String getName() {
+ return DbType.ALIYUN_SERVERLESS_SPARK.getName();
+ }
+
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkUtils.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkUtils.java
new file mode 100644
index 0000000000..bb53ff8b88
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;
+
+import
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam;
+
+import com.aliyun.emr_serverless_spark20230808.Client;
+import com.aliyun.teaopenapi.models.Config;
+
+public class AliyunServerlessSparkUtils {
+
+ private AliyunServerlessSparkUtils() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static Client
getAliyunServerlessSparkClient(AliyunServerlessSparkConnectionParam
connectionParam) throws Exception {
+ String endpoint =
+
String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE,
connectionParam.getRegionId());
+ Config config = new Config()
+ .setEndpoint(endpoint)
+ .setAccessKeyId(connectionParam.getAccessKeyId())
+ .setAccessKeySecret(connectionParam.getAccessKeySecret());
+ return new Client(config);
+ }
+
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkConnectionParam.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkConnectionParam.java
new file mode 100644
index 0000000000..5570465650
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkConnectionParam.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param;
+
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+
+import lombok.Data;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+@Data
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class AliyunServerlessSparkConnectionParam implements ConnectionParam {
+
+ protected String accessKeyId;
+
+ protected String accessKeySecret;
+
+ protected String regionId;
+
+ protected String endpoint;
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceParamDTO.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceParamDTO.java
new file mode 100644
index 0000000000..703dc4c879
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceParamDTO.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param;
+
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import lombok.Data;
+
+@Data
+public class AliyunServerlessSparkDataSourceParamDTO extends
BaseDataSourceParamDTO {
+
+ protected String accessKeyId;
+
+ protected String accessKeySecret;
+
+ protected String regionId;
+
+ protected String endpoint;
+
+ @Override
+ public DbType getType() {
+ return DbType.ALIYUN_SERVERLESS_SPARK;
+ }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceProcessor.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceProcessor.java
new file mode 100644
index 0000000000..ac1905e237
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/param/AliyunServerlessSparkDataSourceProcessor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.AliyunServerlessSparkClientWrapper;
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.text.MessageFormat;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(DataSourceProcessor.class)
+@Slf4j
+public class AliyunServerlessSparkDataSourceProcessor extends
AbstractDataSourceProcessor {
+
+ @Override
+ public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+ return JSONUtils.parseObject(paramJson,
AliyunServerlessSparkDataSourceParamDTO.class);
+ }
+
+ @Override
+ public void checkDatasourceParam(BaseDataSourceParamDTO
datasourceParamDTO) {
+ AliyunServerlessSparkDataSourceParamDTO
aliyunServerlessSparkDataSourceParamDTO =
+ (AliyunServerlessSparkDataSourceParamDTO) datasourceParamDTO;
+ if
(StringUtils.isEmpty(aliyunServerlessSparkDataSourceParamDTO.getRegionId()) ||
+
StringUtils.isEmpty(aliyunServerlessSparkDataSourceParamDTO.getAccessKeyId()) ||
+
StringUtils.isEmpty(aliyunServerlessSparkDataSourceParamDTO.getRegionId())) {
+ throw new IllegalArgumentException("spark datasource param is not
valid");
+ }
+ }
+
+ @Override
+ public String getDatasourceUniqueId(ConnectionParam connectionParam,
DbType dbType) {
+ AliyunServerlessSparkConnectionParam baseConnectionParam =
+ (AliyunServerlessSparkConnectionParam) connectionParam;
+ return MessageFormat.format(
+ "{0}@{1}@{2}@{3}",
+ dbType.getName(),
+ baseConnectionParam.getRegionId(),
+
PasswordUtils.encodePassword(baseConnectionParam.getAccessKeyId()),
+
PasswordUtils.encodePassword(baseConnectionParam.getAccessKeySecret()));
+ }
+
+ @Override
+ public BaseDataSourceParamDTO createDatasourceParamDTO(String
connectionJson) {
+ AliyunServerlessSparkConnectionParam connectionParams =
+ (AliyunServerlessSparkConnectionParam)
createConnectionParams(connectionJson);
+ AliyunServerlessSparkDataSourceParamDTO
aliyunServerlessSparkDataSourceParamDTO =
+ new AliyunServerlessSparkDataSourceParamDTO();
+
+
aliyunServerlessSparkDataSourceParamDTO.setAccessKeyId(connectionParams.getAccessKeyId());
+
aliyunServerlessSparkDataSourceParamDTO.setAccessKeySecret(connectionParams.getAccessKeySecret());
+
aliyunServerlessSparkDataSourceParamDTO.setRegionId(connectionParams.getRegionId());
+
aliyunServerlessSparkDataSourceParamDTO.setEndpoint(connectionParams.getEndpoint());
+ return aliyunServerlessSparkDataSourceParamDTO;
+ }
+
+ @Override
+ public AliyunServerlessSparkConnectionParam
createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
+ AliyunServerlessSparkDataSourceParamDTO
aliyunServerlessSparkDataSourceParamDTO =
+ (AliyunServerlessSparkDataSourceParamDTO) datasourceParam;
+ AliyunServerlessSparkConnectionParam
aliyunServerlessSparkConnectionParam =
+ new AliyunServerlessSparkConnectionParam();
+
aliyunServerlessSparkConnectionParam.setAccessKeyId(aliyunServerlessSparkDataSourceParamDTO.getAccessKeyId());
+ aliyunServerlessSparkConnectionParam
+
.setAccessKeySecret(aliyunServerlessSparkDataSourceParamDTO.getAccessKeySecret());
+
aliyunServerlessSparkConnectionParam.setRegionId(aliyunServerlessSparkDataSourceParamDTO.getRegionId());
+
aliyunServerlessSparkConnectionParam.setEndpoint(aliyunServerlessSparkDataSourceParamDTO.getEndpoint());
+
+ return aliyunServerlessSparkConnectionParam;
+ }
+
+ @Override
+ public ConnectionParam createConnectionParams(String connectionJson) {
+ return JSONUtils.parseObject(connectionJson,
AliyunServerlessSparkConnectionParam.class);
+ }
+
+ @Override
+ public String getDatasourceDriver() {
+ return "";
+ }
+
+ @Override
+ public String getValidationQuery() {
+ return "";
+ }
+
+ @Override
+ public String getJdbcUrl(ConnectionParam connectionParam) {
+ return "";
+ }
+
+ @Override
+ public Connection getConnection(ConnectionParam connectionParam) {
+ return null;
+ }
+
+ @Override
+ public boolean checkDataSourceConnectivity(ConnectionParam
connectionParam) {
+ AliyunServerlessSparkConnectionParam baseConnectionParam =
+ (AliyunServerlessSparkConnectionParam) connectionParam;
+ try (
+ AliyunServerlessSparkClientWrapper
aliyunServerlessSparkClientWrapper =
+ new AliyunServerlessSparkClientWrapper(
+ baseConnectionParam.getAccessKeyId(),
+ baseConnectionParam.getAccessKeySecret(),
+ baseConnectionParam.getRegionId(),
+ baseConnectionParam.getEndpoint())) {
+ return aliyunServerlessSparkClientWrapper.checkConnect(
+ baseConnectionParam.getAccessKeyId(),
+ baseConnectionParam.getAccessKeySecret(),
+ baseConnectionParam.getRegionId());
+ } catch (Exception e) {
+ log.error("spark client failed to connect to the server", e);
+ return false;
+ }
+ }
+
+ @Override
+ public DbType getDbType() {
+ return DbType.ALIYUN_SERVERLESS_SPARK;
+ }
+
+ @Override
+ public DataSourceProcessor create() {
+ return new AliyunServerlessSparkDataSourceProcessor();
+ }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceProcessorTest.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceProcessorTest.java
new file mode 100644
index 0000000000..9973ef10f5
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/datasource/aliyunserverlessspark/AliyunServerlessSparkDataSourceProcessorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark;
+
+import
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam;
+import
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkDataSourceParamDTO;
+import
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkDataSourceProcessor;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class AliyunServerlessSparkDataSourceProcessorTest {
+
+ private AliyunServerlessSparkDataSourceProcessor
aliyunServerlessSparkDataSourceProcessor;
+
+ private String connectJson =
+
"{\"accessKeyId\":\"mockAccessKeyId\",\"accessKeySecret\":\"mockAccessKeySecret\",\"regionId\":\"cn-hangzhou\"}";
+
+ @BeforeEach
+ public void init() {
+ aliyunServerlessSparkDataSourceProcessor = new
AliyunServerlessSparkDataSourceProcessor();
+ }
+
+ @Test
+ void testCheckDatasourceParam() {
+ AliyunServerlessSparkDataSourceParamDTO
aliyunServerlessSparkDataSourceParamDTO =
+ new AliyunServerlessSparkDataSourceParamDTO();
+ aliyunServerlessSparkDataSourceParamDTO.setRegionId("cn-hangzhou");
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () -> aliyunServerlessSparkDataSourceProcessor
+
.checkDatasourceParam(aliyunServerlessSparkDataSourceParamDTO));
+
aliyunServerlessSparkDataSourceParamDTO.setAccessKeyId("mockAccessKeyId");
+
aliyunServerlessSparkDataSourceParamDTO.setAccessKeySecret("mockAccessKeySecret");
+ Assertions
+ .assertDoesNotThrow(() ->
aliyunServerlessSparkDataSourceProcessor
+
.checkDatasourceParam(aliyunServerlessSparkDataSourceParamDTO));
+ }
+
+ @Test
+ void testGetDatasourceUniqueId() {
+ AliyunServerlessSparkConnectionParam
aliyunServerlessSparkConnectionParam =
+ new AliyunServerlessSparkConnectionParam();
+ aliyunServerlessSparkConnectionParam.setRegionId("cn-hangzhou");
+ aliyunServerlessSparkConnectionParam.setAccessKeyId("mockAccessKeyId");
+
aliyunServerlessSparkConnectionParam.setAccessKeySecret("mockAccessKeySecret");
+
Assertions.assertEquals("aliyun_serverless_spark@cn-hangzhou@mockAccessKeyId@mockAccessKeySecret",
+
aliyunServerlessSparkDataSourceProcessor.getDatasourceUniqueId(aliyunServerlessSparkConnectionParam,
+ DbType.ALIYUN_SERVERLESS_SPARK));
+ }
+
+ @Test
+ void testCreateDatasourceParamDTO() {
+ AliyunServerlessSparkDataSourceParamDTO
aliyunServerlessSparkDataSourceParamDTO =
+ (AliyunServerlessSparkDataSourceParamDTO)
aliyunServerlessSparkDataSourceProcessor
+ .createDatasourceParamDTO(connectJson);
+ Assertions.assertEquals("cn-hangzhou",
aliyunServerlessSparkDataSourceParamDTO.getRegionId());
+ Assertions.assertEquals("mockAccessKeyId",
aliyunServerlessSparkDataSourceParamDTO.getAccessKeyId());
+ Assertions.assertEquals("mockAccessKeySecret",
aliyunServerlessSparkDataSourceParamDTO.getAccessKeySecret());
+ }
+
+ @Test
+ void testCreateConnectionParams() {
+ AliyunServerlessSparkDataSourceParamDTO
aliyunServerlessSparkDataSourceParamDTO =
+ (AliyunServerlessSparkDataSourceParamDTO)
aliyunServerlessSparkDataSourceProcessor
+ .createDatasourceParamDTO(connectJson);
+ AliyunServerlessSparkConnectionParam
aliyunServerlessSparkConnectionParam =
+ aliyunServerlessSparkDataSourceProcessor
+
.createConnectionParams(aliyunServerlessSparkDataSourceParamDTO);
+ Assertions.assertEquals("cn-hangzhou",
aliyunServerlessSparkConnectionParam.getRegionId());
+ Assertions.assertEquals("mockAccessKeyId",
aliyunServerlessSparkConnectionParam.getAccessKeyId());
+ Assertions.assertEquals("mockAccessKeySecret",
aliyunServerlessSparkConnectionParam.getAccessKeySecret());
+ }
+
+ @Test
+ void testTestConnection() {
+ AliyunServerlessSparkDataSourceParamDTO
aliyunServerlessSparkDataSourceParamDTO =
+ (AliyunServerlessSparkDataSourceParamDTO)
aliyunServerlessSparkDataSourceProcessor
+ .createDatasourceParamDTO(connectJson);
+ AliyunServerlessSparkConnectionParam connectionParam =
+ aliyunServerlessSparkDataSourceProcessor
+
.createConnectionParams(aliyunServerlessSparkDataSourceParamDTO);
+
Assertions.assertTrue(aliyunServerlessSparkDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+ try (
+ MockedConstruction<AliyunServerlessSparkClientWrapper>
AliyunServerlessSparkClientWrapper =
+
Mockito.mockConstruction(AliyunServerlessSparkClientWrapper.class, (mock,
context) -> {
+ Mockito.when(
+
mock.checkConnect(connectionParam.getAccessKeyId(),
+
connectionParam.getAccessKeySecret(), connectionParam.getRegionId()))
+ .thenReturn(true);
+ })) {
+ Assertions
+
.assertTrue(aliyunServerlessSparkDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+ }
+ }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
index effe3c9abb..b62a3ce414 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
@@ -158,5 +158,10 @@
<artifactId>dolphinscheduler-datasource-hana</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+
<artifactId>dolphinscheduler-datasource-aliyunserverlessspark</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/dolphinscheduler-datasource-plugin/pom.xml
b/dolphinscheduler-datasource-plugin/pom.xml
index c30a6b4258..1f712364d9 100644
--- a/dolphinscheduler-datasource-plugin/pom.xml
+++ b/dolphinscheduler-datasource-plugin/pom.xml
@@ -56,6 +56,7 @@
<module>dolphinscheduler-datasource-sagemaker</module>
<module>dolphinscheduler-datasource-k8s</module>
<module>dolphinscheduler-datasource-hana</module>
+ <module>dolphinscheduler-datasource-aliyunserverlessspark</module>
</modules>
<dependencyManagement>
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
index 882b170e11..360e788cb3 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
@@ -55,7 +55,10 @@ public enum DbType {
ZEPPELIN(24, "zeppelin", "zeppelin"),
SAGEMAKER(25, "sagemaker", "sagemaker"),
- K8S(26, "k8s", "k8s");
+ K8S(26, "k8s", "k8s"),
+
+ ALIYUN_SERVERLESS_SPARK(27, "aliyun_serverless_spark", "aliyun serverless
spark");
+
private static final Map<Integer, DbType> DB_TYPE_MAP =
Arrays.stream(DbType.values()).collect(toMap(DbType::getCode,
Functions.identity()));
@EnumValue
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
new file mode 100644
index 0000000000..7a61f116c8
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-task-plugin</artifactId>
+ <version>dev-SNAPSHOT</version>
+ </parent>
+ <artifactId>dolphinscheduler-task-aliyunserverlessspark</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-spi</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.aliyun.oss</groupId>
+ <artifactId>aliyun-sdk-oss</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-task-api</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.aliyun.oss</groupId>
+ <artifactId>aliyun-sdk-oss</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-datasource-all</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun</groupId>
+ <artifactId>emr_serverless_spark20230808</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun</groupId>
+ <artifactId>credentials-java</artifactId>
+ <version>0.3.0</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
new file mode 100644
index 0000000000..a4cdf07ef5
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;
+
+import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@Data
+@Slf4j
+public class AliyunServerlessSparkParameters extends AbstractParameters {
+
+ // spark job configurations
+ private String workspaceId;
+
+ private String resourceQueueId;
+
+ private String codeType;
+
+ private String jobName;
+
+ private String engineReleaseVersion;
+
+ private String entryPoint;
+
+ private String entryPointArguments;
+
+ private String sparkSubmitParameters;
+
+ @JsonProperty("isProduction")
+ boolean isProduction;
+
+ private int datasource;
+
+ private String type;
+
+ @Override
+ public boolean checkParameters() {
+ return true;
+ }
+
+ @Override
+ public ResourceParametersHelper getResources() {
+ ResourceParametersHelper resources = super.getResources();
+ resources.put(ResourceType.DATASOURCE, datasource);
+ return resources;
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
new file mode 100644
index 0000000000..e2fc7e9842
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.AliyunServerlessSparkConstants;
+import
org.apache.dolphinscheduler.plugin.datasource.aliyunserverlessspark.param.AliyunServerlessSparkConnectionParam;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.aliyun.emr_serverless_spark20230808.Client;
+import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest;
+import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
+import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
+import com.aliyun.emr_serverless_spark20230808.models.JobDriver;
+import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
+import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
+import com.aliyun.emr_serverless_spark20230808.models.Tag;
+import com.aliyun.teaopenapi.models.Config;
+import com.aliyun.teautil.models.RuntimeOptions;
+
+@Slf4j
+public class AliyunServerlessSparkTask extends AbstractRemoteTask {
+
+ private final TaskExecutionContext taskExecutionContext;
+
+ private Client aliyunServerlessSparkClient;
+
+ private AliyunServerlessSparkParameters aliyunServerlessSparkParameters;
+
+ private AliyunServerlessSparkConnectionParam
aliyunServerlessSparkConnectionParam;
+
+ private String jobRunId;
+
+ private RunState currentState;
+
+ private String accessKeyId;
+
+ private String accessKeySecret;
+
+ private String regionId;
+
+ private String endpoint;
+
+ protected AliyunServerlessSparkTask(TaskExecutionContext
taskExecutionContext) {
+ super(taskExecutionContext);
+ this.taskExecutionContext = taskExecutionContext;
+ }
+
+ @Override
+ public void init() {
+ final String taskParams = taskExecutionContext.getTaskParams();
+ aliyunServerlessSparkParameters = JSONUtils.parseObject(taskParams,
AliyunServerlessSparkParameters.class);
+ log.info("aliyunServerlessSparkParameters - {}",
aliyunServerlessSparkParameters);
+ if (this.aliyunServerlessSparkParameters == null ||
!this.aliyunServerlessSparkParameters.checkParameters()) {
+ throw new
AliyunServerlessSparkTaskException("Aliyun-Serverless-Spark task parameters are
not valid!");
+ }
+
+ ResourceParametersHelper resourceParametersHelper =
taskExecutionContext.getResourceParametersHelper();
+ DataSourceParameters dataSourceParameters = (DataSourceParameters)
resourceParametersHelper
+ .getResourceParameters(ResourceType.DATASOURCE,
aliyunServerlessSparkParameters.getDatasource());
+ aliyunServerlessSparkConnectionParam =
(AliyunServerlessSparkConnectionParam) DataSourceUtils
+ .buildConnectionParams(
+
DbType.valueOf(aliyunServerlessSparkParameters.getType()),
+ dataSourceParameters.getConnectionParams());
+
+ accessKeyId = aliyunServerlessSparkConnectionParam.getAccessKeyId();
+ accessKeySecret =
aliyunServerlessSparkConnectionParam.getAccessKeySecret();
+ regionId = aliyunServerlessSparkConnectionParam.getRegionId();
+ endpoint = aliyunServerlessSparkConnectionParam.getEndpoint();
+
+ try {
+ aliyunServerlessSparkClient =
+ buildAliyunServerlessSparkClient(accessKeyId,
accessKeySecret, regionId, endpoint);
+ } catch (Exception e) {
+ log.error("Failed to build Aliyun-Serverless-Spark client!", e);
+ throw new AliyunServerlessSparkTaskException("Failed to build
Aliyun-Serverless-Spark client!");
+ }
+
+ currentState = RunState.Submitted;
+ }
+
+ @Override
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
+ try {
+ StartJobRunRequest startJobRunRequest =
buildStartJobRunRequest(aliyunServerlessSparkParameters);
+ RuntimeOptions runtime = new RuntimeOptions();
+ Map<String, String> headers = new HashMap<>();
+ StartJobRunResponse startJobRunResponse =
aliyunServerlessSparkClient.startJobRunWithOptions(
+ aliyunServerlessSparkParameters.getWorkspaceId(),
startJobRunRequest, headers, runtime);
+ jobRunId = startJobRunResponse.getBody().getJobRunId();
+ setAppIds(jobRunId);
+ log.info("Successfully submitted serverless spark job, jobRunId -
{}", jobRunId);
+
+ while (!RunState.isFinal(currentState)) {
+ GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();
+ GetJobRunResponse getJobRunResponse =
aliyunServerlessSparkClient
+
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
getJobRunRequest);
+ currentState =
RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
+ log.info("job - {} state - {}", jobRunId, currentState);
+ Thread.sleep(10 * 1000L);
+ }
+
+ setExitStatusCode(mapFinalStateToExitCode(currentState));
+
+ } catch (Exception e) {
+ log.error("Serverless spark job failed!", e);
+ throw new AliyunServerlessSparkTaskException("Serverless spark job
failed!");
+ }
+ }
+
+ @Override
+ public void submitApplication() throws TaskException {
+
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+
+ }
+
+ protected int mapFinalStateToExitCode(RunState state) {
+ switch (state) {
+ case Success:
+ return TaskConstants.EXIT_CODE_SUCCESS;
+ case Failed:
+ return TaskConstants.EXIT_CODE_KILL;
+ default:
+ return TaskConstants.EXIT_CODE_FAILURE;
+ }
+ }
+
+ @Override
+ public AbstractParameters getParameters() {
+ return aliyunServerlessSparkParameters;
+ }
+
+ @Override
+ public void cancelApplication() throws TaskException {
+ CancelJobRunRequest cancelJobRunRequest = buildCancelJobRunRequest();
+ try {
+
aliyunServerlessSparkClient.cancelJobRun(aliyunServerlessSparkParameters.getWorkspaceId(),
jobRunId,
+ cancelJobRunRequest);
+ } catch (Exception e) {
+ log.error("Failed to cancel serverless spark job run", e);
+ }
+ }
+
+ @Override
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
+ }
+
+ protected Client buildAliyunServerlessSparkClient(String accessKeyId,
String accessKeySecret,
+ String regionId, String
endpoint) throws Exception {
+ if (StringUtils.isEmpty(endpoint)) {
+ endpoint =
String.format(AliyunServerlessSparkConstants.ENDPOINT_TEMPLATE, regionId);
+ }
+
+ Config config = new Config()
+ .setEndpoint(endpoint)
+ .setAccessKeyId(accessKeyId)
+ .setAccessKeySecret(accessKeySecret);
+ return new Client(config);
+ }
+
+ protected StartJobRunRequest
buildStartJobRunRequest(AliyunServerlessSparkParameters
aliyunServerlessSparkParameters) {
+ StartJobRunRequest startJobRunRequest = new StartJobRunRequest();
+ startJobRunRequest.setRegionId(regionId);
+
startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId());
+
startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType());
+
startJobRunRequest.setName(aliyunServerlessSparkParameters.getJobName());
+ String engineReleaseVersion =
aliyunServerlessSparkParameters.getEngineReleaseVersion();
+ engineReleaseVersion =
+ StringUtils.isEmpty(engineReleaseVersion) ?
AliyunServerlessSparkConstants.DEFAULT_ENGINE
+ : engineReleaseVersion;
+ startJobRunRequest.setReleaseVersion(engineReleaseVersion);
+ Tag envTag = new Tag();
+ envTag.setKey(AliyunServerlessSparkConstants.ENV_KEY);
+ String envType = aliyunServerlessSparkParameters.isProduction() ?
AliyunServerlessSparkConstants.ENV_PROD
+ : AliyunServerlessSparkConstants.ENV_DEV;
+ envTag.setValue(envType);
+ Tag workflowTag = new Tag();
+ workflowTag.setKey(AliyunServerlessSparkConstants.WORKFLOW_KEY);
+ workflowTag.setValue(AliyunServerlessSparkConstants.WORKFLOW_VALUE);
+ startJobRunRequest.setTags(Arrays.asList(envTag, workflowTag));
+ List<String> entryPointArguments =
+
StringUtils.isEmpty(aliyunServerlessSparkParameters.getEntryPointArguments()) ?
Collections.emptyList()
+ :
Arrays.asList(aliyunServerlessSparkParameters.getEntryPointArguments()
+
.split(AliyunServerlessSparkConstants.ENTRY_POINT_ARGUMENTS_DELIMITER));
+ JobDriver.JobDriverSparkSubmit jobDriverSparkSubmit = new
JobDriver.JobDriverSparkSubmit()
+ .setEntryPoint(aliyunServerlessSparkParameters.getEntryPoint())
+ .setEntryPointArguments(entryPointArguments)
+
.setSparkSubmitParameters(aliyunServerlessSparkParameters.getSparkSubmitParameters());
+ JobDriver jobDriver = new
com.aliyun.emr_serverless_spark20230808.models.JobDriver()
+ .setSparkSubmit(jobDriverSparkSubmit);
+ startJobRunRequest.setJobDriver(jobDriver);
+ return startJobRunRequest;
+ }
+
+ protected GetJobRunRequest buildGetJobRunRequest() {
+ GetJobRunRequest getJobRunRequest = new GetJobRunRequest();
+ getJobRunRequest.setRegionId(regionId);
+ return getJobRunRequest;
+ }
+
+ protected CancelJobRunRequest buildCancelJobRunRequest() {
+ CancelJobRunRequest cancelJobRunRequest = new CancelJobRunRequest();
+ cancelJobRunRequest.setRegionId(regionId);
+ return cancelJobRunRequest;
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannel.java
new file mode 100644
index 0000000000..1a1205f2c6
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+public class AliyunServerlessSparkTaskChannel implements TaskChannel {
+
+ @Override
+ public AbstractTask createTask(TaskExecutionContext taskRequest) {
+ return new AliyunServerlessSparkTask(taskRequest);
+ }
+
+ @Override
+ public AbstractParameters parseParameters(String taskParams) {
+ return JSONUtils.parseObject(taskParams,
AliyunServerlessSparkParameters.class);
+ }
+
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannelFactory.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannelFactory.java
new file mode 100644
index 0000000000..eaa797d2d5
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskChannelFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(TaskChannelFactory.class)
+public class AliyunServerlessSparkTaskChannelFactory implements
TaskChannelFactory {
+
+ @Override
+ public String getName() {
+ return "ALIYUN_SERVERLESS_SPARK";
+ }
+
+ @Override
+ public TaskChannel create() {
+ return new AliyunServerlessSparkTaskChannel();
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskException.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskException.java
new file mode 100644
index 0000000000..5b54bd4d04
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;
+
+public class AliyunServerlessSparkTaskException extends RuntimeException {
+
+ public AliyunServerlessSparkTaskException() {
+ super();
+ }
+
+ public AliyunServerlessSparkTaskException(String message) {
+ super(message);
+ }
+
+ public AliyunServerlessSparkTaskException(String message, Throwable cause)
{
+ super(message, cause);
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/RunState.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/RunState.java
new file mode 100644
index 0000000000..0d617170d3
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/RunState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;
+
+import java.util.Objects;
+
+public enum RunState {
+
+ Submitted,
+
+ Pending,
+
+ Running,
+
+ Success,
+
+ Failed,
+
+ Cancelling,
+
+ Cancelled,
+
+ CancelFailed;
+
+ public static boolean isFinal(RunState runState) {
+ return Success == runState || Failed == runState || Cancelled ==
runState;
+ }
+
+ public static boolean hasLaunched(RunState runState) {
+ return Objects.nonNull(runState) && runState != Submitted && runState
!= Pending;
+ }
+
+ public static boolean isCancelled(RunState runState) {
+ return Cancelled == runState;
+ }
+
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
new file mode 100644
index 0000000000..c9cd7ef37b
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.task.aliyunserverlessspark;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import com.aliyun.emr_serverless_spark20230808.Client;
+import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest;
+import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunResponse;
+import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
+import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
+import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponseBody;
+import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
+import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
+import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponseBody;
+
+@Slf4j
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class AliyunServerlessSparkTaskTest {
+
+ @Mock
+ private TaskExecutionContext mockTaskExecutionContext;
+
+ @Mock
+ private Client mockAliyunServerlessSparkClient;
+
+ @Mock
+ private ResourceParametersHelper mockResourceParametersHelper;
+
+ @Mock
+ private TaskCallBack mockTaskCallBack;
+
+ @Mock
+ private StartJobRunRequest mockStartJobRunRequest;
+
+ @Mock
+ private StartJobRunResponse mockStartJobRunResponse;
+
+ @Mock
+ private GetJobRunRequest mockGetJobRunRequest;
+
+ @Mock
+ private GetJobRunResponse mockGetJobRunResponse;
+
+ @Mock
+ private CancelJobRunRequest mockCancelJobRunRequest;
+
+ @Mock
+ private CancelJobRunResponse mockCancelJobRunResponse;
+
+ @InjectMocks
+ @Spy
+ private AliyunServerlessSparkTask aliyunServerlessSparkTask;
+
+ private static final String mockAccessKeyId = "mockAccessKeyId";
+
+ private static final String mockAccessKeySecret = "mockAccessKeySecret";
+
+ private static final String mockRegionId = "cn-hangzhou";
+
+ private static final String mockEndpoint =
"emr-serverless-spark-vpc.cn-hangzhou.aliyuncs.com";
+
+ private static final int mockDatasourceId = 1;
+
+ private static final String taskParamsString =
+
"{\"localParams\":[],\"resourceList\":[],\"workspaceId\":\"w-ae42e9c929275cc5\",\"resourceQueueId\":\"root_queue\",\"codeType\":\"JAR\",\"jobName\":\"spark\",\"entryPoint\":\"oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar\",\"entryPointArguments\":\"10\",\"sparkSubmitParameters\":\"--class
org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf
spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memo
[...]
+
+ private static final String connectionParamsString =
+
"{\"accessKeyId\":\"mockAccessKeyId\",\"accessKeySecret\":\"mockAccessKeySecret\",\"regionId\":\"cn-hangzhou\",\"endpoint\":\"emr-serverless-spark-vpc.cn-hangzhou.aliyuncs.com\",\"password\":\"\"}";
+
+ private static final String mockJobRunId = "jr-f6a1d0dd17d6b8a3";
+
+ private static final String mockWorkspaceId = "w-ae42e9c929275cc5";
+
+ private static final String mockResourceQueueId = "root_queue";
+
+ private static final String mockSparkSubmitParameters =
+ "--class org.apache.spark.examples.SparkPi --conf
spark.executor.cores=4 --conf spark.executor.memory=20g --conf
spark.driver.cores=4 --conf spark.driver.memory=8g --conf
spark.executor.instances=1";
+
+ private static final String mockEntryPoint =
+
"oss://datadev-oss-hdfs-test/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar";
+
+ private static final String mockEntryPointArguments = "10";
+
+ @BeforeEach
+ public void before() {
+
when(mockTaskExecutionContext.getTaskParams()).thenReturn(taskParamsString);
+ DataSourceParameters dataSourceParameters = new DataSourceParameters();
+ dataSourceParameters.setConnectionParams(connectionParamsString);
+ dataSourceParameters.setType(DbType.ALIYUN_SERVERLESS_SPARK);
+ when(mockResourceParametersHelper.getResourceParameters(any(),
any())).thenReturn(dataSourceParameters);
+
when(mockTaskExecutionContext.getResourceParametersHelper()).thenReturn(mockResourceParametersHelper);
+ Assertions.assertDoesNotThrow(
+ () ->
when(aliyunServerlessSparkTask.buildAliyunServerlessSparkClient(any(), any(),
any(), any()))
+ .thenReturn(mockAliyunServerlessSparkClient));
+ }
+
+ @Test
+ public void testInit() throws Exception {
+ aliyunServerlessSparkTask.init();
+ verify(mockTaskExecutionContext).getTaskParams();
+
verify(mockResourceParametersHelper).getResourceParameters(ResourceType.DATASOURCE,
mockDatasourceId);
+
verify(aliyunServerlessSparkTask).buildAliyunServerlessSparkClient(mockAccessKeyId,
mockAccessKeySecret,
+ mockRegionId, mockEndpoint);
+ }
+
+ @Test
+ public void testHandle() {
+
doReturn(mockStartJobRunRequest).when(aliyunServerlessSparkTask).buildStartJobRunRequest(any());
+ StartJobRunResponseBody startJobRunResponseBody = new
StartJobRunResponseBody();
+ startJobRunResponseBody.setJobRunId(mockJobRunId);
+
doReturn(startJobRunResponseBody).when(mockStartJobRunResponse).getBody();
+ Assertions.assertDoesNotThrow(
+ () ->
doReturn(mockStartJobRunResponse).when(mockAliyunServerlessSparkClient)
+ .startJobRunWithOptions(any(), any(), any(), any()));
+
+
doReturn(mockGetJobRunRequest).when(aliyunServerlessSparkTask).buildGetJobRunRequest();
+ GetJobRunResponseBody getJobRunResponseBody = new
GetJobRunResponseBody();
+ GetJobRunResponseBody.GetJobRunResponseBodyJobRun jobRun =
+ new GetJobRunResponseBody.GetJobRunResponseBodyJobRun();
+ jobRun.setState(RunState.Success.name());
+ getJobRunResponseBody.setJobRun(jobRun);
+ doReturn(getJobRunResponseBody).when(mockGetJobRunResponse).getBody();
+ Assertions.assertDoesNotThrow(
+ () ->
doReturn(mockGetJobRunResponse).when(mockAliyunServerlessSparkClient).getJobRun(any(),
any(),
+ any()));
+
+ aliyunServerlessSparkTask.init();
+ aliyunServerlessSparkTask.handle(mockTaskCallBack);
+ verify(aliyunServerlessSparkTask).setAppIds(mockJobRunId);
+
verify(aliyunServerlessSparkTask).setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
+ }
+
+ @Test
+ public void testCancelApplication() throws Exception {
+
doReturn(mockCancelJobRunRequest).when(aliyunServerlessSparkTask).buildCancelJobRunRequest();
+ Assertions.assertDoesNotThrow(
+ () ->
doReturn(mockCancelJobRunResponse).when(mockAliyunServerlessSparkClient).cancelJobRun(any(),
+ any(), any()));
+
+ aliyunServerlessSparkTask.init();
+ aliyunServerlessSparkTask.cancelApplication();
+ verify(aliyunServerlessSparkTask).buildCancelJobRunRequest();
+
verify(mockAliyunServerlessSparkClient).cancelJobRun(eq(mockWorkspaceId),
any(), eq(mockCancelJobRunRequest));
+ }
+
+ @Test
+ public void testBuildStartJobRunRequest() {
+ AliyunServerlessSparkParameters mockAliyunServerlessSparkParameters =
+ mock(AliyunServerlessSparkParameters.class);
+
doReturn(mockResourceQueueId).when(mockAliyunServerlessSparkParameters).getResourceQueueId();
+
doReturn("JAR").when(mockAliyunServerlessSparkParameters).getCodeType();
+
doReturn("ds-test").when(mockAliyunServerlessSparkParameters).getJobName();
+
doReturn(mockSparkSubmitParameters).when(mockAliyunServerlessSparkParameters).getSparkSubmitParameters();
+
doReturn(mockEntryPoint).when(mockAliyunServerlessSparkParameters).getEntryPoint();
+
doReturn(mockEntryPointArguments).when(mockAliyunServerlessSparkParameters).getEntryPointArguments();
+
+
aliyunServerlessSparkTask.buildStartJobRunRequest(mockAliyunServerlessSparkParameters);
+
+ verify(mockAliyunServerlessSparkParameters).getResourceQueueId();
+ verify(mockAliyunServerlessSparkParameters).getCodeType();
+ verify(mockAliyunServerlessSparkParameters).getJobName();
+ verify(mockAliyunServerlessSparkParameters).getEngineReleaseVersion();
+ verify(mockAliyunServerlessSparkParameters).isProduction();
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index fbaea0dddc..3d107ba14c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -216,6 +216,12 @@
<artifactId>dolphinscheduler-task-remoteshell</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+
<artifactId>dolphinscheduler-task-aliyunserverlessspark</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/dolphinscheduler-task-plugin/pom.xml
b/dolphinscheduler-task-plugin/pom.xml
index 9036e88b67..e1446ec426 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -62,6 +62,7 @@
<module>dolphinscheduler-task-linkis</module>
<module>dolphinscheduler-task-datafactory</module>
<module>dolphinscheduler-task-remoteshell</module>
+ <module>dolphinscheduler-task-aliyunserverlessspark</module>
</modules>
<dependencyManagement>
diff --git
a/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark.png
b/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark.png
new file mode 100644
index 0000000000..c620bb1f1e
Binary files /dev/null and
b/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark.png
differ
diff --git
a/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark_hover.png
b/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark_hover.png
new file mode 100644
index 0000000000..1927e0eb24
Binary files /dev/null and
b/dolphinscheduler-ui/public/images/task-icons/aliyun_serverless_spark_hover.png
differ
diff --git a/dolphinscheduler-ui/src/locales/en_US/datasource.ts
b/dolphinscheduler-ui/src/locales/en_US/datasource.ts
index e9b799b16e..645edc7f7d 100644
--- a/dolphinscheduler-ui/src/locales/en_US/datasource.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/datasource.ts
@@ -85,7 +85,7 @@ export default {
clientId: 'ClientId',
clientSecret: 'ClientSecret',
OAuth_token_endpoint: 'OAuth 2.0 token endpoint',
- endpoint_tips: 'Please enter OAuth Token',
+ OAuth_token_endpoint_tips: 'Please enter OAuth Token',
AccessKeyID: 'AccessKeyID',
AccessKeyID_tips: 'Please input AccessKeyID',
SecretAccessKey: 'SecretAccessKey',
@@ -97,5 +97,13 @@ export default {
kubeConfig: 'kubeConfig',
kubeConfig_tips: 'Please input KubeConfig',
namespace: 'namespace',
- namespace_tips: 'Please input namespace'
+ namespace_tips: 'Please input namespace',
+ access_key_id: 'Access Key Id',
+ access_key_id_tips: 'Please enter access key id',
+ access_key_secret: 'Access Key Secret',
+ access_key_secret_tips: 'Please enter access key secret',
+ region_id: 'Region Id',
+ region_id_tips: 'Please enter Region Id',
+ endpoint: 'Endpoint',
+ endpoint_tips: 'Please enter endpoint'
}
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts
b/dolphinscheduler-ui/src/locales/en_US/project.ts
index a42b652eff..dce409c797 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -909,7 +909,33 @@ export default {
yarn_queue_tips: 'Please input yarn queue(optional)',
dependent_type: 'Dependency Type',
dependent_on_workflow: 'Dependent on workflow',
- dependent_on_task: 'Dependent on task'
+ dependent_on_task: 'Dependent on task',
+ region_id: 'region id',
+ region_id_tips: 'region id',
+ endpoint: 'endpoint',
+ endpoint_tips: 'restful endpoint',
+ access_key_id: 'access key id',
+ access_key_id_tips: 'access key id',
+ access_key_secret: 'access key secret',
+ access_key_secret_tips: 'access key secret',
+ workspace_id: 'workspace id',
+ workspace_id_tips: 'workspace id',
+ resource_queue_id: 'resource queue id',
+ resource_queue_id_tips: 'resource queue id',
+ code_type: 'code type',
+ code_type_tips: 'code type',
+ job_name: 'job name',
+ job_name_tips: 'job name',
+ engine_release_version: 'engine release version',
+ engine_release_version_tips: 'engine release version',
+ entry_point: 'entry point',
+ entry_point_tips: 'entry point',
+ entry_point_arguments: 'entry point arguments',
+ entry_point_arguments_tips: 'entry point arguments',
+ spark_submit_parameters: 'spark submit parameters',
+ spark_submit_parameters_tips: 'spark submit parameters',
+ is_production: 'is production',
+ is_production_tips: 'is production'
},
menu: {
fav: 'Favorites',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
index 7aa797a591..b3a68ec3d8 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
@@ -82,7 +82,7 @@ export default {
clientId: 'ClientId',
clientSecret: 'ClientSecret',
OAuth_token_endpoint: 'OAuth 2.0 token endpoint',
- endpoint_tips: '请输入OAuth',
+ OAuth_token_endpoint_tips: '请输入OAuth',
AccessKeyID: 'AccessKeyID',
AccessKeyID_tips: '请输入AccessKeyID',
SecretAccessKey: 'SecretAccessKey',
@@ -94,5 +94,13 @@ export default {
kubeConfig: 'kubeConfig',
kubeConfig_tips: '请输入KubeConfig',
namespace: 'namespace',
- namespace_tips: '请输入namespace'
+ namespace_tips: '请输入namespace',
+ access_key_id: 'Access Key Id',
+ access_key_id_tips: '请输入access key id',
+ access_key_secret: 'Access Key Secret',
+ access_key_secret_tips: '请输入access key secret',
+ region_id: 'Region Id',
+ region_id_tips: '请输入Region Id',
+ endpoint: 'endpoint',
+ endpoint_tips: '请输入endpoint'
}
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index cb7e320118..f3b50c93b8 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -878,7 +878,33 @@ export default {
yarn_queue_tips: '请输入Yarn队列(选填)',
dependent_type: '依赖类型',
dependent_on_workflow: '依赖于工作流',
- dependent_on_task: '依赖于任务'
+ dependent_on_task: '依赖于任务',
+ region_id: 'region id',
+ region_id_tips: 'region id',
+ endpoint: 'endpoint',
+ endpoint_tips: '请输入endpoint',
+ access_key_id: 'access key id',
+ access_key_id_tips: 'access key id',
+ access_key_secret: 'access key secret',
+ access_key_secret_tips: 'access key secret',
+ workspace_id: 'workspace id',
+ workspace_id_tips: 'workspace id',
+ resource_queue_id: 'resource queue id',
+ resource_queue_id_tips: 'resource queue id',
+ code_type: 'code type',
+ code_type_tips: 'code type',
+ job_name: 'job name',
+ job_name_tips: 'job name',
+ engine_release_version: 'engine release version',
+ engine_release_version_tips: 'engine release version',
+ entry_point: 'entry point',
+ entry_point_tips: 'entry point',
+ entry_point_arguments: 'entry point arguments',
+ entry_point_arguments_tips: 'entry point arguments',
+ spark_submit_parameters: 'spark submit parameters',
+ spark_submit_parameters_tips: 'spark submit parameters',
+ is_production: 'is production',
+ is_production_tips: 'is production'
},
menu: {
fav: '收藏组件',
diff --git a/dolphinscheduler-ui/src/service/modules/data-source/types.ts
b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
index 444f5293dd..135a8cc553 100644
--- a/dolphinscheduler-ui/src/service/modules/data-source/types.ts
+++ b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
@@ -42,6 +42,7 @@ type IDataBase =
| 'ZEPPELIN'
| 'SAGEMAKER'
| 'K8S'
+ | 'ALIYUN_SERVERLESS_SPARK'
type IDataBaseLabel =
| 'MYSQL'
@@ -65,6 +66,7 @@ type IDataBaseLabel =
| 'ZEPPELIN'
| 'SAGEMAKER'
| 'K8S'
+ | 'ALIYUN_SERVERLESS_SPARK'
interface IDataSource {
id?: number
@@ -85,7 +87,6 @@ interface IDataSource {
database?: string
connectType?: string
other?: object
- endpoint?: string
restEndpoint?: string
kubeConfig?: string
namespace?: string
@@ -94,6 +95,10 @@ interface IDataSource {
compatibleMode?: string
publicKey?: string
datawarehouse?: string
+ accessKeyId?: string
+ accessKeySecret?: string
+ regionId?: string
+ endpoint?: string
}
interface ListReq {
diff --git a/dolphinscheduler-ui/src/store/project/task-type.ts
b/dolphinscheduler-ui/src/store/project/task-type.ts
index 826f0b13c3..9eb1d8cd41 100644
--- a/dolphinscheduler-ui/src/store/project/task-type.ts
+++ b/dolphinscheduler-ui/src/store/project/task-type.ts
@@ -89,6 +89,10 @@ export const TASK_TYPES_MAP = {
alias: 'ZEPPELIN',
helperLinkDisable: true
},
+ ALIYUN_SERVERLESS_SPARK: {
+ alias: 'ALIYUN_SERVERLESS_SPARK',
+ helperLinkDisable: true
+ },
JUPYTER: {
alias: 'JUPYTER',
helperLinkDisable: true
diff --git a/dolphinscheduler-ui/src/store/project/types.ts
b/dolphinscheduler-ui/src/store/project/types.ts
index cb48ba8654..bc8dbd0b2b 100644
--- a/dolphinscheduler-ui/src/store/project/types.ts
+++ b/dolphinscheduler-ui/src/store/project/types.ts
@@ -58,6 +58,7 @@ type TaskType =
| 'LINKIS'
| 'DATA_FACTORY'
| 'REMOTESHELL'
+ | 'ALIYUN_SERVERLESS_SPARK'
type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON'
diff --git a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
index 4842651290..d8b1cb1f9f 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
+++ b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
@@ -155,6 +155,10 @@ const DetailModal = defineComponent({
showHost,
showPort,
showRestEndpoint,
+ showAccessKeyId,
+ showAccessKeySecret,
+ showRegionId,
+ showEndpoint,
showAwsRegion,
showCompatibleMode,
showConnectType,
@@ -270,6 +274,65 @@ const DetailModal = defineComponent({
placeholder={t('datasource.zeppelin_rest_endpoint_tips')}
/>
</NFormItem>
+ <NFormItem
+ v-show={showAccessKeyId}
+ label={t('datasource.access_key_id')}
+ path='accessKeyId'
+ show-require-mark
+ >
+ <NInput
+ allowInput={this.trim}
+ class='input-access_key_id'
+ v-model={[detailForm.accessKeyId, 'value']}
+ type='text'
+ maxlength={255}
+ placeholder={t('datasource.access_key_id_tips')}
+ />
+ </NFormItem>
+ <NFormItem
+ v-show={showAccessKeySecret}
+ label={t('datasource.access_key_secret')}
+ path='accessKeySecret'
+ show-require-mark
+ >
+ <NInput
+ allowInput={this.trim}
+ class='input-access_key_secret'
+ v-model={[detailForm.accessKeySecret, 'value']}
+ type='text'
+ maxlength={255}
+ placeholder={t('datasource.access_key_secret_tips')}
+ />
+ </NFormItem>
+ <NFormItem
+ v-show={showRegionId}
+ label={t('datasource.region_id')}
+ path='regionId'
+ show-require-mark
+ >
+ <NInput
+ allowInput={this.trim}
+ class='input-region_id'
+ v-model={[detailForm.regionId, 'value']}
+ type='text'
+ maxlength={255}
+ placeholder={t('datasource.region_id_tips')}
+ />
+ </NFormItem>
+ <NFormItem
+ v-show={showEndpoint}
+ label={t('datasource.endpoint')}
+ path='endpoint'
+ >
+ <NInput
+ allowInput={this.trim}
+ class='input-endpoint'
+ v-model={[detailForm.endpoint, 'value']}
+ type='text'
+ maxlength={255}
+ placeholder={t('datasource.endpoint_tips')}
+ />
+ </NFormItem>
<NFormItem
v-show={showPort}
label={t('datasource.port')}
@@ -546,7 +609,8 @@ const DetailModal = defineComponent({
<NFormItem
v-show={
(!showMode || detailForm.mode === 'password') &&
- detailForm.type != 'K8S'
+ detailForm.type != 'K8S' &&
+ detailForm.type != 'ALIYUN_SERVERLESS_SPARK'
}
label={t('datasource.user_name')}
path='userName'
@@ -564,7 +628,8 @@ const DetailModal = defineComponent({
<NFormItem
v-show={
(!showMode || detailForm.mode === 'password') &&
- detailForm.type != 'K8S'
+ detailForm.type != 'K8S' &&
+ detailForm.type != 'ALIYUN_SERVERLESS_SPARK'
}
label={t('datasource.user_password')}
path='password'
diff --git a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
index 21916667d2..7a804d3de9 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
+++ b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
@@ -71,6 +71,10 @@ export function useForm(id?: number) {
showPublicKey: false,
showNamespace: false,
showKubeConfig: false,
+ showAccessKeyId: false,
+ showAccessKeySecret: false,
+ showRegionId: false,
+ showEndpoint: false,
rules: {
name: {
trigger: ['input'],
@@ -121,7 +125,8 @@ export function useForm(id?: number) {
if (
!state.detailForm.userName &&
state.detailForm.type !== 'AZURESQL' &&
- state.detailForm.type !== 'K8S'
+ state.detailForm.type !== 'K8S' &&
+ state.detailForm.type !== 'ALIYUN_SERVERLESS_SPARK'
) {
return new Error(t('datasource.user_name_tips'))
}
@@ -268,7 +273,8 @@ export function useForm(id?: number) {
type === 'SSH' ||
type === 'ZEPPELIN' ||
type === 'SAGEMAKER' ||
- type === 'K8S'
+ type === 'K8S' ||
+ type === 'ALIYUN_SERVERLESS_SPARK'
) {
state.showDataBaseName = false
state.requiredDataBase = false
@@ -282,7 +288,11 @@ export function useForm(id?: number) {
state.showPort = false
state.showRestEndpoint = true
}
- if (type === 'SAGEMAKER' || type === 'K8S') {
+ if (
+ type === 'SAGEMAKER' ||
+ type === 'K8S' ||
+ type == 'ALIYUN_SERVERLESS_SPARK'
+ ) {
state.showHost = false
state.showPort = false
}
@@ -290,6 +300,12 @@ export function useForm(id?: number) {
state.showNamespace = true
state.showKubeConfig = true
}
+ if (type === 'ALIYUN_SERVERLESS_SPARK') {
+ state.showAccessKeyId = true
+ state.showAccessKeySecret = true
+ state.showRegionId = true
+ state.showEndpoint = true
+ }
} else {
state.showDataBaseName = true
state.requiredDataBase = true
@@ -458,6 +474,11 @@ export const datasourceType: IDataBaseOptionKeys = {
value: 'K8S',
label: 'K8S',
defaultPort: 6443
+ },
+ ALIYUN_SERVERLESS_SPARK: {
+ value: 'ALIYUN_SERVERLESS_SPARK',
+ label: 'ALIYUN_SERVERLESS_SPARK',
+ defaultPort: 0
}
}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 57d168867e..673aced136 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -91,3 +91,4 @@ export { useDataFactory } from './use-data-factory'
export { useRemoteShell } from './use-remote-shell'
export { useDynamic } from './use-dynamic'
export { useYarnQueue } from './use-queue'
+export { useAliyunServerlessSpark } from './use-aliyun-serverless-spark'
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
new file mode 100644
index 0000000000..56f73912c7
--- /dev/null
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { useI18n } from 'vue-i18n'
+import { useCustomParams } from '.'
+import type { IJsonItem } from '../types'
+
+export function useAliyunServerlessSpark(model: {
+ [field: string]: any
+}): IJsonItem[] {
+ const { t } = useI18n()
+
+ return [
+ // mandatory field
+ {
+ type: 'input',
+ field: 'workspaceId',
+ name: t('project.node.workspace_id'),
+ props: {
+ placeholder: t('project.node.workspace_id_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.workspace_id_tips'))
+ }
+ }
+ }
+ },
+
+ {
+ type: 'input',
+ field: 'resourceQueueId',
+ name: t('project.node.resource_queue_id'),
+ props: {
+ placeholder: t('project.node.resource_queue_id_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.resource_queue_id_tips'))
+ }
+ }
+ }
+ },
+
+ {
+ type: 'input',
+ field: 'codeType',
+ name: t('project.node.code_type'),
+ props: {
+ placeholder: t('project.node.code_type_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.code_type_tips'))
+ }
+ }
+ }
+ },
+
+ {
+ type: 'input',
+ field: 'jobName',
+ name: t('project.node.job_name'),
+ props: {
+ placeholder: t('project.node.job_name_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.job_name_tips'))
+ }
+ }
+ }
+ },
+
+ {
+ type: 'input',
+ field: 'entryPoint',
+ name: t('project.node.entry_point'),
+ props: {
+ placeholder: t('project.node.entry_point_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.entry_point_tips'))
+ }
+ }
+ }
+ },
+
+ {
+ type: 'input',
+ field: 'entryPointArguments',
+ name: t('project.node.entry_point_arguments'),
+ props: {
+ placeholder: t('project.node.entry_point_arguments_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.entry_point_arguments_tips'))
+ }
+ }
+ }
+ },
+
+ {
+ type: 'input',
+ field: 'sparkSubmitParameters',
+ name: t('project.node.spark_submit_parameters'),
+ props: {
+ placeholder: t('project.node.spark_submit_parameters_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ validator(validate: any, value: string) {
+ if (!value) {
+ return new Error(t('project.node.spark_submit_parameters_tips'))
+ }
+ }
+ }
+ },
+
+ // optional field
+ {
+ type: 'input',
+ field: 'engineReleaseVersion',
+ name: t('project.node.engine_release_version'),
+ props: {
+ placeholder: t('project.node.engine_release_version_tips')
+ }
+ },
+
+ {
+ type: 'switch',
+ field: 'isProduction',
+ name: t('project.node.is_production'),
+ span: 12
+ },
+
+ ...useCustomParams({ model, field: 'localParams', isSimple: false })
+ ]
+}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
index 8b4ef0b897..907ee147b3 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
@@ -157,6 +157,11 @@ export function useDatasource(
id: 25,
code: 'SAGEMAKER',
disabled: false
+ },
+ {
+ id: 27,
+ code: 'ALIYUN_SERVERLESS_SPARK',
+ disabled: false
}
]
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 73348adada..56dc9b1dfc 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -336,6 +336,20 @@ export function formatParams(data: INodeData): {
taskParams.type = data.type
}
+ if (data.taskType === 'ALIYUN_SERVERLESS_SPARK') {
+ taskParams.workspaceId = data.workspaceId
+ taskParams.resourceQueueId = data.resourceQueueId
+ taskParams.codeType = data.codeType
+ taskParams.jobName = data.jobName
+ taskParams.engineReleaseVersion = data.engineReleaseVersion
+ taskParams.entryPoint = data.entryPoint
+ taskParams.entryPointArguments = data.entryPointArguments
+ taskParams.sparkSubmitParameters = data.sparkSubmitParameters
+ taskParams.isProduction = data.isProduction
+ taskParams.type = data.type
+ taskParams.datasource = data.datasource
+ }
+
if (data.taskType === 'K8S') {
taskParams.namespace = data.namespace
taskParams.minCpuCores = data.minCpuCores
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index 9fbc219b7f..7da82e2a13 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -52,6 +52,7 @@ import { useLinkis } from './use-linkis'
import { useDataFactory } from './use-data-factory'
import { useRemoteShell } from './use-remote-shell'
import { useDynamic } from './use-dynamic'
+import { useAliyunServerlessSpark } from './use-aliyun-serverless-spark'
export default {
SHELL: useShell,
@@ -90,5 +91,6 @@ export default {
KUBEFLOW: useKubeflow,
LINKIS: useLinkis,
DATA_FACTORY: useDataFactory,
- REMOTESHELL: useRemoteShell
+ REMOTESHELL: useRemoteShell,
+ ALIYUN_SERVERLESS_SPARK: useAliyunServerlessSpark
}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-aliyun-serverless-spark.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-aliyun-serverless-spark.ts
new file mode 100644
index 0000000000..40ddbe857d
--- /dev/null
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-aliyun-serverless-spark.ts
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData, ITaskData } from '../types'
+
+export function useAliyunServerlessSpark({
+ projectCode,
+ from = 0,
+ readonly,
+ data
+}: {
+ projectCode: number
+ from?: number
+ readonly?: boolean
+ data?: ITaskData
+}) {
+ const model = reactive({
+ name: '',
+ taskType: 'ALIYUN_SERVERLESS_SPARK',
+ flag: 'YES',
+ description: '',
+ timeoutFlag: false,
+ localParams: [],
+ environmentCode: null,
+ failRetryInterval: 1,
+ failRetryTimes: 0,
+ workerGroup: 'default',
+ delayTime: 0,
+ timeout: 30,
+ type: 'ALIYUN_SERVERLESS_SPARK',
+ displayRows: 10,
+ timeoutNotifyStrategy: ['WARN'],
+ restEndpoint: '',
+ username: '',
+ password: ''
+ } as INodeData)
+
+ return {
+ json: [
+ Fields.useName(from),
+ ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model
}),
+ Fields.useRunFlag(),
+ Fields.useCache(),
+ Fields.useDescription(),
+ Fields.useTaskPriority(),
+ Fields.useWorkerGroup(projectCode),
+ Fields.useEnvironmentName(model, !data?.id),
+ ...Fields.useTaskGroup(model, projectCode),
+ ...Fields.useFailed(),
+ Fields.useDelayTime(model),
+ ...Fields.useTimeoutAlarm(model),
+ ...Fields.useDatasource(model),
+ ...Fields.useAliyunServerlessSpark(model),
+ Fields.usePreTasks()
+ ] as IJsonItem[],
+ model
+ }
+}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 514b0d032a..95991a7799 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -350,6 +350,17 @@ interface ITaskParams {
password?: string
zeppelinProductionNoteDirectory?: string
productionNoteDirectory?: string
+ regionId?: string
+ accessKeyId?: string
+ accessKeySecret?: string
+ workspaceId?: string
+ resourceQueueId?: string
+ codeType?: string
+ engineReleaseVersion?: string
+ entryPoint?: string
+ entryPointArguments?: string
+ sparkSubmitParameters?: string
+ isProduction?: boolean
hiveCliOptions?: string
hiveSqlScript?: string
hiveCliTaskExecutionType?: string
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index d6ad7dd288..0234df55fa 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -52,6 +52,7 @@ export type TaskType =
| 'LINKIS'
| 'DATA_FACTORY'
| 'REMOTESHELL'
+ | 'ALIYUN_SERVERLESS_SPARK'
export type TaskExecuteType = 'STREAM' | 'BATCH'
@@ -190,6 +191,10 @@ export const TASK_TYPES_MAP = {
REMOTESHELL: {
alias: 'REMOTESHELL',
helperLinkDisable: true
+ },
+ ALIYUN_SERVERLESS_SPARK: {
+ alias: 'ALIYUN_SERVERLESS_SPARK',
+ helperLinkDisable: true
}
} as {
[key in TaskType]: {
diff --git
a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index 3a57c026c7..99c2ce6f5e 100644
---
a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++
b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -210,6 +210,9 @@ $bgLight: #ffffff;
&.icon-remoteshell {
background-image: url('/images/task-icons/remoteshell.png');
}
+ &.icon-icon-aliyun_serverless_spark {
+ background-image: url('/images/task-icons/aliyun_serverless_spark.png');
+ }
}
&:hover {
@@ -323,6 +326,9 @@ $bgLight: #ffffff;
&.icon-remoteshell {
background-image: url('/images/task-icons/remoteshell_hover.png');
}
+ &.icon-aliyun_serverless_spark {
+ background-image:
url('/images/task-icons/aliyun_serverless_spark_hover.png');
+ }
}
}
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 7a58095079..99e0fcf345 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -520,4 +520,12 @@ semver-5.7.2.jar
sigmund-1.0.1.jar
wrappy-1.0.2.jar
yallist-2.1.2.jar
+credentials-java-0.3.0.jar
+dom4j-2.0.3.jar
+emr_serverless_spark20230808-1.0.0.jar
+openapiutil-0.2.1.jar
+tea-1.2.7.jar
+tea-openapi-0.3.2.jar
+tea-util-0.2.21.jar
+tea-xml-0.1.5.jar