This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0a535447a9 [Improvement-16994][TaskPlugin] support serverless spark
template (#17411)
0a535447a9 is described below
commit 0a535447a90da69e1d3db88fd0a379515399f932
Author: Evan <[email protected]>
AuthorDate: Wed Aug 13 18:29:46 2025 +0800
[Improvement-16994][TaskPlugin] support serverless spark template (#17411)
---
.pre-commit-config.yaml | 10 +--
.../pom.xml | 2 +-
.../AliyunServerlessSparkParameters.java | 2 +
.../AliyunServerlessSparkTask.java | 71 ++++++++++++++++++++--
.../AliyunServerlessSparkTaskTest.java | 20 +++++-
dolphinscheduler-ui/src/locales/en_US/project.ts | 2 +
dolphinscheduler-ui/src/locales/zh_CN/project.ts | 2 +
.../node/fields/use-aliyun-serverless-spark.ts | 9 +++
.../projects/task/components/node/format-data.ts | 1 +
.../views/projects/task/components/node/types.ts | 1 +
10 files changed, 106 insertions(+), 14 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index e51d15a16e..d8912b881a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -18,23 +18,23 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
-default_stages: [commit, push]
+default_stages: [pre-commit, pre-push]
default_language_version:
# force all python hooks to run python3
python: python3
repos:
# Python API Hooks
- repo: https://github.com/pycqa/isort
- rev: 5.10.1
+ rev: 6.0.1
hooks:
- id: isort
name: isort (python)
- repo: https://github.com/psf/black
- rev: 22.3.0
+ rev: 25.1.0
hooks:
- id: black
- repo: https://github.com/pycqa/flake8
- rev: 4.0.1
+ rev: 7.3.0
hooks:
- id: flake8
additional_dependencies: [
@@ -42,7 +42,7 @@ repos:
'flake8-black>=0.2',
]
- repo: https://github.com/pycqa/autoflake
- rev: v1.4
+ rev: v2.3.1
hooks:
- id: autoflake
args: [
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
index 9919ce0ea8..77337f9837 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/pom.xml
@@ -67,7 +67,7 @@
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>emr_serverless_spark20230808</artifactId>
- <version>1.0.0</version>
+ <version>2.4.1</version>
</dependency>
<dependency>
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
index acb76f92a1..395d725fd5 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkParameters.java
@@ -47,6 +47,8 @@ public class AliyunServerlessSparkParameters extends
AbstractParameters {
private String sparkSubmitParameters;
+ private String templateId;
+
@JsonProperty("isProduction")
boolean isProduction;
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
index e2fc7e9842..8824665180 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java
@@ -30,6 +30,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.plugin.task.api.utils.RetryUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -46,6 +48,8 @@ import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
+import com.aliyun.emr_serverless_spark20230808.models.GetTemplateRequest;
+import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponse;
import com.aliyun.emr_serverless_spark20230808.models.JobDriver;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
@@ -64,6 +68,12 @@ public class AliyunServerlessSparkTask extends
AbstractRemoteTask {
private AliyunServerlessSparkConnectionParam
aliyunServerlessSparkConnectionParam;
+ private String templateConf;
+
+ private String templateDisplayReleaseVersion;
+
+ private Boolean templateFusion;
+
private String jobRunId;
private RunState currentState;
@@ -116,6 +126,26 @@ public class AliyunServerlessSparkTask extends
AbstractRemoteTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
+ try {
+ GetTemplateResponse getTemplateResponse =
aliyunServerlessSparkClient.getTemplate(
+ aliyunServerlessSparkParameters.getWorkspaceId(),
+ buildGetTemplateRequest());
+
+ if (getTemplateResponse != null) {
+ templateConf = getTemplateResponse.getBody()
+ .getData()
+ .getSparkConf()
+ .stream()
+ .map(item -> "--conf " + item.getKey() + "=" +
item.getValue())
+ .collect(Collectors.joining(" "));
+
+ templateDisplayReleaseVersion =
getTemplateResponse.getBody().getData().getDisplaySparkVersion();
+ templateFusion =
getTemplateResponse.getBody().getData().getFusion();
+ }
+ } catch (Exception e) {
+ throw new AliyunServerlessSparkTaskException("Failed to get
serverless spark template!");
+ }
+
try {
StartJobRunRequest startJobRunRequest =
buildStartJobRunRequest(aliyunServerlessSparkParameters);
RuntimeOptions runtime = new RuntimeOptions();
@@ -128,8 +158,17 @@ public class AliyunServerlessSparkTask extends
AbstractRemoteTask {
while (!RunState.isFinal(currentState)) {
GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();
- GetJobRunResponse getJobRunResponse =
aliyunServerlessSparkClient
-
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
getJobRunRequest);
+
+ GetJobRunResponse getJobRunResponse =
RetryUtils.retryFunction(() -> {
+ try {
+ return aliyunServerlessSparkClient
+
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
+ getJobRunRequest);
+ } catch (Exception e) {
+ throw new AliyunServerlessSparkTaskException("Failed
to get job run!", e);
+ }
+ }, new RetryUtils.RetryPolicy(10, 1000L));
+
currentState =
RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
log.info("job - {} state - {}", jobRunId, currentState);
Thread.sleep(10 * 1000L);
@@ -199,16 +238,26 @@ public class AliyunServerlessSparkTask extends
AbstractRemoteTask {
}
protected StartJobRunRequest
buildStartJobRunRequest(AliyunServerlessSparkParameters
aliyunServerlessSparkParameters) {
+ if (templateConf != null) {
+ aliyunServerlessSparkParameters.setSparkSubmitParameters(
+ templateConf + " " +
aliyunServerlessSparkParameters.getSparkSubmitParameters());
+ }
+
StartJobRunRequest startJobRunRequest = new StartJobRunRequest();
startJobRunRequest.setRegionId(regionId);
startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId());
startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType());
startJobRunRequest.setName(aliyunServerlessSparkParameters.getJobName());
+
String engineReleaseVersion =
aliyunServerlessSparkParameters.getEngineReleaseVersion();
- engineReleaseVersion =
- StringUtils.isEmpty(engineReleaseVersion) ?
AliyunServerlessSparkConstants.DEFAULT_ENGINE
- : engineReleaseVersion;
- startJobRunRequest.setReleaseVersion(engineReleaseVersion);
+
+ if (engineReleaseVersion != null && !engineReleaseVersion.isEmpty()) {
+ startJobRunRequest.setReleaseVersion(engineReleaseVersion);
+ } else if (templateDisplayReleaseVersion != null && templateFusion !=
null) {
+
startJobRunRequest.setDisplayReleaseVersion(templateDisplayReleaseVersion);
+ startJobRunRequest.setFusion(templateFusion);
+ }
+
Tag envTag = new Tag();
envTag.setKey(AliyunServerlessSparkConstants.ENV_KEY);
String envType = aliyunServerlessSparkParameters.isProduction() ?
AliyunServerlessSparkConstants.ENV_PROD
@@ -243,4 +292,14 @@ public class AliyunServerlessSparkTask extends
AbstractRemoteTask {
cancelJobRunRequest.setRegionId(regionId);
return cancelJobRunRequest;
}
+
+ protected GetTemplateRequest buildGetTemplateRequest() {
+ // Set templateBizId only for a non-blank id; an empty UI field must not send templateBizId="".
+ GetTemplateRequest getTemplateRequest = new GetTemplateRequest();
+ String templateId = aliyunServerlessSparkParameters.getTemplateId();
+ if (StringUtils.isNotBlank(templateId)) {
+ getTemplateRequest.setTemplateBizId(templateId);
+ }
+ return getTemplateRequest;
+ }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
index ee331e56a1..6a29356fa7 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java
@@ -32,6 +32,8 @@ import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourc
import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.enums.DbType;
+import java.util.Collections;
+
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
@@ -51,9 +53,12 @@ import
com.aliyun.emr_serverless_spark20230808.models.CancelJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponseBody;
+import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponse;
+import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponseBody;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponseBody;
+import com.aliyun.emr_serverless_spark20230808.models.Template;
@Slf4j
@ExtendWith(MockitoExtension.class)
@@ -90,6 +95,9 @@ public class AliyunServerlessSparkTaskTest {
@Mock
private CancelJobRunResponse mockCancelJobRunResponse;
+ @Mock
+ private GetTemplateResponse mockGetTemplateResponse;
+
@InjectMocks
@Spy
private AliyunServerlessSparkTask aliyunServerlessSparkTask;
@@ -124,6 +132,8 @@ public class AliyunServerlessSparkTaskTest {
private static final String mockEntryPointArguments = "10";
+ private static final String mockTemplateId = "TPL-XXXXX";
+
@BeforeEach
public void before() {
when(mockTaskExecutionContext.getTaskParams()).thenReturn(taskParamsString);
@@ -167,6 +177,12 @@ public class AliyunServerlessSparkTaskTest {
() ->
doReturn(mockGetJobRunResponse).when(mockAliyunServerlessSparkClient).getJobRun(any(),
any(),
any()));
+ mockGetTemplateResponse = new GetTemplateResponse()
+ .setBody(new GetTemplateResponseBody().setData(new
Template().setSparkConf(Collections.emptyList())));
+ Assertions.assertDoesNotThrow(
+ () ->
doReturn(mockGetTemplateResponse).when(mockAliyunServerlessSparkClient).getTemplate(any(),
+ any()));
+
aliyunServerlessSparkTask.init();
aliyunServerlessSparkTask.handle(mockTaskCallBack);
verify(aliyunServerlessSparkTask).setAppIds(mockJobRunId);
@@ -190,6 +206,8 @@ public class AliyunServerlessSparkTaskTest {
public void testBuildStartJobRunRequest() {
AliyunServerlessSparkParameters mockAliyunServerlessSparkParameters =
mock(AliyunServerlessSparkParameters.class);
+
doReturn(mockWorkspaceId).when(mockAliyunServerlessSparkParameters).getWorkspaceId();
+
doReturn(mockTemplateId).when(mockAliyunServerlessSparkParameters).getTemplateId();
doReturn(mockResourceQueueId).when(mockAliyunServerlessSparkParameters).getResourceQueueId();
doReturn("JAR").when(mockAliyunServerlessSparkParameters).getCodeType();
doReturn("ds-test").when(mockAliyunServerlessSparkParameters).getJobName();
@@ -198,12 +216,10 @@ public class AliyunServerlessSparkTaskTest {
doReturn(mockEntryPointArguments).when(mockAliyunServerlessSparkParameters).getEntryPointArguments();
aliyunServerlessSparkTask.buildStartJobRunRequest(mockAliyunServerlessSparkParameters);
-
verify(mockAliyunServerlessSparkParameters).getResourceQueueId();
verify(mockAliyunServerlessSparkParameters).getCodeType();
verify(mockAliyunServerlessSparkParameters).getJobName();
verify(mockAliyunServerlessSparkParameters).getEngineReleaseVersion();
verify(mockAliyunServerlessSparkParameters).isProduction();
}
-
}
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts
b/dolphinscheduler-ui/src/locales/en_US/project.ts
index fca4d784b1..2f5acaa722 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -922,6 +922,8 @@ export default {
entry_point_arguments_tips: 'entry point arguments',
spark_submit_parameters: 'spark submit parameters',
spark_submit_parameters_tips: 'spark submit parameters',
+ template_id: 'spark template id',
+ template_id_tips: 'spark template id',
is_production: 'is production',
is_production_tips: 'is production',
json_format_tips: 'Json parameters format is abnormal'
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index d54e5a0eae..6ef6485355 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -892,6 +892,8 @@ export default {
entry_point_arguments_tips: 'entry point arguments',
spark_submit_parameters: 'spark submit parameters',
spark_submit_parameters_tips: 'spark submit parameters',
+ template_id: 'spark template id',
+ template_id_tips: 'spark template id',
is_production: 'is production',
is_production_tips: 'is production',
json_format_tips: 'JSON参数格式异常'
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
index 650f488271..9dacea8d85 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-aliyun-serverless-spark.ts
@@ -161,6 +161,15 @@ export function useAliyunServerlessSpark(model: {
}
},
+ {
+ type: 'input',
+ field: 'templateId',
+ name: t('project.node.template_id'),
+ props: {
+ placeholder: t('project.node.template_id_tips')
+ }
+ },
+
{
type: 'switch',
field: 'isProduction',
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 7e100d5681..ab41b56208 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -305,6 +305,7 @@ export function formatParams(data: INodeData): {
taskParams.codeType = data.codeType
taskParams.jobName = data.jobName
taskParams.engineReleaseVersion = data.engineReleaseVersion
+ taskParams.templateId = data.templateId
taskParams.entryPoint = data.entryPoint
taskParams.entryPointArguments = data.entryPointArguments
taskParams.sparkSubmitParameters = data.sparkSubmitParameters
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 562e8f89c0..b33531f78e 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -357,6 +357,7 @@ interface ITaskParams {
resourceQueueId?: string
codeType?: string
engineReleaseVersion?: string
+ templateId?: string
entryPoint?: string
entryPointArguments?: string
sparkSubmitParameters?: string