EricGao888 commented on code in PR #17476:
URL:
https://github.com/apache/dolphinscheduler/pull/17476#discussion_r2324651870
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java:
##########
@@ -126,60 +126,66 @@ public void init() {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
- try {
- GetTemplateResponse getTemplateResponse = aliyunServerlessSparkClient.getTemplate(
- aliyunServerlessSparkParameters.getWorkspaceId(),
- buildGetTemplateRequest());
-
- if (getTemplateResponse != null) {
- templateConf = getTemplateResponse.getBody()
- .getData()
- .getSparkConf()
- .stream()
- .map(item -> "--conf " + item.getKey() + "=" + item.getValue())
- .collect(Collectors.joining(" "));
-
- templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion();
- templateFusion = getTemplateResponse.getBody().getData().getFusion();
+ GetTemplateResponse getTemplateResponse = RetryUtils.retryFunction(() -> {
+ try {
+ return aliyunServerlessSparkClient.getTemplate(
+ aliyunServerlessSparkParameters.getWorkspaceId(),
+ buildGetTemplateRequest());
+ } catch (Exception e) {
+ throw new TaskException("Failed to get template info", e);
}
- } catch (Exception e) {
- throw new AliyunServerlessSparkTaskException("Failed to get serverless spark template!");
+ }, retryPolicy);
+
+ if (getTemplateResponse != null) {
+ templateConf = getTemplateResponse.getBody()
+ .getData()
+ .getSparkConf()
+ .stream()
+ .map(item -> "--conf " + item.getKey() + "=" + item.getValue())
+ .collect(Collectors.joining(" "));
+
+ templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion();
+ templateFusion = getTemplateResponse.getBody().getData().getFusion();
}
- try {
- StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters);
- RuntimeOptions runtime = new RuntimeOptions();
- Map<String, String> headers = new HashMap<>();
- StartJobRunResponse startJobRunResponse = aliyunServerlessSparkClient.startJobRunWithOptions(
- aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest, headers, runtime);
- jobRunId = startJobRunResponse.getBody().getJobRunId();
- setAppIds(jobRunId);
- log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId);
-
- while (!RunState.isFinal(currentState)) {
- GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();
-
- GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> {
- try {
- return aliyunServerlessSparkClient
- .getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
- getJobRunRequest);
- } catch (Exception e) {
- throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
- }
- }, new RetryUtils.RetryPolicy(10, 1000L));
-
- currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
- log.info("job - {} state - {}", jobRunId, currentState);
- Thread.sleep(10 * 1000L);
+ StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters);
+ StartJobRunResponse startJobRunResponse = RetryUtils.retryFunction(() -> {
+ try {
+ return aliyunServerlessSparkClient.startJobRun(
+ aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest);
+ } catch (Exception e) {
+ throw new AliyunServerlessSparkTaskException("Failed to start job run!");
Review Comment:
> There seems to be a timeout issue here: if the HTTP request times out, the client will retry, but the server side may have already handled the previous request, so the request would be handled twice. I'm unsure whether the service side implements idempotency handling. Also, since a new token is passed here each time, the server side cannot tell that the second request is a retry.
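For context on the idempotency question: server-side idempotency handling usually means the service deduplicates start requests by a client-supplied token. A retried request that carries the same token gets the existing run back instead of creating a new one. Below is a minimal sketch that assumes a hypothetical in-memory service. `IdempotentJobService` is illustrative only and is not the Aliyun Serverless Spark API. Whether the real service deduplicates this way is the open question above.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory service for illustration; NOT the Aliyun Serverless Spark API.
public class IdempotentJobService {
    // clientToken -> jobRunId of the run created for that token
    private final Map<String, String> runsByToken = new ConcurrentHashMap<>();

    // Creates a job run at most once per client token; a repeated token
    // returns the jobRunId that was created by the first request.
    public String startJobRun(String clientToken) {
        return runsByToken.computeIfAbsent(clientToken, t -> "jr-" + UUID.randomUUID());
    }

    // Number of distinct job runs actually created.
    public int runCount() {
        return runsByToken.size();
    }

    public static void main(String[] args) {
        IdempotentJobService service = new IdempotentJobService();
        String token = UUID.randomUUID().toString();
        String first = service.startJobRun(token);
        String retried = service.startJobRun(token);                        // same token: deduplicated
        String fresh = service.startJobRun(UUID.randomUUID().toString());   // new token: new run
        System.out.println(first.equals(retried)); // true
        System.out.println(first.equals(fresh));   // false
        System.out.println(service.runCount());    // 2
    }
}
```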
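@ruanwenjun @abzymeatsjtu It looks like the token is generated and set when the request is initialized (line #257). Since `startJobRunRequest` is built once, outside the `RetryUtils.retryFunction` lambda, every retry should send the same token. So I assume idempotency is fine here?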
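The token-reuse reasoning can be checked with a small simulation. It assumes a hypothetical server that deduplicates by token and loses its first response after the run has already been created, which is the timeout case raised in the quoted comment. `FakeServer`, `Request` and `retry()` are illustrative stand-ins, not DolphinScheduler's `RetryUtils` or the Aliyun SDK. `retry()` also skips the wait interval that a real retry policy would apply.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical simulation; none of these types are DolphinScheduler or Aliyun SDK classes.
public class TokenReuseDemo {

    // A request whose idempotency token is fixed when the object is built.
    static final class Request {
        private final String clientToken;

        private Request(String clientToken) {
            this.clientToken = clientToken;
        }

        static Request build() {
            return new Request(UUID.randomUUID().toString());
        }

        String clientToken() {
            return clientToken;
        }
    }

    // Server that deduplicates runs by token but "loses" its first response,
    // simulating a client-side timeout after the run was already created.
    static final class FakeServer {
        final Set<String> createdRuns = new HashSet<>();
        private boolean loseNextResponse = true;

        String startJobRun(Request request) {
            createdRuns.add(request.clientToken()); // run created, or deduplicated if token seen
            if (loseNextResponse) {
                loseNextResponse = false;
                throw new RuntimeException("simulated read timeout");
            }
            return "jobRun-" + request.clientToken();
        }
    }

    // Minimal retry helper: invokes the call up to maxAttempts times (no backoff).
    static <T> T retry(Supplier<T> call, int maxAttempts) {
        RuntimeException last = new IllegalArgumentException("maxAttempts must be >= 1");
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    // Request built once, outside the retried lambda: every attempt sends the same token.
    static int runsWhenBuiltOnce() {
        FakeServer server = new FakeServer();
        Request request = Request.build();
        retry(() -> server.startJobRun(request), 3);
        return server.createdRuns.size();
    }

    // Request rebuilt inside the lambda: each attempt carries a fresh token.
    static int runsWhenBuiltPerAttempt() {
        FakeServer server = new FakeServer();
        retry(() -> server.startJobRun(Request.build()), 3);
        return server.createdRuns.size();
    }

    public static void main(String[] args) {
        System.out.println(runsWhenBuiltOnce());       // 1
        System.out.println(runsWhenBuiltPerAttempt()); // 2
    }
}
```

The diff builds `startJobRunRequest` before calling `RetryUtils.retryFunction`, which corresponds to `runsWhenBuiltOnce()`. This only prevents a duplicate run if the service actually honors the token.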
--
This is an automated message from the Apache Git Service.