Copilot commented on code in PR #17779:
URL:
https://github.com/apache/dolphinscheduler/pull/17779#discussion_r2652315081
##########
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss:
##########
@@ -227,6 +230,9 @@ $bgLight: #ffffff;
&.icon-flink_stream {
background-image: url('/images/task-icons/flink_hover.png');
}
+ &.icon-external_stream {
+ background-image: url('/images/task-icons/external_stream_hover.png');
+ }
Review Comment:
The CSS class name 'icon-external_stream' doesn't match the task type
'EXTERNAL_SYSTEM'. This should be 'icon-external_system' for consistency with
the icon defined at line 120.
##########
dolphinscheduler-ui/src/store/project/task-type.ts:
##########
@@ -126,6 +126,15 @@ export const TASK_TYPES_MAP = {
helperLinkDisable: true,
taskExecuteType: 'STREAM'
},
+
+ NAL_SYSTEM : {
Review Comment:
The property name 'NAL_SYSTEM' appears to be incomplete or corrupted. This
should be 'EXTERNAL_SYSTEM' to match the task type definition elsewhere in the
codebase.
##########
dolphinscheduler-ui/src/views/thirdparty-api-source/use-table.ts:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive, ref } from 'vue'
+import {
+ queryDataSourceListPaging,
+ deleteDataSource
+} from '@/service/modules/data-source'
Review Comment:
The imports reference the `data-source` module, but this file is in the
`thirdparty-api-source` directory. This creates a dependency on the wrong
module. The file should either call dedicated third-party API source service
methods or import from the correct service module for consistency.
##########
dolphinscheduler-ui/src/locales/zh_CN/thirdparty-api-source.ts:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+export default {
+ thirdparty_api_source: '第三方系统API',
+ create_thirdparty_api_source: '创建第三方系统API',
+ edit_thirdparty_api_source: '编辑第三方系统API',
+
+ basic_info: '基本信息',
+ id: 'ID',
+ system_name: '系统名称',
+ system_name_tips: '请输入系统名称',
+ service_address: '服务地址',
+ service_address_tips: '请输入服务地址',
+ field_mapping: '字段映射',
+ internal_field: '系统字段',
+ external_field: '注册系统字段',
+ create_time: '创建时间',
+ update_time: '更新时间',
+
+ auth_config: '认证配置',
+ auth_type: '认证类型',
+ auth_type_tips: '请选择认证类型',
+ basic_auth: '基础认证',
+ oauth2: 'OAuth2认证',
+ jwt: 'JWT认证',
+ username: '用户名',
+ username_tips: '请输入用户名',
+ password: '密码',
+ password_tips: '请输入密码',
+ jwt_token: 'JWT串',
+ jwt_token_tips: '请输入JWT串',
+ oauth2_token_url: 'Token URL',
+ oauth2_token_url_tips: '请输入Token URL',
+ oauth2_client_id: 'Client ID',
+ oauth2_client_id_tips: '请输入Client ID',
+ oauth2_client_secret: 'Client Secret',
+ oauth2_client_secret_tips: '请输入Client Secret',
+ oauth2_grant_type: 'Grant Type',
+ oauth2_grant_type_tips: '请输入Grant Type',
+ oauth2_username: 'OAuth2用户名',
+ oauth2_username_tips: '请输入OAuth2用户名',
+ oauth2_password: 'OAuth2密码',
+ oauth2_password_tips: '请输入OAuth2密码',
+ additional_params: '补充参数',
+ add_param: '添加参数',
+ add_extract_field: '添加提取参数',
+ key: '键名',
+ value: '键值',
+
+ interface_config: '接口配置',
+ input_interface: '查询任务列表接口',
+ input_interface_tips: '请输入接口地址',
+ submit_interface: '启动任务接口',
+ submit_interface_tips: '请输入接口地址',
+ query_interface: '查询任务状态接口',
+ query_interface_tips: '请输入接口地址',
+ stop_interface: '停止任务接口',
+ stop_interface_tips: '请输入接口地址',
+ parameters: '参数',
+ param_location: '参数位置',
+ param_location_tips: '请选择参数位置',
+ param_name_tips: '请输入参数名',
+ param_value_tips: '请输入参数值',
+ extract_response_data: '提取响应并存储变量',
+ extract_field: '请输入提取参数',
+ json_path_list: '请输入,例如:$.data[*].id',
+ json_path: '请输入,例如$.data.taskInstanceId',
Review Comment:
Inconsistent translation keys: 'json_path_list' at line 81 is used for the
select interface but 'json_path' at line 82 is used for other interfaces. The
placeholder text differs ('$.data[*].id' vs '$.data.taskInstanceId') which may
confuse users. Consider using consistent naming and examples across all
interfaces.
##########
dolphinscheduler-ui/src/views/datasource/list/detail.tsx:
##########
@@ -780,6 +799,756 @@ const DetailModal = defineComponent({
placeholder={t('datasource.namespace_tips')}
/>
</NFormItem>
+ {/* THIRDPARTY_SYSTEM_CONNECTOR */}
+ {detailForm.type === 'THIRDPARTY_SYSTEM_CONNECTOR' && (
+ <>
+ <NFormItem
+ label={t('thirdparty_api_source.service_address')}
+ path='serviceAddress'
+ required
+ >
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.serviceAddress, 'value']}
+ placeholder={'http://'}
+ />
+ </NFormItem>
+ <NFormItem
+ label={t('thirdparty_api_source.interface_timeout')}
+ path='interfaceTimeout'
+ >
+ <NInputNumber
+ v-model={[detailForm.interfaceTimeout, 'value']}
+
placeholder={t('thirdparty_api_source.interface_timeout_tips')}
+ min={1000}
+ max={1200000}
+ step={1000}
+ >
+ {{
+ suffix: () => t('thirdparty_api_source.millisecond')
+ }}
+ </NInputNumber>
+ </NFormItem>
+ <NFormItem label={t('thirdparty_api_source.auth_type')}
path='authConfig.authType'>
+ <NSelect
+ v-model={[detailForm.authConfig.authType, 'value']}
+ options={[
+ { label: t('thirdparty_api_source.basic_auth'),
value: 'BASIC_AUTH' },
+ { label: t('thirdparty_api_source.oauth2'), value:
'OAUTH2' },
+ { label: t('thirdparty_api_source.jwt'), value:
'JWT' }
+ ]}
+ />
+ </NFormItem>
+ <NFormItem
label={t('thirdparty_api_source.header_prefix')}>
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.authConfig.headerPrefix, 'value']}
+
placeholder={t('thirdparty_api_source.header_prefix_tips')}
+ />
+ </NFormItem>
+ {detailForm.authConfig.authType === 'BASIC_AUTH' && (
+ <>
+ <NFormItem
+ label={t('thirdparty_api_source.username')}
+ path='authConfig.basicUsername'
+ >
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.authConfig.basicUsername,
'value']}
+
placeholder={t('thirdparty_api_source.username_tips')}
+ />
+ </NFormItem>
+ <NFormItem
+ label={t('thirdparty_api_source.password')}
+ path='authConfig.basicPassword'
+ >
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.authConfig.basicPassword,
'value']}
+ type='password'
+ showPasswordOn='click'
+
placeholder={t('thirdparty_api_source.password_tips')}
+ />
+ </NFormItem>
+ </>
+ )}
+ {detailForm.authConfig.authType === 'OAUTH2' && (
+ <>
+ <NFormItem
+ label={t('thirdparty_api_source.oauth2_token_url')}
+ path='authConfig.oauth2TokenUrl'
+ >
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.authConfig.oauth2TokenUrl,
'value']}
+
placeholder={t('thirdparty_api_source.oauth2_token_url_tips')}
+ />
+ </NFormItem>
+ <NFormItem
+ label={t('thirdparty_api_source.oauth2_client_id')}
+ path='authConfig.oauth2ClientId'
+ >
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.authConfig.oauth2ClientId,
'value']}
+
placeholder={t('thirdparty_api_source.oauth2_client_id_tips')}
+ />
+ </NFormItem>
+ <NFormItem
+
label={t('thirdparty_api_source.oauth2_client_secret')}
+ path='authConfig.oauth2ClientSecret'
+ >
+ <NInput
+ allowInput={this.trim}
+
v-model={[detailForm.authConfig.oauth2ClientSecret, 'value']}
+
placeholder={t('thirdparty_api_source.oauth2_client_secret_tips')}
+ />
+ </NFormItem>
+ <NFormItem
+ label={t('thirdparty_api_source.oauth2_grant_type')}
+ path='authConfig.oauth2GrantType'
+ >
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.authConfig.oauth2GrantType,
'value']}
+
placeholder={t('thirdparty_api_source.oauth2_grant_type_tips')}
+ />
+ </NFormItem>
+ <NFormItem
+ label={t('thirdparty_api_source.oauth2_username')}
+ path='authConfig.oauth2Username'
+ >
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.authConfig.oauth2Username,
'value']}
+
placeholder={t('thirdparty_api_source.oauth2_username_tips')}
+ />
+ </NFormItem>
+ <NFormItem
+ label={t('thirdparty_api_source.oauth2_password')}
+ path='authConfig.oauth2Password'
+ >
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.authConfig.oauth2Password,
'value']}
+ type='password'
+ showPasswordOn='click'
+
placeholder={t('thirdparty_api_source.oauth2_password_tips')}
+ />
+ </NFormItem>
+ </>
+ )}
+ {detailForm.authConfig.authType === 'JWT' && (
+ <NFormItem
+ label={t('thirdparty_api_source.jwt_token')}
+ path='authConfig.jwtToken'
+ >
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.authConfig.jwtToken, 'value']}
+
placeholder={t('thirdparty_api_source.jwt_token_tips')}
+ />
+ </NFormItem>
+ )}
+ {/* additional params */}
+ <NFormItem
label={t('thirdparty_api_source.additional_params')}>
+ <div style={{ width: '100%' }}>
+ {/* add button */}
+ <NButton
+ onClick={() => {
+ if (!detailForm.authConfig.authMappings) {
+ detailForm.authConfig.authMappings = []
+ }
+ detailForm.authConfig.authMappings.push({ key: '',
value: '' })
+ }}
+ style={{ marginBottom: '10px' }}
+ >
+ {t('thirdparty_api_source.add_param')}
+ </NButton>
+
+ {/* param list */}
+ {detailForm.authConfig.authMappings &&
detailForm.authConfig.authMappings.map((param: { key: string; value: string },
index: number) => (
+ <div
+ key={index}
+ style={{ display: 'flex', alignItems: 'center',
width: '100%', marginBottom: '10px' }}
+ >
+ <NInput
+ v-model={[param.key, 'value']}
+ placeholder={t('thirdparty_api_source.key')}
+ style={{ width: '40%' }}
+ />
+ <NInput
+ v-model={[param.value, 'value']}
+ placeholder={t('thirdparty_api_source.value')}
+ style={{ width: '40%', marginLeft: '10px' }}
+ />
+ <NButton
+ onClick={() => {
+
detailForm.authConfig.authMappings.splice(index, 1)
+ }}
+ style={{ width: '20%', marginLeft: '10px' }}
+ size="small"
+ >
+ {t('thirdparty_api_source.delete')}
+ </NButton>
+ </div>
+ ))}
+ </div>
+ </NFormItem>
+ <NFormItem
+ label={t('thirdparty_api_source.input_interface')}
+ path='selectInterface.url'
+ >
+ <div style={{ display: 'flex', alignItems: 'center',
width: '100%' }}>
+ <NInput
+ allowInput={this.trim}
+ v-model={[detailForm.selectInterface.url, 'value']}
+
placeholder={t('thirdparty_api_source.input_interface_tips')}
+ style={{ flex: 1 }}
+ />
+ <NSelect
+ v-model={[detailForm.selectInterface.method,
'value']}
+ options={[
+ { label: t('thirdparty_api_source.get'), value:
'GET' },
+ { label: t('thirdparty_api_source.post'), value:
'POST' },
+ { label: t('thirdparty_api_source.put'), value:
'PUT' }
+ ]}
+ style={{ width: '120px', marginLeft: '10px' }}
+ />
+ </div>
+ </NFormItem>
+ <NFormItem label={t('thirdparty_api_source.parameters')}>
+ <div style={{ width: '100%' }}>
+ {/* add Button*/}
+ <NButton
+ onClick={() => {
+ if (!detailForm.selectInterface.parameters) {
+ detailForm.selectInterface.parameters = []
+ }
+ detailForm.selectInterface.parameters.push({
paramName: '', paramValue: '', location: 'HEADER' })
+ }}
+ style={{ marginBottom: '10px' }}
+ >
+ {t('thirdparty_api_source.add_param')}
+ </NButton>
+
+ {/* parameter list */}
+ {detailForm.selectInterface.parameters &&
detailForm.selectInterface.parameters.map((param: { paramName: string;
paramValue: string; location: string }, index: number) => (
+ <div
+ key={index}
+ style={{ display: 'flex', alignItems: 'center',
width: '100%', marginBottom: '10px' }}
+ >
+ <NSelect
+ v-model={[param.location, 'value']}
+ options={[
+ { label: 'Header', value: 'HEADER' },
+ { label: 'Param', value: 'PARAM' }
+ ]}
+
placeholder={t('thirdparty_api_source.param_location_tips')}
+ style={{ width: '120px' }}
+ />
+ <NInput
+ v-model={[param.paramName, 'value']}
+
placeholder={t('thirdparty_api_source.param_name_tips')}
+ style={{ flex: 1, marginLeft: '10px' }}
+ />
+ <NInput
+ v-model={[param.paramValue, 'value']}
+
placeholder={t('thirdparty_api_source.param_value_tips')}
+ style={{ flex: 1, marginLeft: '10px' }}
+ />
+ <NButton
+ onClick={() => {
+
detailForm.selectInterface.parameters.splice(index, 1)
+ }}
+ style={{ marginLeft: '10px' }}
+ >
+ {t('thirdparty_api_source.delete')}
+ </NButton>
+ </div>
+ ))}
+ </div>
+ </NFormItem>
+ {(detailForm.selectInterface.method === 'POST' ||
detailForm.selectInterface.method === 'PUT') && (
+ <NFormItem
label={t('thirdparty_api_source.request_body')}>
+ <NInput
+ v-model={[detailForm.selectInterface.body, 'value']}
+ type="textarea"
+ autosize={{
+ minRows: 4,
+ maxRows: 10
+ }}
+ placeholder="请输入JSON格式的请求体"
+ />
Review Comment:
Hard-coded Chinese text "请输入JSON格式的请求体" (Please enter JSON format request
body) should use the i18n translation system like other placeholder text in
this file. This breaks internationalization for non-Chinese users.
##########
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-thirdpartysystemconnector/src/main/java/org/apache/dolphinscheduler/plugin/datasource/thirdpartysystemconnector/param/PollingInterfaceInfo.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param;
+
+import lombok.Data;
+
+@Data
Review Comment:
The `canEqual` method that `@Data` generates here overrides
`InterfaceInfo.canEqual`; it is advisable to add an `@Override` annotation.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-external-system/src/main/java/org/apache/dolphinscheduler/plugin/task/externalSystem/ExternalSystemTask.java:
##########
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.externalSystem;
+
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaderContentType;
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaders;
+import org.apache.dolphinscheduler.common.model.OkHttpResponse;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OkHttpUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.AuthenticationUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.InterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingFailureConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingInterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingSuccessConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.RequestParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ResponseParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ThirdPartySystemConnectorConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.FormBody;
+
+import com.jayway.jsonpath.JsonPath;
+
+@Slf4j
+public class ExternalSystemTask extends AbstractTask {
+
+ private Boolean traceEnabled = true;
+
+ private ExternalSystemParameters externalSystemParameters;
+ private ThirdPartySystemConnectorConnectionParam baseExternalSystemParams;
+ private TaskExecutionContext taskExecutionContext;
+ private String accessToken;
+ private Map<String, String> parameterMap = new HashMap<>();
+ private Set<String> successStatusCache = new HashSet<>();
+ private Set<String> failureStatusCache = new HashSet<>();
+
+ private boolean isTimeout = false;
+ private long taskStartTime;
+
+ public ExternalSystemTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ this.taskExecutionContext = taskExecutionContext;
+ this.externalSystemParameters =
+ JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ExternalSystemParameters.class);
+ baseExternalSystemParams =
+
externalSystemParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ try {
+ accessToken =
baseExternalSystemParams.getAuthConfig().getHeaderPrefix() + " "
+ +
AuthenticationUtils.authenticateAndGetToken(baseExternalSystemParams);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void init() {
+ externalSystemParameters = JSONUtils.parseObject(
+ taskExecutionContext.getTaskParams(),
+ ExternalSystemParameters.class);
+ log.info("Initialize external system task with externalSystemId: {},
externalTaskId: {}, externalTaskName: {}",
+ externalSystemParameters.getDatasource(),
+ externalSystemParameters.getExternalTaskId(),
+ externalSystemParameters.getExternalTaskName());
+
+ if (externalSystemParameters == null ||
!externalSystemParameters.checkParameters()) {
+ throw new RuntimeException("external system task params is not
valid");
+ }
+
+ // Initialize parameter mapping
+ initParameterMap();
+ initStatusCache();
+ }
+
+ @Override
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
+ try {
+ taskStartTime = System.currentTimeMillis();
+ submitExternalTask();
+ TimeUnit.SECONDS.sleep(10);
+ trackExternalTaskStatus();
+ } catch (Exception e) {
+ log.error("external system task error", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("Execute external system task failed", e);
+ }
+ }
+
+ @Override
+ public void cancel() throws TaskException {
+ try {
+ log.info("cancel external system task");
+ cancelTaskInstance();
+ } catch (Exception e) {
+ throw new TaskException("cancel external system task error", e);
+ } finally {
+ // Only set to failure status when timeout failure strategy is
enabled and it actually timed out
+ log.info("External task timeout check isTimeoutFailureEnabled:{}",
isTimeoutFailureEnabled());
+ if (isTimeoutFailureEnabled()) {
+ long currentTime = System.currentTimeMillis();
+ long usedTimeMillis = currentTime - taskStartTime;
+ long usedTime = (usedTimeMillis + 29999) / 60000; // Over 30
seconds takes 1 minute, less than 30
+ // seconds
takes 0 minutes
+ log.info(
+ "External task timeout check, used time: {}m, timeout:
{}m, currentTime: {}, taskStartTime: {}",
+ usedTime, taskExecutionContext.getTaskTimeout() / 60,
currentTime, taskStartTime);
+ if (usedTime >= taskExecutionContext.getTaskTimeout() / 60) {
+ isTimeout = true;
+ log.warn("External task timeout, used time: {}m, timeout:
{}m",
+ usedTime, taskExecutionContext.getTaskTimeout() /
60);
+ }
+ }
+ if (isTimeout) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ log.info("External task cancelled due to timeout, set status
to FAILED");
+ } else {
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ log.info("External task cancelled manually or timeout not
enabled, set status to KILLED");
+ }
+ }
+ }
+
+ private void submitExternalTask() throws TaskException {
+ try {
+ InterfaceInfo submitConfig =
baseExternalSystemParams.getSubmitInterface();
+ String url =
replaceParameterPlaceholders(baseExternalSystemParams.getCompleteUrl(submitConfig.getUrl()));
+ Map<String, String> headers = new HashMap<>();
+ buildAuthHeader(accessToken, headers);
+ buildHeaders(submitConfig, headers);
+ Map<String, Object> requestBody = buildRequestBody(submitConfig);
+ Map<String, Object> requestParams =
buildRequestParams(submitConfig);
+
+ int interfaceTimeout =
baseExternalSystemParams.getInterfaceTimeout();
+ log.info("Using interface timeout value: {} milliseconds",
interfaceTimeout);
+ OkHttpResponse response =
+ executeRequestWithoutRetry(submitConfig.getMethod(), url,
headers, requestParams, requestBody,
+ interfaceTimeout, interfaceTimeout,
interfaceTimeout);
+ log.info("Submit task response:{}", response);
+ if (response.getStatusCode() != 200) {
+ throw new TaskException("Submit task failed: " +
response.getBody());
+ }
+
+ parseSubmitResponse(submitConfig.getResponseParameters(),
response.getBody());
+ } catch (Exception e) {
+ log.error("Submit task failed:{}", e);
+ throw new TaskException("Submit task failed", e);
+ }
+ }
+
+ private void trackExternalTaskStatus() throws TaskException {
+ try {
+ String status;
+ do {
+ // Only check timeout when timeout failure strategy is enabled
+ if (isTimeoutFailureEnabled()) {
+ long currentTime = System.currentTimeMillis();
+ long usedTimeMillis = currentTime - taskStartTime;
+ long usedTime = (usedTimeMillis + 29999) / 60000; // Over
30 seconds takes 1 minute, less than 30
+ //
seconds takes 0 minutes
+ if (usedTime >= taskExecutionContext.getTaskTimeout()) {
+ isTimeout = true;
+ log.error("External task timeout, used time: {}m,
timeout: {}m",
+ usedTime,
taskExecutionContext.getTaskTimeout() / 60);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ cancelTaskInstance();
+ return;
+ }
+ }
+
+ status = pollTaskStatus();
+
+ if (successStatusCache.contains(status)) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
+ log.info("External task completed successfully with
status: {}", status);
+ return;
+ } else if (failureStatusCache.contains(status)) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ log.error("External task failed with status: {}", status);
+ return;
+ }
+
+ TimeUnit.SECONDS.sleep(10);
+ } while (traceEnabled);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TaskException("Task status tracking interrupted", e);
+ } catch (Exception e) {
+ log.error("Track task status failed", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("Track task status failed", e);
+ }
+ }
+
+ private String pollTaskStatus() throws TaskException {
+ try {
+ PollingInterfaceInfo pollConfig =
+ baseExternalSystemParams.getPollStatusInterface();
+ String url =
replaceParameterPlaceholders(baseExternalSystemParams.getCompleteUrl(pollConfig.getUrl()));
+ Map<String, String> headers = new HashMap<>();
+ buildAuthHeader(accessToken, headers);
+ buildHeaders(pollConfig, headers);
+ Map<String, Object> requestBody = buildRequestBody(pollConfig);
+ Map<String, Object> requestParams = buildRequestParams(pollConfig);
+
+ int interfaceTimeout =
baseExternalSystemParams.getInterfaceTimeout();
+ OkHttpResponse response =
+ executeRequestWithRetry(pollConfig.getMethod(), url,
headers, requestParams, requestBody,
+ interfaceTimeout, interfaceTimeout,
interfaceTimeout, 3);
+ log.info("poll task status response:{}", response);
+
+ if (response.getStatusCode() != 200) {
+ throw new TaskException("polling task failed: " +
response.getBody());
+ }
+
+ String successField =
pollConfig.getPollingSuccessConfig().getSuccessField();
+ String failureField =
pollConfig.getPollingFailureConfig().getFailureField();
+
+ Object successStatusObj = JsonPath.read(response.getBody(),
successField);
+ Object failureStatusObj = JsonPath.read(response.getBody(),
failureField);
+
+ log.info("PollTaskStatus successfully, external task instance
success status: {}, failure status: {}",
+ successStatusObj.toString(), failureStatusObj.toString());
+
+ if (successStatusObj != null) {
+ return successStatusObj.toString();
+ } else if (failureStatusObj != null) {
+ return failureStatusObj.toString();
+ } else {
+ log.warn("successStatusObj and failureStatusObj are null");
+ return "UNKNOWN";
+ }
+ } catch (Exception e) {
+ log.error("Poll task status failed", e);
+ throw new TaskException("Poll task status failed", e);
+ }
+ }
+
+ private void cancelTaskInstance() throws TaskException {
+ try {
+ traceEnabled = false;
+ InterfaceInfo stopConfig =
baseExternalSystemParams.getStopInterface();
+ log.info("start cancel External System TaskInstance");
+ String url =
replaceParameterPlaceholders(baseExternalSystemParams.getCompleteUrl(stopConfig.getUrl()));
+ Map<String, String> headers = new HashMap<>();
+ buildAuthHeader(accessToken, headers);
+ buildHeaders(stopConfig, headers);
+ Map<String, Object> requestBody = buildRequestBody(stopConfig);
+ Map<String, Object> requestParams = buildRequestParams(stopConfig);
+
+ int interfaceTimeout =
baseExternalSystemParams.getInterfaceTimeout();
+ OkHttpResponse response =
+ executeRequestWithRetry(stopConfig.getMethod(), url,
headers, requestParams, requestBody,
+ interfaceTimeout, interfaceTimeout,
interfaceTimeout, 3);
+ log.info("cancel task response:{}", response);
+
+ if (response.getStatusCode() != 200) {
+ throw new TaskException("Cancel task failed: " +
response.getBody());
+ }
+ log.info("Cancel task result: {}", response.getBody());
+ } catch (Exception e) {
+ log.error("Cancel task failed", e);
+ throw new TaskException("Cancel task failed", e);
+ }
+ }
+
+ private OkHttpResponse executeRequestWithoutRetry(InterfaceInfo.HttpMethod
method, String url,
+ Map<String, String>
headers, Map<String, Object> requestParams,
+ Map<String, Object>
requestBody, int connectTimeout,
+ int readTimeout,
+ int writeTimeout) throws
TaskException {
+ return executeRequestWithRetry(method, url, headers, requestParams,
requestBody, connectTimeout, readTimeout,
+ writeTimeout, 0);
+ }
+
+ private OkHttpResponse executeRequestWithRetry(InterfaceInfo.HttpMethod
method, String url,
+ Map<String, String>
headers, Map<String, Object> requestParams,
+ Map<String, Object>
requestBody, int connectTimeout, int readTimeout,
+ int writeTimeout, int
maxRetries) throws TaskException {
+ int retryCount = 0;
+ while (retryCount <= maxRetries) {
+ OkHttpRequestHeaders okHttpRequestHeaders = new
OkHttpRequestHeaders();
+ okHttpRequestHeaders.setHeaders(headers);
+ OkHttpRequestHeaderContentType contentType =
getContentType(headers);
+
okHttpRequestHeaders.setOkHttpRequestHeaderContentType(getContentType(headers));
+ try {
+ switch (method) {
+ case POST:
+ if
(contentType.equals(OkHttpRequestHeaderContentType.APPLICATION_JSON)) {
+ return OkHttpUtils.post(url, okHttpRequestHeaders,
requestParams, requestBody,
+ connectTimeout,
+ readTimeout, writeTimeout);
+ }
+ if
(contentType.equals(OkHttpRequestHeaderContentType.APPLICATION_FORM_URLENCODED))
{
+ FormBody.Builder formBodyBuilder = new
FormBody.Builder();
+ if (requestBody != null) {
+ for (Map.Entry<String, Object> entry :
requestBody.entrySet()) {
+ formBodyBuilder.add(entry.getKey(),
entry.getValue().toString());
+ }
+ }
+ return OkHttpUtils.postFormBody(url,
okHttpRequestHeaders, requestParams,
+ formBodyBuilder.build(), connectTimeout,
+ readTimeout, writeTimeout);
+ }
+ case PUT:
+ return OkHttpUtils.put(url, okHttpRequestHeaders,
requestBody, connectTimeout, readTimeout,
+ writeTimeout);
+ case GET:
+ return OkHttpUtils.get(url, okHttpRequestHeaders,
requestParams, connectTimeout, readTimeout,
+ writeTimeout);
+ default:
+ throw new TaskException("Unsupported HTTP method: " +
method);
+ }
+ } catch (Exception e) {
+ retryCount++;
+ if (maxRetries > 0) {
+ log.warn("Request failed, retrying... (attempt {}/{})",
retryCount, maxRetries, e);
+ if (retryCount > maxRetries) {
+ throw new TaskException("Request failed after " +
maxRetries + " retries", e);
+ }
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new TaskException("Request retry interrupted",
ie);
+ }
+ } else {
+ throw new TaskException("Request failed without retry", e);
+ }
+ }
+ }
+ return null; // This line should never be reached
+ }
+
+ private void buildAuthHeader(String accessToken, Map<String, String>
headers) {
+ headers.put("Authorization", accessToken);
+ }
+
+ private Map<String, String> buildHeaders(InterfaceInfo config,
+ Map<String, String>
requestParams) {
+ for (RequestParameter param : config.getParameters()) {
+ if
(param.getLocation().equals(RequestParameter.ParamLocation.HEADER)) {
+ requestParams.put(param.getParamName(),
replaceParameterPlaceholders(param.getParamValue()));
+ }
+ }
+ return requestParams;
+ }
+
+ private Map<String, Object> buildRequestBody(InterfaceInfo config) {
+ Map<String, Object> requestBody = new HashMap<>();
+ if (config.getBody() != null) {
+ requestBody =
JSONUtils.parseObject(replaceParameterPlaceholders(config.getBody()),
Map.class);
+ }
+ return requestBody;
+ }
+
+ private Map<String, Object> buildRequestParams(InterfaceInfo config) {
+ Map<String, Object> requestParams = new HashMap<>();
+ for (RequestParameter param : config.getParameters()) {
+ if
(param.getLocation().equals(RequestParameter.ParamLocation.PARAM)) {
+ requestParams.put(param.getParamName(),
replaceParameterPlaceholders(param.getParamValue()));
+ }
+ }
+ return requestParams;
+ }
+
+ private String replaceParameterPlaceholders(String template) {
+ if (StringUtils.isEmpty(template)) {
+ return template;
+ }
+
+ StringBuilder result = new StringBuilder(template);
+ for (Map.Entry<String, String> entry : parameterMap.entrySet()) {
+ String placeholder = "${" + entry.getKey() + "}";
+ int index;
+ while ((index = result.indexOf(placeholder)) != -1) {
+ result.replace(index, index + placeholder.length(),
entry.getValue());
+ }
+ }
+ String resultString =
ParameterUtils.convertParameterPlaceholders(result.toString(), parameterMap);
+ log.info("after replaceParameterPlaceholders:{}", resultString);
+ return resultString;
+ }
+
+ private void parseSubmitResponse(List<ResponseParameter>
responseParameters,
+ String responseBody) throws TaskException
{
+ try {
+ for (ResponseParameter param : responseParameters) {
+ String jsonPath = param.getJsonPath();
+ String key = param.getKey();
+ Object value = JsonPath.read(responseBody, jsonPath);
+
+ if (value == null) {
+ log.warn("Response parameter {} not found in response
body", key);
+ continue;
+ }
+
+ parameterMap.put(key, value.toString().replace("\"", ""));
+ log.info("Parsed parameter {}: {}", key, value.toString());
+
+ }
+ } catch (Exception e) {
+ log.error("Parse submit response failed", e);
+ throw new TaskException("Parse submit response failed", e);
+ }
+ }
+
+ @Override
+ public AbstractParameters getParameters() {
+ return this.externalSystemParameters;
+ }
+
+ /**
+ * Initialize parameter mapping
+ */
+ private void initParameterMap() {
+ Map<String, Property> prepareParamsMap =
taskExecutionContext.getPrepareParamsMap();
+ if (prepareParamsMap != null) {
+ for (Map.Entry<String, Property> entry :
prepareParamsMap.entrySet()) {
+ parameterMap.put(entry.getKey(), entry.getValue().getValue());
+ }
+ }
+ if (externalSystemParameters.getExternalTaskId() != null) {
+ parameterMap.put(ExternalTaskConstants.EXTERNAL_TASK_ID,
externalSystemParameters.getExternalTaskId());
+ parameterMap.put(ExternalTaskConstants.EXTERNAL_TASK_NAME,
externalSystemParameters.getExternalTaskName());
+ }
+ }
+
+ private void initStatusCache() {
+ PollingSuccessConfig successConfig =
+
baseExternalSystemParams.getPollStatusInterface().getPollingSuccessConfig();
+
+ if (successConfig != null && successConfig.getSuccessValue() != null) {
+ try {
+ String successValueString = successConfig.getSuccessValue();
+ String[] successValues = successValueString.split(",");
+ for (String successValue : successValues) {
+ successStatusCache.add(successValue);
+ }
+ log.info("trackExternalTaskStatus successValues is :{}",
successStatusCache);
+
+ } catch (NullPointerException e) {
+ log.error("Error: successValue is null");
+ }
+ }
+ PollingFailureConfig failureConfig =
+
baseExternalSystemParams.getPollStatusInterface().getPollingFailureConfig();
+ if (failureConfig != null && failureConfig.getFailureField() != null) {
+ try {
+ String failureValueString = failureConfig.getFailureValue();
+ String[] failureValues = failureValueString.split(",");
+ for (String failureValue : failureValues) {
+ failureStatusCache.add(failureValue);
+ }
+ log.info("trackExternalTaskStatus failureValues is :{}",
failureStatusCache);
+ } catch (NullPointerException e) {
+ log.error("Error: failureStatus is null");
+ }
+ }
+ }
+
+ /**
+ * Check if timeout failure strategy is enabled
+ */
+ private boolean isTimeoutFailureEnabled() {
+ return taskExecutionContext.getTaskTimeoutStrategy() != null
+ && taskExecutionContext.getTaskTimeout() > 0
+ && taskExecutionContext.getTaskTimeout() < Integer.MAX_VALUE
+ && (taskExecutionContext.getTaskTimeoutStrategy() ==
TaskTimeoutStrategy.FAILED
+ || taskExecutionContext.getTaskTimeoutStrategy() ==
TaskTimeoutStrategy.WARNFAILED);
+ }
+
+ private OkHttpRequestHeaderContentType getContentType(Map<String, String>
headers) {
+ if (headers == null ||
!headers.containsKey(ExternalTaskConstants.CONTENT_TYPE)
+ ||
!headers.containsKey(ExternalTaskConstants.CONTENT_TYPE_LOWERCASE)) {
+ return OkHttpRequestHeaderContentType.APPLICATION_JSON;
Review Comment:
The condition uses OR (||) instead of AND (&&) when checking for missing
Content-Type headers. This means the function returns APPLICATION_JSON even
when one of the headers exists. The logic should be: if headers is null OR
(does not contain CONTENT_TYPE AND does not contain CONTENT_TYPE_LOWERCASE).
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-external-system/src/main/java/org/apache/dolphinscheduler/plugin/task/externalSystem/ExternalSystemTask.java:
##########
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.externalSystem;
+
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaderContentType;
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaders;
+import org.apache.dolphinscheduler.common.model.OkHttpResponse;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OkHttpUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.AuthenticationUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.InterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingFailureConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingInterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingSuccessConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.RequestParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ResponseParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ThirdPartySystemConnectorConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.FormBody;
+
+import com.jayway.jsonpath.JsonPath;
+
+@Slf4j
+public class ExternalSystemTask extends AbstractTask {
+
+ private Boolean traceEnabled = true;
+
+ private ExternalSystemParameters externalSystemParameters;
+ private ThirdPartySystemConnectorConnectionParam baseExternalSystemParams;
+ private TaskExecutionContext taskExecutionContext;
+ private String accessToken;
+ private Map<String, String> parameterMap = new HashMap<>();
+ private Set<String> successStatusCache = new HashSet<>();
+ private Set<String> failureStatusCache = new HashSet<>();
+
+ private boolean isTimeout = false;
+ private long taskStartTime;
+
+ public ExternalSystemTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ this.taskExecutionContext = taskExecutionContext;
+ this.externalSystemParameters =
+ JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ExternalSystemParameters.class);
+ baseExternalSystemParams =
+
externalSystemParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ try {
+ accessToken =
baseExternalSystemParams.getAuthConfig().getHeaderPrefix() + " "
+ +
AuthenticationUtils.authenticateAndGetToken(baseExternalSystemParams);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void init() {
+ externalSystemParameters = JSONUtils.parseObject(
+ taskExecutionContext.getTaskParams(),
+ ExternalSystemParameters.class);
+ log.info("Initialize external system task with externalSystemId: {},
externalTaskId: {}, externalTaskName: {}",
+ externalSystemParameters.getDatasource(),
+ externalSystemParameters.getExternalTaskId(),
+ externalSystemParameters.getExternalTaskName());
+
+ if (externalSystemParameters == null ||
!externalSystemParameters.checkParameters()) {
+ throw new RuntimeException("external system task params is not
valid");
+ }
+
+ // Initialize parameter mapping
+ initParameterMap();
+ initStatusCache();
+ }
+
+ @Override
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
+ try {
+ taskStartTime = System.currentTimeMillis();
+ submitExternalTask();
+ TimeUnit.SECONDS.sleep(10);
+ trackExternalTaskStatus();
+ } catch (Exception e) {
+ log.error("external system task error", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("Execute external system task failed", e);
+ }
+ }
+
+ @Override
+ public void cancel() throws TaskException {
+ try {
+ log.info("cancel external system task");
+ cancelTaskInstance();
+ } catch (Exception e) {
+ throw new TaskException("cancel external system task error", e);
+ } finally {
+ // Only set to failure status when timeout failure strategy is
enabled and it actually timed out
+ log.info("External task timeout check isTimeoutFailureEnabled:{}",
isTimeoutFailureEnabled());
+ if (isTimeoutFailureEnabled()) {
+ long currentTime = System.currentTimeMillis();
+ long usedTimeMillis = currentTime - taskStartTime;
+ long usedTime = (usedTimeMillis + 29999) / 60000; // Over 30
seconds takes 1 minute, less than 30
+ // seconds
takes 0 minutes
+ log.info(
+ "External task timeout check, used time: {}m, timeout:
{}m, currentTime: {}, taskStartTime: {}",
+ usedTime, taskExecutionContext.getTaskTimeout() / 60,
currentTime, taskStartTime);
Review Comment:
The timeout calculation uses an unusual formula, (usedTimeMillis + 29999) /
60000, which rounds to the nearest minute rather than rounding up. This agrees
with the inline comment ("Over 30 seconds takes 1 minute, less than 30 seconds
takes 0 minutes"): elapsed times of 0-30 seconds become 0 minutes, and times
of 30-90 seconds become 1 minute. As a result, the task can be treated as
timed out up to 30 seconds before the configured timeout has actually elapsed,
which may not match the intended timeout behavior. Consider using a standard
rounding approach or clarifying the intended behavior.
##########
dolphinscheduler-ui/src/service/modules/thirdparty-api-source/index.ts:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { axios } from '@/service/service'
+import {
+ ThirdpartyApiSourceReq,
+ ThirdpartyApiSource,
+} from './types'
Review Comment:
Unused import ThirdpartyApiSource.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-external-system/src/main/java/org/apache/dolphinscheduler/plugin/task/externalSystem/ExternalSystemTask.java:
##########
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.externalSystem;
+
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaderContentType;
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaders;
+import org.apache.dolphinscheduler.common.model.OkHttpResponse;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OkHttpUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.AuthenticationUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.InterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingFailureConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingInterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingSuccessConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.RequestParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ResponseParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ThirdPartySystemConnectorConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.FormBody;
+
+import com.jayway.jsonpath.JsonPath;
+
+@Slf4j
+public class ExternalSystemTask extends AbstractTask {
+
+ private Boolean traceEnabled = true;
+
+ private ExternalSystemParameters externalSystemParameters;
+ private ThirdPartySystemConnectorConnectionParam baseExternalSystemParams;
+ private TaskExecutionContext taskExecutionContext;
+ private String accessToken;
+ private Map<String, String> parameterMap = new HashMap<>();
+ private Set<String> successStatusCache = new HashSet<>();
+ private Set<String> failureStatusCache = new HashSet<>();
+
+ private boolean isTimeout = false;
+ private long taskStartTime;
+
+ public ExternalSystemTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ this.taskExecutionContext = taskExecutionContext;
+ this.externalSystemParameters =
+ JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ExternalSystemParameters.class);
+ baseExternalSystemParams =
+
externalSystemParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ try {
+ accessToken =
baseExternalSystemParams.getAuthConfig().getHeaderPrefix() + " "
+ +
AuthenticationUtils.authenticateAndGetToken(baseExternalSystemParams);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void init() {
+ externalSystemParameters = JSONUtils.parseObject(
+ taskExecutionContext.getTaskParams(),
+ ExternalSystemParameters.class);
+ log.info("Initialize external system task with externalSystemId: {},
externalTaskId: {}, externalTaskName: {}",
+ externalSystemParameters.getDatasource(),
+ externalSystemParameters.getExternalTaskId(),
+ externalSystemParameters.getExternalTaskName());
+
+ if (externalSystemParameters == null ||
!externalSystemParameters.checkParameters()) {
+ throw new RuntimeException("external system task params is not
valid");
+ }
+
+ // Initialize parameter mapping
+ initParameterMap();
+ initStatusCache();
+ }
+
+ @Override
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
+ try {
+ taskStartTime = System.currentTimeMillis();
+ submitExternalTask();
+ TimeUnit.SECONDS.sleep(10);
+ trackExternalTaskStatus();
+ } catch (Exception e) {
+ log.error("external system task error", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("Execute external system task failed", e);
+ }
+ }
+
+ @Override
+ public void cancel() throws TaskException {
+ try {
+ log.info("cancel external system task");
+ cancelTaskInstance();
+ } catch (Exception e) {
+ throw new TaskException("cancel external system task error", e);
+ } finally {
+ // Only set to failure status when timeout failure strategy is
enabled and it actually timed out
+ log.info("External task timeout check isTimeoutFailureEnabled:{}",
isTimeoutFailureEnabled());
+ if (isTimeoutFailureEnabled()) {
+ long currentTime = System.currentTimeMillis();
+ long usedTimeMillis = currentTime - taskStartTime;
+ long usedTime = (usedTimeMillis + 29999) / 60000; // Over 30
seconds takes 1 minute, less than 30
+ // seconds
takes 0 minutes
+ log.info(
+ "External task timeout check, used time: {}m, timeout:
{}m, currentTime: {}, taskStartTime: {}",
+ usedTime, taskExecutionContext.getTaskTimeout() / 60,
currentTime, taskStartTime);
+ if (usedTime >= taskExecutionContext.getTaskTimeout() / 60) {
+ isTimeout = true;
+ log.warn("External task timeout, used time: {}m, timeout:
{}m",
+ usedTime, taskExecutionContext.getTaskTimeout() /
60);
+ }
+ }
+ if (isTimeout) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ log.info("External task cancelled due to timeout, set status
to FAILED");
+ } else {
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ log.info("External task cancelled manually or timeout not
enabled, set status to KILLED");
+ }
+ }
+ }
+
+ private void submitExternalTask() throws TaskException {
+ try {
+ InterfaceInfo submitConfig =
baseExternalSystemParams.getSubmitInterface();
+ String url =
replaceParameterPlaceholders(baseExternalSystemParams.getCompleteUrl(submitConfig.getUrl()));
+ Map<String, String> headers = new HashMap<>();
+ buildAuthHeader(accessToken, headers);
+ buildHeaders(submitConfig, headers);
+ Map<String, Object> requestBody = buildRequestBody(submitConfig);
+ Map<String, Object> requestParams =
buildRequestParams(submitConfig);
+
+ int interfaceTimeout =
baseExternalSystemParams.getInterfaceTimeout();
+ log.info("Using interface timeout value: {} milliseconds",
interfaceTimeout);
+ OkHttpResponse response =
+ executeRequestWithoutRetry(submitConfig.getMethod(), url,
headers, requestParams, requestBody,
+ interfaceTimeout, interfaceTimeout,
interfaceTimeout);
+ log.info("Submit task response:{}", response);
+ if (response.getStatusCode() != 200) {
+ throw new TaskException("Submit task failed: " +
response.getBody());
+ }
+
+ parseSubmitResponse(submitConfig.getResponseParameters(),
response.getBody());
+ } catch (Exception e) {
+ log.error("Submit task failed:{}", e);
+ throw new TaskException("Submit task failed", e);
+ }
+ }
+
+ private void trackExternalTaskStatus() throws TaskException {
+ try {
+ String status;
+ do {
+ // Only check timeout when timeout failure strategy is enabled
+ if (isTimeoutFailureEnabled()) {
+ long currentTime = System.currentTimeMillis();
+ long usedTimeMillis = currentTime - taskStartTime;
+ long usedTime = (usedTimeMillis + 29999) / 60000; // Over
30 seconds takes 1 minute, less than 30
+ //
seconds takes 0 minutes
+ if (usedTime >= taskExecutionContext.getTaskTimeout()) {
+ isTimeout = true;
+ log.error("External task timeout, used time: {}m,
timeout: {}m",
+ usedTime,
taskExecutionContext.getTaskTimeout() / 60);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ cancelTaskInstance();
+ return;
+ }
+ }
+
+ status = pollTaskStatus();
+
+ if (successStatusCache.contains(status)) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
+ log.info("External task completed successfully with
status: {}", status);
+ return;
+ } else if (failureStatusCache.contains(status)) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ log.error("External task failed with status: {}", status);
+ return;
+ }
+
+ TimeUnit.SECONDS.sleep(10);
+ } while (traceEnabled);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TaskException("Task status tracking interrupted", e);
+ } catch (Exception e) {
+ log.error("Track task status failed", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("Track task status failed", e);
+ }
+ }
+
+ private String pollTaskStatus() throws TaskException {
+ try {
+ PollingInterfaceInfo pollConfig =
+ baseExternalSystemParams.getPollStatusInterface();
+ String url =
replaceParameterPlaceholders(baseExternalSystemParams.getCompleteUrl(pollConfig.getUrl()));
+ Map<String, String> headers = new HashMap<>();
+ buildAuthHeader(accessToken, headers);
+ buildHeaders(pollConfig, headers);
+ Map<String, Object> requestBody = buildRequestBody(pollConfig);
+ Map<String, Object> requestParams = buildRequestParams(pollConfig);
+
+ int interfaceTimeout =
baseExternalSystemParams.getInterfaceTimeout();
+ OkHttpResponse response =
+ executeRequestWithRetry(pollConfig.getMethod(), url,
headers, requestParams, requestBody,
+ interfaceTimeout, interfaceTimeout,
interfaceTimeout, 3);
+ log.info("poll task status response:{}", response);
+
+ if (response.getStatusCode() != 200) {
+ throw new TaskException("polling task failed: " +
response.getBody());
+ }
+
+ String successField =
pollConfig.getPollingSuccessConfig().getSuccessField();
+ String failureField =
pollConfig.getPollingFailureConfig().getFailureField();
+
+ Object successStatusObj = JsonPath.read(response.getBody(),
successField);
+ Object failureStatusObj = JsonPath.read(response.getBody(),
failureField);
+
+ log.info("PollTaskStatus successfully, external task instance
success status: {}, failure status: {}",
+ successStatusObj.toString(), failureStatusObj.toString());
Review Comment:
Variable `failureStatusObj` may be null at this access, as suggested by a
null guard on it elsewhere in the code.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-external-system/src/main/java/org/apache/dolphinscheduler/plugin/task/externalSystem/ExternalSystemTask.java:
##########
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.externalSystem;
+
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaderContentType;
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaders;
+import org.apache.dolphinscheduler.common.model.OkHttpResponse;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OkHttpUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.AuthenticationUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.InterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingFailureConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingInterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingSuccessConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.RequestParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ResponseParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ThirdPartySystemConnectorConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.FormBody;
+
+import com.jayway.jsonpath.JsonPath;
+
+@Slf4j
+public class ExternalSystemTask extends AbstractTask {
+
+ private Boolean traceEnabled = true;
+
+ private ExternalSystemParameters externalSystemParameters;
+ private ThirdPartySystemConnectorConnectionParam baseExternalSystemParams;
+ private TaskExecutionContext taskExecutionContext;
+ private String accessToken;
+ private Map<String, String> parameterMap = new HashMap<>();
+ private Set<String> successStatusCache = new HashSet<>();
+ private Set<String> failureStatusCache = new HashSet<>();
+
+ private boolean isTimeout = false;
+ private long taskStartTime;
+
+ public ExternalSystemTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ this.taskExecutionContext = taskExecutionContext;
+ this.externalSystemParameters =
+ JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ExternalSystemParameters.class);
+ baseExternalSystemParams =
+
externalSystemParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ try {
+ accessToken =
baseExternalSystemParams.getAuthConfig().getHeaderPrefix() + " "
+ +
AuthenticationUtils.authenticateAndGetToken(baseExternalSystemParams);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void init() {
+ externalSystemParameters = JSONUtils.parseObject(
+ taskExecutionContext.getTaskParams(),
+ ExternalSystemParameters.class);
+ log.info("Initialize external system task with externalSystemId: {},
externalTaskId: {}, externalTaskName: {}",
+ externalSystemParameters.getDatasource(),
+ externalSystemParameters.getExternalTaskId(),
+ externalSystemParameters.getExternalTaskName());
+
+ if (externalSystemParameters == null ||
!externalSystemParameters.checkParameters()) {
+ throw new RuntimeException("external system task params is not
valid");
+ }
+
+ // Initialize parameter mapping
+ initParameterMap();
+ initStatusCache();
+ }
+
+ @Override
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
+ try {
+ taskStartTime = System.currentTimeMillis();
+ submitExternalTask();
+ TimeUnit.SECONDS.sleep(10);
+ trackExternalTaskStatus();
+ } catch (Exception e) {
+ log.error("external system task error", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("Execute external system task failed", e);
+ }
+ }
+
+ @Override
+ public void cancel() throws TaskException {
+ try {
+ log.info("cancel external system task");
+ cancelTaskInstance();
+ } catch (Exception e) {
+ throw new TaskException("cancel external system task error", e);
+ } finally {
+ // Only set to failure status when timeout failure strategy is
enabled and it actually timed out
+ log.info("External task timeout check isTimeoutFailureEnabled:{}",
isTimeoutFailureEnabled());
+ if (isTimeoutFailureEnabled()) {
+ long currentTime = System.currentTimeMillis();
+ long usedTimeMillis = currentTime - taskStartTime;
+ long usedTime = (usedTimeMillis + 29999) / 60000; // Over 30
seconds takes 1 minute, less than 30
+ // seconds
takes 0 minutes
+ log.info(
+ "External task timeout check, used time: {}m, timeout:
{}m, currentTime: {}, taskStartTime: {}",
+ usedTime, taskExecutionContext.getTaskTimeout() / 60,
currentTime, taskStartTime);
+ if (usedTime >= taskExecutionContext.getTaskTimeout() / 60) {
+ isTimeout = true;
+ log.warn("External task timeout, used time: {}m, timeout:
{}m",
+ usedTime, taskExecutionContext.getTaskTimeout() /
60);
+ }
+ }
+ if (isTimeout) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ log.info("External task cancelled due to timeout, set status
to FAILED");
+ } else {
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ log.info("External task cancelled manually or timeout not
enabled, set status to KILLED");
+ }
+ }
+ }
+
+ private void submitExternalTask() throws TaskException {
+ try {
+ InterfaceInfo submitConfig =
baseExternalSystemParams.getSubmitInterface();
+ String url =
replaceParameterPlaceholders(baseExternalSystemParams.getCompleteUrl(submitConfig.getUrl()));
+ Map<String, String> headers = new HashMap<>();
+ buildAuthHeader(accessToken, headers);
+ buildHeaders(submitConfig, headers);
+ Map<String, Object> requestBody = buildRequestBody(submitConfig);
+ Map<String, Object> requestParams =
buildRequestParams(submitConfig);
+
+ int interfaceTimeout =
baseExternalSystemParams.getInterfaceTimeout();
+ log.info("Using interface timeout value: {} milliseconds",
interfaceTimeout);
+ OkHttpResponse response =
+ executeRequestWithoutRetry(submitConfig.getMethod(), url,
headers, requestParams, requestBody,
+ interfaceTimeout, interfaceTimeout,
interfaceTimeout);
+ log.info("Submit task response:{}", response);
+ if (response.getStatusCode() != 200) {
+ throw new TaskException("Submit task failed: " +
response.getBody());
+ }
+
+ parseSubmitResponse(submitConfig.getResponseParameters(),
response.getBody());
+ } catch (Exception e) {
+ log.error("Submit task failed:{}", e);
+ throw new TaskException("Submit task failed", e);
+ }
+ }
+
+ private void trackExternalTaskStatus() throws TaskException {
+ try {
+ String status;
+ do {
+ // Only check timeout when timeout failure strategy is enabled
+ if (isTimeoutFailureEnabled()) {
+ long currentTime = System.currentTimeMillis();
+ long usedTimeMillis = currentTime - taskStartTime;
+ long usedTime = (usedTimeMillis + 29999) / 60000; // Over
30 seconds takes 1 minute, less than 30
+ //
seconds takes 0 minutes
+ if (usedTime >= taskExecutionContext.getTaskTimeout()) {
Review Comment:
The timeout comparison uses `usedTime >=
taskExecutionContext.getTaskTimeout()`, but `usedTime` is in minutes while
taskTimeout is in seconds (as indicated by the division by 60 on lines 138
and 195). Because the comparison here doesn't divide by 60, it compares
minutes to seconds. This is inconsistent with the timeout check in the cancel
method.
##########
dolphinscheduler-ui/src/views/thirdparty-api-source/use-table.ts:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive, ref } from 'vue'
+import {
+ queryDataSourceListPaging,
+ deleteDataSource
+} from '@/service/modules/data-source'
+
+export function useTable() {
+ const data = reactive({
+ page: 1,
+ pageSize: 10,
+ itemCount: 0,
+ searchVal: ref(''),
+ list: [],
+ loading: false
+ })
+
+ const getList = async () => {
+ if (data.loading) return
+ data.loading = true
+
+ const listRes = await queryDataSourceListPaging({
+ pageNo: data.page,
+ pageSize: data.pageSize,
+ searchVal: data.searchVal
+ })
+ data.loading = false
+ data.list = listRes.totalList
+ data.itemCount = listRes.total
+ }
+
+ const updateList = () => {
+ if (data.list.length === 1 && data.page > 1) {
+ --data.page
+ }
+ getList()
+ }
+
+ const deleteRecord = async (id: number) => {
+ const ignored = await deleteDataSource(id)
Review Comment:
Unused variable `ignored`.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-external-system/src/main/java/org/apache/dolphinscheduler/plugin/task/externalSystem/ExternalSystemTask.java:
##########
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.externalSystem;
+
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaderContentType;
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaders;
+import org.apache.dolphinscheduler.common.model.OkHttpResponse;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OkHttpUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.AuthenticationUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.InterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingFailureConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingInterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingSuccessConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.RequestParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ResponseParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ThirdPartySystemConnectorConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.FormBody;
+
+import com.jayway.jsonpath.JsonPath;
+
+@Slf4j
+public class ExternalSystemTask extends AbstractTask {
+
+ private Boolean traceEnabled = true;
+
+ private ExternalSystemParameters externalSystemParameters;
+ private ThirdPartySystemConnectorConnectionParam baseExternalSystemParams;
+ private TaskExecutionContext taskExecutionContext;
+ private String accessToken;
+ private Map<String, String> parameterMap = new HashMap<>();
+ private Set<String> successStatusCache = new HashSet<>();
+ private Set<String> failureStatusCache = new HashSet<>();
+
+ private boolean isTimeout = false;
+ private long taskStartTime;
+
+ public ExternalSystemTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ this.taskExecutionContext = taskExecutionContext;
+ this.externalSystemParameters =
+ JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ExternalSystemParameters.class);
+ baseExternalSystemParams =
+
externalSystemParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ try {
+ accessToken =
baseExternalSystemParams.getAuthConfig().getHeaderPrefix() + " "
+ +
AuthenticationUtils.authenticateAndGetToken(baseExternalSystemParams);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void init() {
+ externalSystemParameters = JSONUtils.parseObject(
+ taskExecutionContext.getTaskParams(),
+ ExternalSystemParameters.class);
+ log.info("Initialize external system task with externalSystemId: {},
externalTaskId: {}, externalTaskName: {}",
+ externalSystemParameters.getDatasource(),
Review Comment:
Variable `externalSystemParameters` may be null at this access, as suggested
by the `externalSystemParameters == null` guard that only runs after this
dereference.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-external-system/src/main/java/org/apache/dolphinscheduler/plugin/task/externalSystem/ExternalSystemParameters.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.externalSystem;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ThirdPartySystemConnectorConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+
+import org.jetbrains.annotations.NotNull;
+
+public class ExternalSystemParameters extends AbstractParameters {
+
+ private int datasource;
+
+ private String authenticationToken;
+
+ private String externalTaskId;
+ private String externalTaskName;
+
+ public int getDatasource() {
+ return datasource;
+ }
+
+ public void setDatasource(int datasource) {
+ this.datasource = datasource;
+ }
+
+ public String getAuthenticationToken() {
+ return authenticationToken;
+ }
+
+ public void setAuthenticationToken(String authenticationToken) {
+ this.authenticationToken = authenticationToken;
+ }
+
+ public String getExternalTaskId() {
+ return externalTaskId;
+ }
+
+ public void setExternalTaskId(String externalTaskId) {
+ this.externalTaskId = externalTaskId;
+ }
+
+ public String getExternalTaskName() {
+ return externalTaskName;
+ }
+
+ public void setExternalTaskName(String externalTaskName) {
+ this.externalTaskName = externalTaskName;
+ }
+
+ @Override
+ public ResourceParametersHelper getResources() {
+ ResourceParametersHelper resources = super.getResources();
+ resources.put(ResourceType.DATASOURCE, datasource);
+ return resources;
+ }
+
+ public boolean checkParameters() {
Review Comment:
This method overrides `AbstractParameters.checkParameters`; it is advisable
to add an `@Override` annotation.
##########
dolphinscheduler-task-plugin/dolphinscheduler-task-external-system/src/main/java/org/apache/dolphinscheduler/plugin/task/externalSystem/ExternalSystemTask.java:
##########
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.externalSystem;
+
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaderContentType;
+import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaders;
+import org.apache.dolphinscheduler.common.model.OkHttpResponse;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OkHttpUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.AuthenticationUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.InterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingFailureConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingInterfaceInfo;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.PollingSuccessConfig;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.RequestParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ResponseParameter;
+import
org.apache.dolphinscheduler.plugin.datasource.thirdpartysystemconnector.param.ThirdPartySystemConnectorConnectionParam;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.FormBody;
+
+import com.jayway.jsonpath.JsonPath;
+
+@Slf4j
+public class ExternalSystemTask extends AbstractTask {
+
+ private Boolean traceEnabled = true;
+
+ private ExternalSystemParameters externalSystemParameters;
+ private ThirdPartySystemConnectorConnectionParam baseExternalSystemParams;
+ private TaskExecutionContext taskExecutionContext;
+ private String accessToken;
+ private Map<String, String> parameterMap = new HashMap<>();
+ private Set<String> successStatusCache = new HashSet<>();
+ private Set<String> failureStatusCache = new HashSet<>();
+
+ private boolean isTimeout = false;
+ private long taskStartTime;
+
+ public ExternalSystemTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ this.taskExecutionContext = taskExecutionContext;
+ this.externalSystemParameters =
+ JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ExternalSystemParameters.class);
+ baseExternalSystemParams =
+
externalSystemParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ try {
+ accessToken =
baseExternalSystemParams.getAuthConfig().getHeaderPrefix() + " "
+ +
AuthenticationUtils.authenticateAndGetToken(baseExternalSystemParams);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void init() {
+ externalSystemParameters = JSONUtils.parseObject(
+ taskExecutionContext.getTaskParams(),
+ ExternalSystemParameters.class);
+ log.info("Initialize external system task with externalSystemId: {},
externalTaskId: {}, externalTaskName: {}",
+ externalSystemParameters.getDatasource(),
+ externalSystemParameters.getExternalTaskId(),
+ externalSystemParameters.getExternalTaskName());
+
+ if (externalSystemParameters == null ||
!externalSystemParameters.checkParameters()) {
+ throw new RuntimeException("external system task params is not
valid");
+ }
+
+ // Initialize parameter mapping
+ initParameterMap();
+ initStatusCache();
+ }
+
+ @Override
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
+ try {
+ taskStartTime = System.currentTimeMillis();
+ submitExternalTask();
+ TimeUnit.SECONDS.sleep(10);
+ trackExternalTaskStatus();
+ } catch (Exception e) {
+ log.error("external system task error", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("Execute external system task failed", e);
+ }
+ }
+
+ @Override
+ public void cancel() throws TaskException {
+ try {
+ log.info("cancel external system task");
+ cancelTaskInstance();
+ } catch (Exception e) {
+ throw new TaskException("cancel external system task error", e);
+ } finally {
+ // Only set to failure status when timeout failure strategy is
enabled and it actually timed out
+ log.info("External task timeout check isTimeoutFailureEnabled:{}",
isTimeoutFailureEnabled());
+ if (isTimeoutFailureEnabled()) {
+ long currentTime = System.currentTimeMillis();
+ long usedTimeMillis = currentTime - taskStartTime;
+ long usedTime = (usedTimeMillis + 29999) / 60000; // Over 30
seconds takes 1 minute, less than 30
+ // seconds
takes 0 minutes
+ log.info(
+ "External task timeout check, used time: {}m, timeout:
{}m, currentTime: {}, taskStartTime: {}",
+ usedTime, taskExecutionContext.getTaskTimeout() / 60,
currentTime, taskStartTime);
+ if (usedTime >= taskExecutionContext.getTaskTimeout() / 60) {
+ isTimeout = true;
+ log.warn("External task timeout, used time: {}m, timeout:
{}m",
+ usedTime, taskExecutionContext.getTaskTimeout() /
60);
+ }
+ }
+ if (isTimeout) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ log.info("External task cancelled due to timeout, set status
to FAILED");
+ } else {
+ setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
+ log.info("External task cancelled manually or timeout not
enabled, set status to KILLED");
+ }
+ }
+ }
+
+ private void submitExternalTask() throws TaskException {
+ try {
+ InterfaceInfo submitConfig =
baseExternalSystemParams.getSubmitInterface();
+ String url =
replaceParameterPlaceholders(baseExternalSystemParams.getCompleteUrl(submitConfig.getUrl()));
+ Map<String, String> headers = new HashMap<>();
+ buildAuthHeader(accessToken, headers);
+ buildHeaders(submitConfig, headers);
+ Map<String, Object> requestBody = buildRequestBody(submitConfig);
+ Map<String, Object> requestParams =
buildRequestParams(submitConfig);
+
+ int interfaceTimeout =
baseExternalSystemParams.getInterfaceTimeout();
+ log.info("Using interface timeout value: {} milliseconds",
interfaceTimeout);
+ OkHttpResponse response =
+ executeRequestWithoutRetry(submitConfig.getMethod(), url,
headers, requestParams, requestBody,
+ interfaceTimeout, interfaceTimeout,
interfaceTimeout);
+ log.info("Submit task response:{}", response);
+ if (response.getStatusCode() != 200) {
+ throw new TaskException("Submit task failed: " +
response.getBody());
+ }
+
+ parseSubmitResponse(submitConfig.getResponseParameters(),
response.getBody());
+ } catch (Exception e) {
+ log.error("Submit task failed:{}", e);
+ throw new TaskException("Submit task failed", e);
+ }
+ }
+
+ private void trackExternalTaskStatus() throws TaskException {
+ try {
+ String status;
+ do {
+ // Only check timeout when timeout failure strategy is enabled
+ if (isTimeoutFailureEnabled()) {
+ long currentTime = System.currentTimeMillis();
+ long usedTimeMillis = currentTime - taskStartTime;
+ long usedTime = (usedTimeMillis + 29999) / 60000; // Over
30 seconds takes 1 minute, less than 30
+ //
seconds takes 0 minutes
+ if (usedTime >= taskExecutionContext.getTaskTimeout()) {
+ isTimeout = true;
+ log.error("External task timeout, used time: {}m,
timeout: {}m",
+ usedTime,
taskExecutionContext.getTaskTimeout() / 60);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ cancelTaskInstance();
+ return;
+ }
+ }
+
+ status = pollTaskStatus();
+
+ if (successStatusCache.contains(status)) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
+ log.info("External task completed successfully with
status: {}", status);
+ return;
+ } else if (failureStatusCache.contains(status)) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ log.error("External task failed with status: {}", status);
+ return;
+ }
+
+ TimeUnit.SECONDS.sleep(10);
+ } while (traceEnabled);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TaskException("Task status tracking interrupted", e);
+ } catch (Exception e) {
+ log.error("Track task status failed", e);
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new TaskException("Track task status failed", e);
+ }
+ }
+
+ private String pollTaskStatus() throws TaskException {
+ try {
+ PollingInterfaceInfo pollConfig =
+ baseExternalSystemParams.getPollStatusInterface();
+ String url =
replaceParameterPlaceholders(baseExternalSystemParams.getCompleteUrl(pollConfig.getUrl()));
+ Map<String, String> headers = new HashMap<>();
+ buildAuthHeader(accessToken, headers);
+ buildHeaders(pollConfig, headers);
+ Map<String, Object> requestBody = buildRequestBody(pollConfig);
+ Map<String, Object> requestParams = buildRequestParams(pollConfig);
+
+ int interfaceTimeout =
baseExternalSystemParams.getInterfaceTimeout();
+ OkHttpResponse response =
+ executeRequestWithRetry(pollConfig.getMethod(), url,
headers, requestParams, requestBody,
+ interfaceTimeout, interfaceTimeout,
interfaceTimeout, 3);
+ log.info("poll task status response:{}", response);
+
+ if (response.getStatusCode() != 200) {
+ throw new TaskException("polling task failed: " +
response.getBody());
+ }
+
+ String successField =
pollConfig.getPollingSuccessConfig().getSuccessField();
+ String failureField =
pollConfig.getPollingFailureConfig().getFailureField();
+
+ Object successStatusObj = JsonPath.read(response.getBody(),
successField);
+ Object failureStatusObj = JsonPath.read(response.getBody(),
failureField);
+
+ log.info("PollTaskStatus successfully, external task instance
success status: {}, failure status: {}",
+ successStatusObj.toString(), failureStatusObj.toString());
Review Comment:
Variable `successStatusObj` may be null at this access, as suggested by an
existing null guard on the same variable.
##########
dolphinscheduler-ui/src/views/thirdparty-api-source/index.tsx:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { defineComponent, ref, onMounted, computed } from 'vue'
+import {
+ NDataTable,
+ NButton,
+ NSpace,
+ NPopconfirm,
+ NIcon,
+ NPagination,
+ NTooltip
+} from 'naive-ui'
+import { useI18n } from 'vue-i18n'
+import { useRouter } from 'vue-router'
Review Comment:
Unused import `useRouter`.
##########
dolphinscheduler-ui/src/views/thirdparty-api-source/index.tsx:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { defineComponent, ref, onMounted, computed } from 'vue'
+import {
+ NDataTable,
+ NButton,
+ NSpace,
+ NPopconfirm,
+ NIcon,
+ NPagination,
+ NTooltip
+} from 'naive-ui'
+import { useI18n } from 'vue-i18n'
+import { useRouter } from 'vue-router'
+import { deleteThirdpartyApiSource, queryThirdpartyApiSourceListPaging,
createThirdpartyApiSource, updateThirdpartyApiSource,
getThirdpartyApiSourceById, testThirdpartyApiSourceConnection } from
'@/service/modules/thirdparty-api-source'
+import type { ThirdpartyApiSource } from
'@/service/modules/thirdparty-api-source/types'
+import Card from '@/components/card'
+import Search from '@/components/input-search'
+import { EditOutlined, DeleteOutlined } from '@vicons/antd'
+import ThirdpartyApiSourceModal from './modal'
+
+export default defineComponent({
+ name: 'ThirdpartyApiSourceList',
+ setup() {
+ const { t } = useI18n()
+
+ const tableData = ref<ThirdpartyApiSource[]>([])
+ const loading = ref(false)
+ const searchVal = ref('')
+ const page = ref(1)
+ const pageSize = ref(10)
+ const itemCount = ref(0)
+ const showModal = ref(false)
+ const editData = ref<any>(null)
+ const operationType = ref<'create' | 'edit'>('create')
+
+ // 获取真实接口数据
+ const getTableData = async () => {
+ loading.value = true
+ try {
+ const res = await queryThirdpartyApiSourceListPaging({
+ pageNo: page.value,
+ pageSize: pageSize.value,
+ searchVal: searchVal.value || undefined
+ }) as any
+ if(res) {
+ tableData.value = (res.totalList || []) as ThirdpartyApiSource[]
+ itemCount.value = res.total || 0
+ }
+ } finally {
+ loading.value = false
+ }
+ }
+
+ const handleDelete = async (row: ThirdpartyApiSource) => {
+ await deleteThirdpartyApiSource(row.id!)
+ await getTableData()
+ }
+
+ const changePage = (p: number) => {
+ page.value = p
+ getTableData()
+ }
+ const changePageSize = (ps: number) => {
+ page.value = 1
+ pageSize.value = ps
+ getTableData()
+ }
+
+ const handleCreate = () => {
+ editData.value = null
+ operationType.value = 'create'
+ showModal.value = true
+ }
+ const handleEdit = async (row: any) => {
+ // 获取详情
+ const detail = await getThirdpartyApiSourceById(row.id)
+ editData.value = detail
+ operationType.value = 'edit'
+ showModal.value = true
+ }
+ const handleModalClose = () => {
+ showModal.value = false
+ editData.value = null
+ operationType.value = 'create'
+ }
+ const handleModalSubmit = async (data: any) => {
+ const res = data.id ? await updateThirdpartyApiSource(data.id, data) :
await createThirdpartyApiSource(data)
+ if(res) {
+ window.$message.success(data.id ? t('message.edit.success') :
t('message.create.success'))
+ } else {
+ window.$message.error(data.id ? t('message.edit.failed') :
t('message.create.failed'))
+ }
+ showModal.value = false
+ editData.value = null
+ getTableData()
+ }
+ const handleModalTest = async (data: any) => {
+ try {
+ const res = await testThirdpartyApiSourceConnection(data)
+ window.$message.success(
+ res && res.msg
+ ? res.msg
+ : `${t('datasource.test_connect')} ${t('datasource.success')}`
+ )
+ } catch (e: any) {
+ // Error handling is done by the calling function
+ }
+ }
+
+ const columns = computed(() => [
+ {
+ title: t('thirdparty_api_source.id'),
+ key: 'id'
+ },
+ {
+ title: t('thirdparty_api_source.system_name'),
+ key: 'name'
+ },
+ {
+ title: t('thirdparty_api_source.create_time'),
+ key: 'createTime',
+ render: (row: any) => row.createTime ? row.createTime : '-'
+ },
+ {
+ title: t('thirdparty_api_source.update_time'),
+ key: 'updateTime',
+ render: (row: any) => row.updateTime ? row.updateTime : '-'
+ },
+ {
+ title: t('datasource.operation'),
+ key: 'actions',
+ render: (row: ThirdpartyApiSource) => {
+ return (
+ <NSpace>
+ <NTooltip>
+ {{
+ trigger: () => (
+ <NButton
+ circle
+ type='info'
+ size='small'
+ onClick={() => handleEdit(row)}
+ >
+ <NIcon><EditOutlined /></NIcon>
+ </NButton>
+ ),
+ default: () => t('thirdparty_api_source.edit')
+ }}
+ </NTooltip>
+ <NTooltip>
+ {{
+ trigger: () => (
+ <NPopconfirm onPositiveClick={() => handleDelete(row)}>
+ {{
+ trigger: () => (
+ <NButton circle type='error' size='small'
class='btn-delete'>
+ <NIcon><DeleteOutlined /></NIcon>
+ </NButton>
+ ),
+ default: () => t('datasource.delete_confirm')
+ }}
+ </NPopconfirm>
+ ),
+ default: () => t('thirdparty_api_source.delete')
+ }}
+ </NTooltip>
+ </NSpace>
+ )
+ }
+ }
+ ])
+
+ onMounted(() => {
+ getTableData()
+ })
+
+ return () => (
+ <NSpace vertical>
+ <Card>
+ <NSpace justify='space-between'>
+ <NButton
+ type='primary'
+ size='small'
+ onClick={handleCreate}
+ >
+ {t('thirdparty_api_source.create_thirdparty_api_source')}
+ </NButton>
+ <NSpace>
+ <Search
+ placeholder={t('resource.file.enter_keyword_tips')}
+ v-model:value={searchVal.value}
+ onSearch={getTableData}
+ />
+ <NButton size='small' type='primary' onClick={getTableData}>
+ {t('thirdparty_api_source.search')}
+ </NButton>
+ </NSpace>
+ </NSpace>
+ </Card>
+ <Card title={t('thirdparty_api_source.thirdparty_api_source')}>
+ <NSpace vertical>
+ <NDataTable
+ loading={loading.value}
+ columns={columns.value}
+ data={tableData.value}
+ striped
+ size={'small'}
+ />
+ <NSpace justify='center'>
+ <NPagination
+ page={page.value}
Review Comment:
This write to property 'page' is useless, since another write to the same
property always overrides it.
##########
dolphinscheduler-ui/src/views/thirdparty-api-source/index.tsx:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { defineComponent, ref, onMounted, computed } from 'vue'
+import {
+ NDataTable,
+ NButton,
+ NSpace,
+ NPopconfirm,
+ NIcon,
+ NPagination,
+ NTooltip
+} from 'naive-ui'
+import { useI18n } from 'vue-i18n'
+import { useRouter } from 'vue-router'
+import { deleteThirdpartyApiSource, queryThirdpartyApiSourceListPaging,
createThirdpartyApiSource, updateThirdpartyApiSource,
getThirdpartyApiSourceById, testThirdpartyApiSourceConnection } from
'@/service/modules/thirdparty-api-source'
+import type { ThirdpartyApiSource } from
'@/service/modules/thirdparty-api-source/types'
+import Card from '@/components/card'
+import Search from '@/components/input-search'
+import { EditOutlined, DeleteOutlined } from '@vicons/antd'
+import ThirdpartyApiSourceModal from './modal'
+
+export default defineComponent({
+ name: 'ThirdpartyApiSourceList',
+ setup() {
+ const { t } = useI18n() // t() resolves i18n message keys to localized text
+ // Reactive state for the list, its pagination and the create/edit modal.
+ const tableData = ref<ThirdpartyApiSource[]>([]) // rows currently shown in the table
+ const loading = ref(false) // true while a list request is in flight
+ const searchVal = ref('') // keyword bound to the search input
+ const page = ref(1) // page number sent to the list request as pageNo
+ const pageSize = ref(10) // number of rows requested per page
+ const itemCount = ref(0) // total row count reported by the list request
+ const showModal = ref(false) // toggled by the create/edit/close/submit handlers
+ const editData = ref<any>(null) // detail loaded by handleEdit; null when creating
+ const operationType = ref<'create' | 'edit'>('create') // mode the modal was last opened in
+
+ // Fetch the current page of third-party API sources from the backend.
+ // `loading` is raised for the duration of the request and is always lowered
+ // again afterwards, even when the request throws.
+ const getTableData = async () => {
+   loading.value = true
+   try {
+     const params = {
+       pageNo: page.value,
+       pageSize: pageSize.value,
+       // An empty keyword is sent as undefined rather than as ''.
+       searchVal: searchVal.value || undefined
+     }
+     const res = (await queryThirdpartyApiSourceListPaging(params)) as any
+     // A falsy response leaves the previously loaded rows and total untouched.
+     if (!res) return
+     tableData.value = (res.totalList || []) as ThirdpartyApiSource[]
+     itemCount.value = res.total || 0
+   } finally {
+     loading.value = false
+   }
+ }
+
+ // Delete the given source, then reload the table.
+ // If the deleted row was the only one on a page after the first, step back
+ // one page before reloading; otherwise the reload would request a page that
+ // no longer exists and the table would come back empty.
+ const handleDelete = async (row: ThirdpartyApiSource) => {
+   await deleteThirdpartyApiSource(row.id!)
+   if (tableData.value.length === 1 && page.value > 1) {
+     page.value -= 1
+   }
+   await getTableData()
+ }
+
+ // Pagination handler: remember the requested page number and reload the table.
+ const changePage = (p: number) => {
+   page.value = p
+   // Fire-and-forget: the reload is intentionally not awaited here.
+   void getTableData()
+ }
+ // Page-size handler: a new page size restarts paging from the first page,
+ // then the table is reloaded.
+ const changePageSize = (ps: number) => {
+   page.value = 1
+   pageSize.value = ps
+   // Fire-and-forget: the reload is intentionally not awaited here.
+   void getTableData()
+ }
+
+ const handleCreate = () => { // open the modal in create mode
+ editData.value = null // no existing record to edit
+ operationType.value = 'create'
+ showModal.value = true // set last, after the modal inputs are in place
+ }
+ // Open the modal in edit mode for the given table row.
+ const handleEdit = async (row: any) => {
+   // Fetch the full record by id first; the modal is only shown once it has loaded.
+   editData.value = await getThirdpartyApiSourceById(row.id)
+   operationType.value = 'edit'
+   showModal.value = true
+ }
+ const handleModalClose = () => { // hide the modal and reset its inputs to the create defaults
+ showModal.value = false
+ editData.value = null // drop any record loaded by handleEdit
+ operationType.value = 'create'
+ }
+ // Persist the modal form: update when `data.id` is set, otherwise create.
+ // On success the modal is closed, the edit state cleared and the list
+ // reloaded. On a falsy result an error toast is shown and the modal stays
+ // open with the user's input so the save can be retried (previously the
+ // modal was closed and the input discarded even when the save failed).
+ const handleModalSubmit = async (data: any) => {
+   const isEdit = Boolean(data.id)
+   const res = isEdit
+     ? await updateThirdpartyApiSource(data.id, data)
+     : await createThirdpartyApiSource(data)
+   if (!res) {
+     window.$message.error(
+       isEdit ? t('message.edit.failed') : t('message.create.failed')
+     )
+     return
+   }
+   window.$message.success(
+     isEdit ? t('message.edit.success') : t('message.create.success')
+   )
+   showModal.value = false
+   editData.value = null
+   // Awaited (as in handleDelete) so a failed reload is not a floating promise.
+   await getTableData()
+ }
+ const handleModalTest = async (data: any) => { // run a connection test with the modal's form data
+ try {
+ const res = await testThirdpartyApiSourceConnection(data)
+ window.$message.success(
+ res && res.msg
+ ? res.msg // prefer the message carried by the response
+ : `${t('datasource.test_connect')} ${t('datasource.success')}` // fallback: generic localized success text
+ )
+ } catch (e: any) {
+ // NOTE(review): the rejection is swallowed here, so callers never observe it; presumably the shared request layer already reports the failure to the user — confirm.
+ }
+ }
+
+ const columns = computed(() => [ // column definitions handed to NDataTable; wrapped in computed so t() titles are re-evaluated reactively
+ {
+ title: t('thirdparty_api_source.id'),
+ key: 'id'
+ },
+ {
+ title: t('thirdparty_api_source.system_name'),
+ key: 'name'
+ },
+ {
+ title: t('thirdparty_api_source.create_time'),
+ key: 'createTime',
+ render: (row: any) => row.createTime ? row.createTime : '-' // '-' placeholder when the value is missing or empty
+ },
+ {
+ title: t('thirdparty_api_source.update_time'),
+ key: 'updateTime',
+ render: (row: any) => row.updateTime ? row.updateTime : '-' // '-' placeholder when the value is missing or empty
+ },
+ {
+ title: t('datasource.operation'),
+ key: 'actions',
+ render: (row: ThirdpartyApiSource) => { // per-row buttons: edit, and delete behind a confirmation popup
+ return (
+ <NSpace>
+ <NTooltip>
+ {{
+ trigger: () => (
+ <NButton
+ circle
+ type='info'
+ size='small'
+ onClick={() => handleEdit(row)}
+ >
+ <NIcon><EditOutlined /></NIcon>
+ </NButton>
+ ),
+ default: () => t('thirdparty_api_source.edit')
+ }}
+ </NTooltip>
+ <NTooltip>
+ {{
+ trigger: () => (
+ <NPopconfirm onPositiveClick={() => handleDelete(row)}>
+ {{
+ trigger: () => (
+ <NButton circle type='error' size='small'
class='btn-delete'>
+ <NIcon><DeleteOutlined /></NIcon>
+ </NButton>
+ ),
+ default: () => t('datasource.delete_confirm')
+ }}
+ </NPopconfirm>
+ ),
+ default: () => t('thirdparty_api_source.delete')
+ }}
+ </NTooltip>
+ </NSpace>
+ )
+ }
+ }
+ ])
+
+ onMounted(() => { // Vue lifecycle hook: callback runs after the component is mounted
+ getTableData() // trigger the initial table load; the returned promise is not awaited
+ })
+
+ return () => (
+ <NSpace vertical>
+ <Card>
+ <NSpace justify='space-between'>
+ <NButton
+ type='primary'
+ size='small'
+ onClick={handleCreate}
+ >
+ {t('thirdparty_api_source.create_thirdparty_api_source')}
+ </NButton>
+ <NSpace>
+ <Search
+ placeholder={t('resource.file.enter_keyword_tips')}
+ v-model:value={searchVal.value}
+ onSearch={getTableData}
+ />
+ <NButton size='small' type='primary' onClick={getTableData}>
+ {t('thirdparty_api_source.search')}
+ </NButton>
+ </NSpace>
+ </NSpace>
+ </Card>
+ <Card title={t('thirdparty_api_source.thirdparty_api_source')}>
+ <NSpace vertical>
+ <NDataTable
+ loading={loading.value}
+ columns={columns.value}
+ data={tableData.value}
+ striped
+ size={'small'}
+ />
+ <NSpace justify='center'>
+ <NPagination
+ page={page.value}
+ page-size={pageSize.value}
Review Comment:
This write to property 'page-size' is useless, since [another property
write](1) always overrides it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]