This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0a535447a9 [Improvement-16994][TaskPlugin] support serverless spark template (#17411)
0a535447a9 is described below

commit 0a535447a90da69e1d3db88fd0a379515399f932
Author: Evan <[email protected]>
AuthorDate: Wed Aug 13 18:29:46 2025 +0800

    [Improvement-16994][TaskPlugin] support serverless spark template (#17411)
---
 .pre-commit-config.yaml                            | 10 +--
 .../pom.xml                                        |  2 +-
 .../AliyunServerlessSparkParameters.java           |  2 +
 .../AliyunServerlessSparkTask.java                 | 71 ++++++++++++++++++++--
 .../AliyunServerlessSparkTaskTest.java             | 20 +++++-
 dolphinscheduler-ui/src/locales/en_US/project.ts   |  2 +
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |  2 +
 .../node/fields/use-aliyun-serverless-spark.ts     |  9 +++
 .../projects/task/components/node/format-data.ts   |  1 +
 .../views/projects/task/components/node/types.ts   |  1 +
 10 files changed, 106 insertions(+), 14 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index e51d15a16e..d8912b881a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -18,23 +18,23 @@
 # See https://pre-commit.com for more information
 # See https://pre-commit.com/hooks.html for more hooks
 
-default_stages: [commit, push]
+default_stages: [pre-commit, pre-push]
 default_language_version:
   # force all python hooks to run python3
   python: python3
 repos:
   # Python API Hooks
   - repo: https://github.com/pycqa/isort
-    rev: 5.10.1
+    rev: 6.0.1
     hooks:
       - id: isort
         name: isort (python)
   - repo: https://github.com/psf/black
-    rev: 22.3.0
+    rev: 25.1.0
     hooks:
       - id: black
   - repo: https://github.com/pycqa/flake8
-    rev: 4.0.1
+    rev: 7.3.0
     hooks:
       - id: flake8
         additional_dependencies: [
@@ -42,7 +42,7 @@ repos:
           'flake8-black>=0.2',
         ]
   - repo: https://github.com/pycqa/autoflake
-    rev: v1.4
+    rev: v2.3.1
     hooks:
       - id: autoflake
         args: [
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
index 9919ce0ea8..77337f9837 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
@@ -67,7 +67,7 @@
         <dependency>
             <groupId>com.aliyun</groupId>
             <artifactId>emr_serverless_spark20230808</artifactId>
-            <version>1.0.0</version>
+            <version>2.4.1</version>
         </dependency>
 
         <dependency>
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
index acb76f92a1..395d725fd5 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
@@ -47,6 +47,8 @@ public class AliyunServerlessSparkParameters extends AbstractParameters {
 
     private String sparkSubmitParameters;
 
+    private String templateId;
+
     @JsonProperty("isProduction")
     boolean isProduction;
 
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
index e2fc7e9842..8824665180 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
@@ -30,6 +30,7 @@ import 
org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.plugin.task.api.utils.RetryUtils;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -46,6 +48,8 @@ import com.aliyun.emr_serverless_spark20230808.Client;
 import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest;
 import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
 import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
+import com.aliyun.emr_serverless_spark20230808.models.GetTemplateRequest;
+import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponse;
 import com.aliyun.emr_serverless_spark20230808.models.JobDriver;
 import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
 import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
@@ -64,6 +68,12 @@ public class AliyunServerlessSparkTask extends 
AbstractRemoteTask {
 
     private AliyunServerlessSparkConnectionParam 
aliyunServerlessSparkConnectionParam;
 
+    private String templateConf;
+
+    private String templateDisplayReleaseVersion;
+
+    private Boolean templateFusion;
+
     private String jobRunId;
 
     private RunState currentState;
@@ -116,6 +126,26 @@ public class AliyunServerlessSparkTask extends 
AbstractRemoteTask {
 
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
+        try {
+            GetTemplateResponse getTemplateResponse = 
aliyunServerlessSparkClient.getTemplate(
+                    aliyunServerlessSparkParameters.getWorkspaceId(),
+                    buildGetTemplateRequest());
+
+            if (getTemplateResponse != null) {
+                templateConf = getTemplateResponse.getBody()
+                        .getData()
+                        .getSparkConf()
+                        .stream()
+                        .map(item -> "--conf " + item.getKey() + "=" + 
item.getValue())
+                        .collect(Collectors.joining(" "));
+
+                templateDisplayReleaseVersion = 
getTemplateResponse.getBody().getData().getDisplaySparkVersion();
+                templateFusion = 
getTemplateResponse.getBody().getData().getFusion();
+            }
+        } catch (Exception e) {
+            throw new AliyunServerlessSparkTaskException("Failed to get serverless spark template!");
+        }
+
         try {
             StartJobRunRequest startJobRunRequest = 
buildStartJobRunRequest(aliyunServerlessSparkParameters);
             RuntimeOptions runtime = new RuntimeOptions();
@@ -128,8 +158,17 @@ public class AliyunServerlessSparkTask extends 
AbstractRemoteTask {
 
             while (!RunState.isFinal(currentState)) {
                 GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();
-                GetJobRunResponse getJobRunResponse = 
aliyunServerlessSparkClient
-                        
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, 
getJobRunRequest);
+
+                GetJobRunResponse getJobRunResponse = 
RetryUtils.retryFunction(() -> {
+                    try {
+                        return aliyunServerlessSparkClient
+                                
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
+                                        getJobRunRequest);
+                    } catch (Exception e) {
+                        throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
+                    }
+                }, new RetryUtils.RetryPolicy(10, 1000L));
+
                 currentState = 
RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
                 log.info("job - {} state - {}", jobRunId, currentState);
                 Thread.sleep(10 * 1000L);
@@ -199,16 +238,26 @@ public class AliyunServerlessSparkTask extends 
AbstractRemoteTask {
     }
 
     protected StartJobRunRequest 
buildStartJobRunRequest(AliyunServerlessSparkParameters 
aliyunServerlessSparkParameters) {
+        if (templateConf != null) {
+            aliyunServerlessSparkParameters.setSparkSubmitParameters(
+                    templateConf + " " + 
aliyunServerlessSparkParameters.getSparkSubmitParameters());
+        }
+
         StartJobRunRequest startJobRunRequest = new StartJobRunRequest();
         startJobRunRequest.setRegionId(regionId);
         
startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId());
         
startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType());
         
startJobRunRequest.setName(aliyunServerlessSparkParameters.getJobName());
+
         String engineReleaseVersion = 
aliyunServerlessSparkParameters.getEngineReleaseVersion();
-        engineReleaseVersion =
-                StringUtils.isEmpty(engineReleaseVersion) ? 
AliyunServerlessSparkConstants.DEFAULT_ENGINE
-                        : engineReleaseVersion;
-        startJobRunRequest.setReleaseVersion(engineReleaseVersion);
+
+        if (engineReleaseVersion != null && !engineReleaseVersion.isEmpty()) {
+            startJobRunRequest.setReleaseVersion(engineReleaseVersion);
+        } else if (templateDisplayReleaseVersion != null && templateFusion != 
null) {
+            
startJobRunRequest.setDisplayReleaseVersion(templateDisplayReleaseVersion);
+            startJobRunRequest.setFusion(templateFusion);
+        }
+
         Tag envTag = new Tag();
         envTag.setKey(AliyunServerlessSparkConstants.ENV_KEY);
         String envType = aliyunServerlessSparkParameters.isProduction() ? 
AliyunServerlessSparkConstants.ENV_PROD
@@ -243,4 +292,14 @@ public class AliyunServerlessSparkTask extends 
AbstractRemoteTask {
         cancelJobRunRequest.setRegionId(regionId);
         return cancelJobRunRequest;
     }
+
+    protected GetTemplateRequest buildGetTemplateRequest() {
+        GetTemplateRequest getTemplateRequest = new GetTemplateRequest();
+
+        if (aliyunServerlessSparkParameters.getTemplateId() != null) {
+            
getTemplateRequest.setTemplateBizId(aliyunServerlessSparkParameters.getTemplateId());
+        }
+
+        return getTemplateRequest;
+    }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
index ee331e56a1..6a29356fa7 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
@@ -32,6 +32,8 @@ import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourc
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
+import java.util.Collections;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.junit.jupiter.api.Assertions;
@@ -51,9 +53,12 @@ import 
com.aliyun.emr_serverless_spark20230808.models.CancelJobRunResponse;
 import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
 import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
 import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponseBody;
+import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponse;
+import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponseBody;
 import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
 import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
 import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponseBody;
+import com.aliyun.emr_serverless_spark20230808.models.Template;
 
 @Slf4j
 @ExtendWith(MockitoExtension.class)
@@ -90,6 +95,9 @@ public class AliyunServerlessSparkTaskTest {
     @Mock
     private CancelJobRunResponse mockCancelJobRunResponse;
 
+    @Mock
+    private GetTemplateResponse mockGetTemplateResponse;
+
     @InjectMocks
     @Spy
     private AliyunServerlessSparkTask aliyunServerlessSparkTask;
@@ -124,6 +132,8 @@ public class AliyunServerlessSparkTaskTest {
 
     private static final String mockEntryPointArguments = "10";
 
+    private static final String mockTemplateId = "TPL-XXXXX";
+
     @BeforeEach
     public void before() {
         
when(mockTaskExecutionContext.getTaskParams()).thenReturn(taskParamsString);
@@ -167,6 +177,12 @@ public class AliyunServerlessSparkTaskTest {
                 () -> 
doReturn(mockGetJobRunResponse).when(mockAliyunServerlessSparkClient).getJobRun(any(),
 any(),
                         any()));
 
+        mockGetTemplateResponse = new GetTemplateResponse()
+                .setBody(new GetTemplateResponseBody().setData(new 
Template().setSparkConf(Collections.emptyList())));
+        Assertions.assertDoesNotThrow(
+                () -> 
doReturn(mockGetTemplateResponse).when(mockAliyunServerlessSparkClient).getTemplate(any(),
+                        any()));
+
         aliyunServerlessSparkTask.init();
         aliyunServerlessSparkTask.handle(mockTaskCallBack);
         verify(aliyunServerlessSparkTask).setAppIds(mockJobRunId);
@@ -190,6 +206,8 @@ public class AliyunServerlessSparkTaskTest {
     public void testBuildStartJobRunRequest() {
         AliyunServerlessSparkParameters mockAliyunServerlessSparkParameters =
                 mock(AliyunServerlessSparkParameters.class);
+        
doReturn(mockWorkspaceId).when(mockAliyunServerlessSparkParameters).getWorkspaceId();
+        
doReturn(mockTemplateId).when(mockAliyunServerlessSparkParameters).getTemplateId();
         
doReturn(mockResourceQueueId).when(mockAliyunServerlessSparkParameters).getResourceQueueId();
         
doReturn("JAR").when(mockAliyunServerlessSparkParameters).getCodeType();
         
doReturn("ds-test").when(mockAliyunServerlessSparkParameters).getJobName();
@@ -198,12 +216,10 @@ public class AliyunServerlessSparkTaskTest {
         
doReturn(mockEntryPointArguments).when(mockAliyunServerlessSparkParameters).getEntryPointArguments();
 
         
aliyunServerlessSparkTask.buildStartJobRunRequest(mockAliyunServerlessSparkParameters);
-
         verify(mockAliyunServerlessSparkParameters).getResourceQueueId();
         verify(mockAliyunServerlessSparkParameters).getCodeType();
         verify(mockAliyunServerlessSparkParameters).getJobName();
         verify(mockAliyunServerlessSparkParameters).getEngineReleaseVersion();
         verify(mockAliyunServerlessSparkParameters).isProduction();
     }
-
 }
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index fca4d784b1..2f5acaa722 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -922,6 +922,8 @@ export default {
     entry_point_arguments_tips: 'entry point arguments',
     spark_submit_parameters: 'spark submit parameters',
     spark_submit_parameters_tips: 'spark submit parameters',
+    template_id: 'spark template id',
+    template_id_tips: 'spark template id',
     is_production: 'is production',
     is_production_tips: 'is production',
     json_format_tips: 'Json parameters format is abnormal'
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts 
b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index d54e5a0eae..6ef6485355 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -892,6 +892,8 @@ export default {
     entry_point_arguments_tips: 'entry point arguments',
     spark_submit_parameters: 'spark submit parameters',
     spark_submit_parameters_tips: 'spark submit parameters',
+    template_id: 'spark template id',
+    template_id_tips: 'spark template id',
     is_production: 'is production',
     is_production_tips: 'is production',
     json_format_tips: 'JSON参数格式异常'
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
index 650f488271..9dacea8d85 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
@@ -161,6 +161,15 @@ export function useAliyunServerlessSpark(model: {
       }
     },
 
+    {
+      type: 'input',
+      field: 'templateId',
+      name: t('project.node.template_id'),
+      props: {
+        placeholder: t('project.node.template_id_tips')
+      }
+    },
+
     {
       type: 'switch',
       field: 'isProduction',
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 7e100d5681..ab41b56208 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -305,6 +305,7 @@ export function formatParams(data: INodeData): {
     taskParams.codeType = data.codeType
     taskParams.jobName = data.jobName
     taskParams.engineReleaseVersion = data.engineReleaseVersion
+    taskParams.templateId = data.templateId
     taskParams.entryPoint = data.entryPoint
     taskParams.entryPointArguments = data.entryPointArguments
     taskParams.sparkSubmitParameters = data.sparkSubmitParameters
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 562e8f89c0..b33531f78e 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -357,6 +357,7 @@ interface ITaskParams {
   resourceQueueId?: string
   codeType?: string
   engineReleaseVersion?: string
+  templateId?: string
   entryPoint?: string
   entryPointArguments?: string
   sparkSubmitParameters?: string

Reply via email to