This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
new c956185 [Feature] 2.0.2-prepare bug fix of Pressure tests #7511
(#7540)
c956185 is described below
commit c956185e00118bde6df99c2aaf3017d974515ebf
Author: zwZjut <[email protected]>
AuthorDate: Wed Dec 22 18:57:51 2021 +0800
[Feature] 2.0.2-prepare bug fix of Pressure tests #7511 (#7540)
* [Feature][dolphinscheduler-api] parse traceId in http header for Cross
system delivery to #7237 (#7238)
* to #7237
* rerun test
Co-authored-by: honghuo.zw <[email protected]>
* cherry-pick 05aef27 and handle conflicts
* to #7065: fix ExecutorService and schedulerService (#7072)
Co-authored-by: honghuo.zw <[email protected]>
* [Feature][dolphinscheduler-api] access control of taskDefinition and
taskInstance in project to #7081 (#7082)
* to #7081
* fix #7081
* to #7081
Co-authored-by: honghuo.zw <[email protected]>
* cherry-pick 8ebe060 and handle conflicts
* cherry-pick 1f18444 and handle conflicts
* fix #6807: dolphinscheduler.zookeeper.env_vars ->
dolphinscheduler.registry.env_vars (#6808)
Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>
* add default constructor (#6780)
Co-authored-by: honghuo.zw <[email protected]>
* to #7108 (#7109)
* to #7511
* to #7511
* to #7511
* to #7511
Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>
---
.../application-api.properties.tpl | 3 +
.../conf/dolphinscheduler/master.properties.tpl | 4 +
.../conf/dolphinscheduler/quartz.properties.tpl | 3 +-
.../conf/dolphinscheduler/registry.properties.tpl | 1 +
.../conf/dolphinscheduler/worker.properties.tpl | 3 +
docker/kubernetes/dolphinscheduler/values.yaml | 10 ++
.../apache/dolphinscheduler/common/Constants.java | 3 +-
.../dao/entity/ProcessInstance.java | 18 ++
.../dao/mapper/ProcessInstanceMapMapper.java | 4 +-
.../dao/mapper/ProcessInstanceMapper.java | 8 +
.../dao/mapper/TaskInstanceMapper.java | 2 +
.../dao/mapper/ProcessInstanceMapper.xml | 13 +-
.../dao/mapper/TaskInstanceMapper.xml | 6 +
.../src/main/resources/sql/dolphinscheduler_h2.sql | 1 +
.../main/resources/sql/dolphinscheduler_mysql.sql | 1 +
.../resources/sql/dolphinscheduler_postgre.sql | 1 +
.../src/main/resources/sql/soft_version | 2 +-
.../2.0.2_schema/mysql/dolphinscheduler_ddl.sql | 20 +++
.../2.0.2_schema/mysql/dolphinscheduler_dml.sql | 16 ++
.../postgresql/dolphinscheduler_ddl.sql | 41 +++++
.../postgresql/dolphinscheduler_dml.sql | 16 ++
.../server/master/MasterServer.java | 23 ++-
.../server/master/config/MasterConfig.java | 21 +++
.../master/consumer/TaskPriorityQueueConsumer.java | 8 +-
.../processor/queue/TaskResponseService.java | 26 ++-
.../master/registry/MasterRegistryClient.java | 199 +++++++++++++++++----
.../registry/MasterRegistryDataListener.java | 4 +-
.../server/master/registry/ServerNodeManager.java | 4 +-
.../master/runner/FailoverExecuteThread.java | 91 ++++++++++
.../master/runner/MasterSchedulerService.java | 1 +
.../master/runner/WorkflowExecuteThread.java | 16 +-
.../master/runner/task/CommonTaskProcessor.java | 6 +-
.../server/worker/WorkerServer.java | 30 +++-
.../server/worker/config/WorkerConfig.java | 11 ++
.../worker/processor/DBTaskAckProcessor.java | 9 +-
.../worker/processor/DBTaskResponseProcessor.java | 5 +-
.../worker/registry/WorkerRegistryClient.java | 44 +++++
.../worker/runner/RetryReportTaskStatusThread.java | 16 +-
.../master/registry/MasterRegistryClientTest.java | 6 +-
.../service/process/ProcessService.java | 30 +++-
.../service/quartz/QuartzExecutors.java | 73 ++++----
.../service/queue/TaskPriorityQueueImpl.java | 2 +-
.../service/registry/RegistryClient.java | 2 +-
.../dolphinscheduler/spi/utils/PropertyUtils.java | 3 +
44 files changed, 670 insertions(+), 136 deletions(-)
diff --git a/docker/build/conf/dolphinscheduler/application-api.properties.tpl
b/docker/build/conf/dolphinscheduler/application-api.properties.tpl
index d78db2d..393a33c 100644
--- a/docker/build/conf/dolphinscheduler/application-api.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/application-api.properties.tpl
@@ -38,6 +38,9 @@
server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javasc
# max http post size
server.jetty.max-http-form-post-size=5000000
+# max http header size
+server.max-http-header-size=81920
+
# messages encoding
spring.messages.encoding=UTF-8
diff --git a/docker/build/conf/dolphinscheduler/master.properties.tpl
b/docker/build/conf/dolphinscheduler/master.properties.tpl
index 046d5c1..98ca3dd 100644
--- a/docker/build/conf/dolphinscheduler/master.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/master.properties.tpl
@@ -44,3 +44,7 @@ master.max.cpuload.avg=${MASTER_MAX_CPULOAD_AVG}
# master reserved memory, only lower than system available memory, master
server can schedule. default value 0.3, the unit is G
master.reserved.memory=${MASTER_RESERVED_MEMORY}
+# master failover interval minutes
+master.failover.interval=${MASTER_FAILOVER_INTERVAL}
+# master kill yarn job when handle failover
+master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
\ No newline at end of file
diff --git a/docker/build/conf/dolphinscheduler/quartz.properties.tpl
b/docker/build/conf/dolphinscheduler/quartz.properties.tpl
index 45c61a6..5f011f9 100644
--- a/docker/build/conf/dolphinscheduler/quartz.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/quartz.properties.tpl
@@ -32,7 +32,8 @@
#org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#org.quartz.threadPool.makeThreadsDaemons = true
-#org.quartz.threadPool.threadCount = 25
+org.quartz.threadPool.threadCount = ${ORG_QUARTZ_THREADPOOL_THREADCOUNT}
+org.quartz.scheduler.batchTriggerAcquisitionMaxCount =
${ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT}
#org.quartz.threadPool.threadPriority = 5
#============================================================================
diff --git a/docker/build/conf/dolphinscheduler/registry.properties.tpl
b/docker/build/conf/dolphinscheduler/registry.properties.tpl
index 9ee8add..e1ac104 100644
--- a/docker/build/conf/dolphinscheduler/registry.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/registry.properties.tpl
@@ -17,3 +17,4 @@
registry.plugin.name=${REGISTRY_PLUGIN_NAME}
registry.servers=${REGISTRY_SERVERS}
+session.timeout.ms=${SESSION_TIMEOUT_MS}
\ No newline at end of file
diff --git a/docker/build/conf/dolphinscheduler/worker.properties.tpl
b/docker/build/conf/dolphinscheduler/worker.properties.tpl
index 94a3352..e1f1574 100644
--- a/docker/build/conf/dolphinscheduler/worker.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/worker.properties.tpl
@@ -41,3 +41,6 @@ worker.groups=${WORKER_GROUPS}
# alert server listen host
alert.listen.host=${ALERT_LISTEN_HOST}
+
+# worker retry report task statues interval seconds
+worker.retry.report.task.statues.interval=${WORKER_RETRY_REPORT_TASK_STATUS_INTERVAL}
\ No newline at end of file
diff --git a/docker/kubernetes/dolphinscheduler/values.yaml
b/docker/kubernetes/dolphinscheduler/values.yaml
index 6ef58d5..f0df9e7 100644
--- a/docker/kubernetes/dolphinscheduler/values.yaml
+++ b/docker/kubernetes/dolphinscheduler/values.yaml
@@ -53,6 +53,10 @@ externalDatabase:
## If not exists external zookeeper, by default, Dolphinscheduler's zookeeper
will use it.
zookeeper:
enabled: true
+ tickTime: 3000
+ maxSessionTimeout: 60000
+ initLimit: 300
+ maxClientCnxns: 2000
fourlwCommandsWhitelist: "srvr,ruok,wchs,cons"
persistence:
enabled: false
@@ -158,6 +162,10 @@ master:
MASTER_TASK_COMMIT_INTERVAL: "1000"
MASTER_MAX_CPULOAD_AVG: "-1"
MASTER_RESERVED_MEMORY: "0.3"
+ MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER: "true"
+ ORG_QUARTZ_THREADPOOL_THREADCOUNT: "25"
+ ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT: "1"
+ SESSION_TIMEOUT_MS: 60000
## Periodic probe of container liveness. Container will be restarted if the
probe fails. Cannot be updated.
## More info:
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
livenessProbe:
@@ -225,6 +233,8 @@ worker:
WORKER_MAX_CPULOAD_AVG: "-1"
WORKER_RESERVED_MEMORY: "0.3"
WORKER_GROUPS: "default"
+ SESSION_TIMEOUT_MS: 60000
+ WORKER_RETRY_REPORT_TASK_STATUS_INTERVAL: 600
## Periodic probe of container liveness. Container will be restarted if the
probe fails. Cannot be updated.
## More info:
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
livenessProbe:
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 5a7b7cd..809bcb1 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -53,7 +53,7 @@ public final class Constants {
public static final String ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK =
"org.quartz.jobStore.acquireTriggersWithinLock";
public static final String ORG_QUARTZ_JOBSTORE_DATASOURCE =
"org.quartz.jobStore.dataSource";
public static final String
ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS =
"org.quartz.dataSource.myDs.connectionProvider.class";
-
+ public static final String
ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT =
"org.quartz.scheduler.batchTriggerAcquisitionMaxCount";
/**
* quartz config default value
*/
@@ -66,6 +66,7 @@ public final class Constants {
public static final String QUARTZ_INSTANCENAME = "DolphinScheduler";
public static final String QUARTZ_INSTANCEID = "AUTO";
public static final String QUARTZ_ACQUIRETRIGGERSWITHINLOCK = "true";
+ public static final String QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT = "100";
/**
* common properties path
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 18c386b..f20b13a 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -244,6 +244,12 @@ public class ProcessInstance {
*/
private int dryRun;
+ /**
+ * re-start time
+ */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+ private Date restartTime;
+
public ProcessInstance() {
}
@@ -516,6 +522,14 @@ public class ProcessInstance {
this.dryRun = dryRun;
}
+ public Date getRestartTime() {
+ return restartTime;
+ }
+
+ public void setRestartTime(Date restartTime) {
+ this.restartTime = restartTime;
+ }
+
/**
* add command to history
*
@@ -684,6 +698,10 @@ public class ProcessInstance {
+ ", dryRun='"
+ dryRun
+ '\''
+ + '}'
+ + ", restartTime='"
+ + restartTime
+ + '\''
+ '}';
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java
index 0e5a381..8ad7e89 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java
@@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
import org.apache.ibatis.annotations.Param;
import java.util.List;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
/**
* process instance map mapper interface
*/
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 51f6029..4a156ce 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -44,6 +44,14 @@ public interface ProcessInstanceMapper extends
BaseMapper<ProcessInstance> {
ProcessInstance queryDetailById(@Param("processId") int processId);
/**
+ * query process instance host by stateArray
+ *
+ * @param stateArray
+ * @return
+ */
+ List<String> queryNeedFailoverProcessInstanceHost(@Param("states") int[]
stateArray);
+
+ /**
* query process instance by host and stateArray
*
* @param host host
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index 795004d..5e2597d 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -73,4 +73,6 @@ public interface TaskInstanceMapper extends
BaseMapper<TaskInstance> {
@Param("startTime") Date
startTime,
@Param("endTime") Date
endTime
);
+
+ int updateHostAndSubmitTimeById(@Param("id") int id, @Param("host") String
host, @Param("submitTime") Date submitTime);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 77d96b5..7729695 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -23,7 +23,7 @@
command_type, command_param, task_depend_type, max_try_times,
failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params,
flag,
update_time, is_sub_process, executor_id, history_cmd,
- process_instance_priority, worker_group,environment_code, timeout,
tenant_id, var_pool, dry_run
+ process_instance_priority, worker_group,environment_code, timeout,
tenant_id, var_pool, dry_run, restart_time
</sql>
<select id="queryDetailById"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
@@ -45,7 +45,14 @@
</foreach>
order by id asc
</select>
-
+ <select id="queryNeedFailoverProcessInstanceHost" resultType="String">
+ select distinct host
+ from t_ds_process_instance
+ where state in
+ <foreach collection="states" item="i" open="(" close=")" separator=",">
+ #{i}
+ </foreach>
+ </select>
<select id="queryTopNProcessInstance"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
@@ -93,7 +100,7 @@
<select id="queryProcessInstanceListPaging"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id,
instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state,
instance.schedule_time, instance.start_time,
- instance.end_time, instance.run_times, instance.recovery,
instance.host, instance.dry_run
+ instance.end_time, instance.run_times, instance.recovery,
instance.host, instance.dry_run, instance.restart_time
from t_ds_process_instance instance
join t_ds_process_definition define ON
instance.process_definition_code = define.code
where instance.is_sub_process=0
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index f41b58a..bdbc538 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -162,4 +162,10 @@
</if>
order by instance.start_time desc
</select>
+ <update id="updateHostAndSubmitTimeById">
+ update t_ds_task_instance
+ set host = #{host},
+ submit_time = #{submitTime}
+ where id = #{id}
+ </update>
</mapper>
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index c85e106..4f6e900 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -601,6 +601,7 @@ CREATE TABLE t_ds_process_instance
tenant_id int(11) NOT NULL DEFAULT '-1',
var_pool longtext,
dry_run int NULL DEFAULT 0,
+ restart_time datetime DEFAULT NULL,
PRIMARY KEY (id)
);
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 35e8d5a..9e52d60 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -596,6 +596,7 @@ CREATE TABLE `t_ds_process_instance` (
`var_pool` longtext COMMENT 'var_pool',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag：0 normal, 1 dry run',
`next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next
processInstanceId',
+ `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`,`end_time`) USING BTREE
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql
index 3683b9b..5c02006 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql
@@ -501,6 +501,7 @@ CREATE TABLE t_ds_process_instance (
var_pool text ,
dry_run int DEFAULT '0' ,
next_process_instance_id int DEFAULT '0',
+ restart_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/soft_version
b/dolphinscheduler-dao/src/main/resources/sql/soft_version
index 10bf840..f93ea0c 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/soft_version
+++ b/dolphinscheduler-dao/src/main/resources/sql/soft_version
@@ -1 +1 @@
-2.0.1
\ No newline at end of file
+2.0.2
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
new file mode 100644
index 0000000..c7a2396
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
+
+alter table t_ds_process_instance add column if not exists `restart_time`
datetime DEFAULT NULL COMMENT 'process instance restart time';
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql
new file mode 100644
index 0000000..38964cc
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
new file mode 100644
index 0000000..d26cf8e
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+delimiter d//
+CREATE OR REPLACE FUNCTION public.dolphin_update_metadata(
+ )
+ RETURNS character varying
+ LANGUAGE 'plpgsql'
+ COST 100
+ VOLATILE PARALLEL UNSAFE
+AS $BODY$
+DECLARE
+v_schema varchar;
+BEGIN
+ ---get schema name
+ v_schema =current_schema();
+
+EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD
COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL';
+return 'Success!';
+exception when others then
+ ---Raise EXCEPTION '(%)',SQLERRM;
+ return SQLERRM;
+END;
+$BODY$;
+
+select dolphin_update_metadata();
+
+d//
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql
new file mode 100644
index 0000000..38964cc
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index ce0a080..e9d0cc5 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master;
+import static
org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -31,8 +33,9 @@ import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce
import
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
@@ -51,8 +54,6 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.transaction.annotation.EnableTransactionManagement;
-import static
org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
-
/**
* master server
*/
@@ -105,6 +106,9 @@ public class MasterServer implements IStoppable {
@Autowired
private EventExecuteService eventExecuteService;
+ @Autowired
+ private FailoverExecuteThread failoverExecuteThread;
+
@Value("${spring.datasource.driver-class-name}")
private String driverClassName;
@@ -145,8 +149,8 @@ public class MasterServer implements IStoppable {
// self tolerant
this.masterRegistryClient.init(this.processInstanceExecMaps);
- this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
+ this.masterRegistryClient.start();
this.eventExecuteService.init(this.processInstanceExecMaps);
this.eventExecuteService.start();
@@ -155,6 +159,8 @@ public class MasterServer implements IStoppable {
this.masterSchedulerService.start();
+ this.failoverExecuteThread.start();
+
// start QuartzExecutors
// what system should do if exception
try {
@@ -217,8 +223,17 @@ public class MasterServer implements IStoppable {
}
// close spring Context and will invoke method with @PreDestroy
annotation to destory beans. like
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
+ logger.info("springApplicationContext close");
} catch (Exception e) {
logger.error("master server stop exception ", e);
+ } finally {
+ try {
+ // thread sleep 60 seconds for quietly stop
+ Thread.sleep(60000L);
+ } catch (Exception e) {
+ logger.warn("thread sleep exception ", e);
+ }
+ System.exit(1);
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 124eceb..13a68c4 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -63,6 +63,12 @@ public class MasterConfig {
@Value("${master.cache.process.definition:true}")
private boolean masterCacheProcessDefinition;
+ @Value("${master.failover.interval:10}")
+ private int failoverInterval;
+
+ @Value("${master.kill.yarn.job.when.handle.fail.over:true}")
+ private boolean masterKillYarnJobWhenHandleFailOver;
+
public int getListenPort() {
return listenPort;
}
@@ -162,4 +168,19 @@ public class MasterConfig {
this.masterCacheProcessDefinition = masterCacheProcessDefinition;
}
+ public int getFailoverInterval() {
+ return failoverInterval;
+ }
+
+ public void setFailoverInterval(int failoverInterval) {
+ this.failoverInterval = failoverInterval;
+ }
+
+ public boolean getMasterKillYarnJobWhenHandleFailOver() {
+ return masterKillYarnJobWhenHandleFailOver;
+ }
+
+ public void setMasterKillYarnJobWhenHandleFailOver(boolean
masterKillYarnJobWhenHandleFailOver) {
+ this.masterKillYarnJobWhenHandleFailOver =
masterKillYarnJobWhenHandleFailOver;
+ }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 7b18e2b..574f5db 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -31,6 +31,7 @@ import
org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -136,8 +137,13 @@ public class TaskPriorityQueueConsumer extends Thread {
} else {
result = dispatcher.dispatch(executionContext);
}
+ if (result) {
+
processService.updateHostAndSubmitTimeById(taskPriority.getTaskId(),
executionContext.getHost().getAddress(), new Date());
+ }
} catch (ExecuteException e) {
- logger.error("dispatch error: {}", e.getMessage(),e);
+ logger.error("ExecuteException dispatch error: {}",
e.getMessage(), e);
+ } catch (Throwable t) {
+ logger.error("dispatch error: {}", t, t);
}
return result;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 27b96e1..a320a70 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -110,6 +110,7 @@ public class TaskResponseService {
public void addResponse(TaskResponseEvent taskResponseEvent) {
try {
eventQueue.put(taskResponseEvent);
+ logger.debug("eventQueue size:{}", eventQueue.size());
} catch (InterruptedException e) {
logger.error("put task : {} error :{}", taskResponseEvent, e);
Thread.currentThread().interrupt();
@@ -155,36 +156,49 @@ public class TaskResponseService {
try {
if (taskInstance != null) {
ExecutionStatus status =
taskInstance.getState().typeIsFinished() ? taskInstance.getState() :
taskResponseEvent.getState();
- processService.changeTaskState(taskInstance, status,
+ boolean result =
processService.changeTaskState(taskInstance, status,
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
+ logger.debug("changeTaskState in ACK , changed in
meta:{} ,task instance state:{}, task response event state:{}, taskInstance
id:{},taskInstance host:{}",
+ result, taskInstance.getState(),
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
}
// if taskInstance is null (maybe deleted) . retry will be
meaningless . so ack success
DBTaskAckCommand taskAckCommand = new
DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
+ logger.debug("worker ack master success, taskInstance
id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
} catch (Exception e) {
logger.error("worker ack master error", e);
- DBTaskAckCommand taskAckCommand = new
DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+ DBTaskAckCommand taskAckCommand = new
DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 :
taskInstance.getId());
channel.writeAndFlush(taskAckCommand.convert2Command());
}
break;
case RESULT:
try {
+ boolean result = true;
if (taskInstance != null) {
- processService.changeTaskState(taskInstance,
taskResponseEvent.getState(),
+ result = processService.changeTaskState(taskInstance,
taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
);
+ logger.debug("changeTaskState in RESULT , changed in
meta:{} task instance state:{}, task response event state:{}, taskInstance
id:{},taskInstance host:{}",
+ result, taskInstance.getState(),
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
+ }
+ if (!result) {
+ DBTaskResponseCommand taskResponseCommand = new
DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),
taskResponseEvent.getTaskInstanceId());
+
channel.writeAndFlush(taskResponseCommand.convert2Command());
+ logger.debug("worker response master failure,
taskInstance id:{},taskInstance host:{}", taskInstance.getId(),
taskInstance.getHost());
+ } else {
+ // if taskInstance is null (maybe deleted) . retry
will be meaningless . so response success
+ DBTaskResponseCommand taskResponseCommand = new
DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),
taskResponseEvent.getTaskInstanceId());
+
channel.writeAndFlush(taskResponseCommand.convert2Command());
+ logger.debug("worker response master success,
taskInstance id:{},taskInstance host:{}", taskInstance.getId(),
taskInstance.getHost());
}
- // if taskInstance is null (maybe deleted) . retry will be
meaningless . so response success
- DBTaskResponseCommand taskResponseCommand = new
DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),
taskResponseEvent.getTaskInstanceId());
-
channel.writeAndFlush(taskResponseCommand.convert2Command());
} catch (Exception e) {
logger.error("worker response master error", e);
DBTaskResponseCommand taskResponseCommand = new
DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index ae2b969..5f1eff6 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -43,6 +43,7 @@ import
org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.Collections;
@@ -127,16 +128,16 @@ public class MasterRegistryClient {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
- // self tolerant
- if (registryClient.getActiveMasterNum() == 1) {
- removeNodePath(null, NodeType.MASTER, true);
- removeNodePath(null, NodeType.WORKER, true);
- }
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new
MasterRegistryDataListener());
} catch (Exception e) {
logger.error("master start up exception", e);
+ this.registryClient.getStoppable().stop("master start up
exception");
} finally {
- registryClient.releaseLock(nodeLock);
+ try {
+ registryClient.releaseLock(nodeLock);
+ } catch (Exception e) {
+ logger.error("release lock error", e);
+ }
}
}
@@ -150,18 +151,57 @@ public class MasterRegistryClient {
}
/**
- * remove zookeeper node path
+ * remove master node path
*
- * @param path zookeeper node path
- * @param nodeType zookeeper node type
+ * @param path node path
+ * @param nodeType node type
* @param failover is failover
*/
- public void removeNodePath(String path, NodeType nodeType, boolean
failover) {
+ public void removeMasterNodePath(String path, NodeType nodeType, boolean
failover) {
logger.info("{} node deleted : {}", nodeType, path);
- String failoverPath = getFailoverLockPath(nodeType);
+
+ if (StringUtils.isEmpty(path)) {
+ logger.error("server down error: empty path: {}, nodeType:{}",
path, nodeType);
+ return;
+ }
+
+ String serverHost = registryClient.getHostByEventDataPath(path);
+ if (StringUtils.isEmpty(serverHost)) {
+ logger.error("server down error: unknown path: {}, nodeType:{}",
path, nodeType);
+ return;
+ }
+
+ String failoverPath = getFailoverLockPath(nodeType, serverHost);
try {
registryClient.getLock(failoverPath);
+ if (!registryClient.exists(path)) {
+ logger.info("path: {} not exists", path);
+ // handle dead server
+ registryClient.handleDeadServer(Collections.singleton(path),
nodeType, Constants.ADD_OP);
+ }
+
+ //failover server
+ if (failover) {
+ failoverServerWhenDown(serverHost, nodeType);
+ }
+ } catch (Exception e) {
+ logger.error("{} server failover failed, host:{}", nodeType,
serverHost, e);
+ } finally {
+ registryClient.releaseLock(failoverPath);
+ }
+ }
+
+ /**
+ * remove worker node path
+ *
+ * @param path node path
+ * @param nodeType node type
+ * @param failover is failover
+ */
+ public void removeWorkerNodePath(String path, NodeType nodeType, boolean
failover) {
+ logger.info("{} node deleted : {}", nodeType, path);
+ try {
String serverHost = null;
if (!StringUtils.isEmpty(path)) {
serverHost = registryClient.getHostByEventDataPath(path);
@@ -169,18 +209,18 @@ public class MasterRegistryClient {
logger.error("server down error: unknown path: {}", path);
return;
}
- // handle dead server
- registryClient.handleDeadServer(Collections.singleton(path),
nodeType, Constants.ADD_OP);
+ if (!registryClient.exists(path)) {
+ logger.info("path: {} not exists", path);
+ // handle dead server
+
registryClient.handleDeadServer(Collections.singleton(path), nodeType,
Constants.ADD_OP);
+ }
}
//failover server
if (failover) {
failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
- logger.error("{} server failover failed.", nodeType);
- logger.error("failover exception ", e);
- } finally {
- registryClient.releaseLock(failoverPath);
+ logger.error("{} server failover failed", nodeType, e);
}
}
@@ -209,12 +249,12 @@ public class MasterRegistryClient {
* @param nodeType zookeeper node type
* @return fail over lock path
*/
- private String getFailoverLockPath(NodeType nodeType) {
+ public String getFailoverLockPath(NodeType nodeType, String host) {
switch (nodeType) {
case MASTER:
- return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+ return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
case WORKER:
- return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+ return
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
default:
return "";
}
@@ -226,7 +266,11 @@ public class MasterRegistryClient {
* @param taskInstance task instance
* @return true if task instance need fail over
*/
- private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) {
+ private boolean checkTaskInstanceNeedFailover(List<Server> workerServers,
TaskInstance taskInstance) {
+
+ // first submit: host is null
+ // dispatch succeed: host is not null && submit_time is null
+ // ACK || RESULT from worker: host is not null && start_time is not
null
boolean taskNeedFailover = true;
@@ -234,14 +278,15 @@ public class MasterRegistryClient {
if (taskInstance.getHost() == null) {
return false;
}
-
- // if the worker node exists in zookeeper, we must check the task
starts after the worker
- if (registryClient.checkNodeExists(taskInstance.getHost(),
NodeType.WORKER)) {
- //if task start after worker starts, there is no need to failover
the task.
- if (checkTaskAfterWorkerStart(taskInstance)) {
- taskNeedFailover = false;
- }
+ // host is not null and submit time is null, master will retry
+ if (taskInstance.getSubmitTime() == null) {
+ return false;
+ }
+ //if task start after worker starts, there is no need to failover the
task.
+ if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
+ taskNeedFailover = false;
}
+
return taskNeedFailover;
}
@@ -270,6 +315,54 @@ public class MasterRegistryClient {
}
/**
+ * check task start after the worker server starts.
+ *
+ * @param taskInstance task instance
+ * @return true if task instance start time after worker server start date
+ */
+ private boolean checkTaskAfterWorkerStart(List<Server> workerServers,
TaskInstance taskInstance) {
+ if (StringUtils.isEmpty(taskInstance.getHost())) {
+ return false;
+ }
+
+ Date taskTime = taskInstance.getStartTime() == null ?
taskInstance.getSubmitTime() : taskInstance.getStartTime();
+
+ Date workerServerStartDate = getServerStartupTime(workerServers,
taskInstance.getHost());
+ if (workerServerStartDate != null) {
+ return taskTime.after(workerServerStartDate);
+ }
+ return false;
+ }
+
+ /**
+ * get server startup time
+ */
+ private Date getServerStartupTime(List<Server> servers, String host) {
+ if (CollectionUtils.isEmpty(servers)) {
+ return null;
+ }
+ Date serverStartupTime = null;
+ for (Server server : servers) {
+ if (host.equals(server.getHost() + Constants.COLON +
server.getPort())) {
+ serverStartupTime = server.getCreateTime();
+ break;
+ }
+ }
+ return serverStartupTime;
+ }
+
+ /**
+ * get server startup time
+ */
+ private Date getServerStartupTime(NodeType nodeType, String host) {
+ if (StringUtils.isEmpty(host)) {
+ return null;
+ }
+ List<Server> servers = registryClient.getServerList(nodeType);
+ return getServerStartupTime(servers, host);
+ }
+
+ /**
* failover worker tasks
* <p>
* 1. kill yarn job if there are yarn jobs in tasks.
@@ -279,10 +372,13 @@ public class MasterRegistryClient {
* @param workerHost worker host
*/
private void failoverWorker(String workerHost) {
+
if (StringUtils.isEmpty(workerHost)) {
return;
}
+ List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
+
long startTime = System.currentTimeMillis();
List<TaskInstance> needFailoverTaskInstanceList =
processService.queryNeedFailoverTaskInstances(workerHost);
Map<Integer, ProcessInstance> processInstanceCacheMap = new
HashMap<>();
@@ -300,11 +396,17 @@ public class MasterRegistryClient {
processInstanceCacheMap.put(processInstance.getId(),
processInstance);
}
+ if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
+ continue;
+ }
+
// only failover the task owned myself if worker down.
- if (processInstance.getHost().equalsIgnoreCase(getLocalAddress()))
{
- logger.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
- failoverTaskInstance(processInstance, taskInstance);
+ if
(!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
+ continue;
}
+
+ logger.info("failover task instance id: {}, process instance id:
{}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+ failoverTaskInstance(processInstance, taskInstance);
}
logger.info("end worker[{}] failover, useTime:{}ms", workerHost,
System.currentTimeMillis() - startTime);
}
@@ -316,11 +418,15 @@ public class MasterRegistryClient {
*
* @param masterHost master host
*/
- private void failoverMaster(String masterHost) {
+ public void failoverMaster(String masterHost) {
+
if (StringUtils.isEmpty(masterHost)) {
return;
}
+ Date serverStartupTime = getServerStartupTime(NodeType.MASTER,
masterHost);
+ List<Server> workerServers =
registryClient.getServerList(NodeType.WORKER);
+
long startTime = System.currentTimeMillis();
List<ProcessInstance> needFailoverProcessInstanceList =
processService.queryNeedFailoverProcessInstances(masterHost);
logger.info("start master[{}] failover, process list size:{}",
masterHost, needFailoverProcessInstanceList.size());
@@ -330,6 +436,11 @@ public class MasterRegistryClient {
continue;
}
+ if (serverStartupTime != null && processInstance.getRestartTime()
!= null
+ &&
processInstance.getRestartTime().after(serverStartupTime)) {
+ continue;
+ }
+
logger.info("failover process instance id: {}",
processInstance.getId());
List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
@@ -337,6 +448,12 @@ public class MasterRegistryClient {
if (Constants.NULL.equals(taskInstance.getHost())) {
continue;
}
+ if (taskInstance.getState().typeIsFinished()) {
+ continue;
+ }
+ if (!checkTaskInstanceNeedFailover(workerServers,
taskInstance)) {
+ continue;
+ }
logger.info("failover task instance id: {}, process instance
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
@@ -347,6 +464,13 @@ public class MasterRegistryClient {
logger.info("master[{}] failover end, useTime:{}ms", masterHost,
System.currentTimeMillis() - startTime);
}
+ /**
+ * failover task instance
+ * <p>
+ * 1. kill yarn job if there are yarn jobs in tasks.
+ * 2. change task state from running to need failover.
+ * 3. try to notify local master
+ */
private void failoverTaskInstance(ProcessInstance processInstance,
TaskInstance taskInstance) {
if (taskInstance == null) {
logger.error("failover task instance error, taskInstance is null");
@@ -359,24 +483,23 @@ public class MasterRegistryClient {
return;
}
- if (!checkTaskInstanceNeedFailover(taskInstance)) {
- return;
- }
-
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
- // only kill yarn job if exists , the local thread has exited
- ProcessUtils.killYarnJob(taskExecutionContext);
+ if (masterConfig.getMasterKillYarnJobWhenHandleFailOver()) {
+ // only kill yarn job if exists , the local thread has exited
+ ProcessUtils.killYarnJob(taskExecutionContext);
+ }
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
WorkflowExecuteThread workflowExecuteThreadNotify =
processInstanceExecMaps.get(processInstance.getId());
if (workflowExecuteThreadNotify == null) {
+ logger.info("workflowExecuteThreadNotify is null, just return,
task id:{},process id:{}", taskInstance.getId(), processInstance.getId());
return;
}
StateEvent stateEvent = new StateEvent();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index cb5b6be..361f09f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -63,7 +63,7 @@ public class MasterRegistryDataListener implements
SubscribeListener {
logger.info("master node added : {}", path);
break;
case REMOVE:
- masterRegistryClient.removeNodePath(path, NodeType.MASTER,
true);
+ masterRegistryClient.removeMasterNodePath(path,
NodeType.MASTER, true);
break;
default:
break;
@@ -78,7 +78,7 @@ public class MasterRegistryDataListener implements
SubscribeListener {
break;
case REMOVE:
logger.info("worker node deleted : {}", path);
- masterRegistryClient.removeNodePath(path, NodeType.WORKER,
true);
+ masterRegistryClient.removeWorkerNodePath(path,
NodeType.WORKER, true);
break;
default:
break;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index b7e904b..6b30a1a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
@@ -387,7 +388,8 @@ public class ServerNodeManager implements InitializingBean {
workerGroup = workerGroup.toLowerCase();
Set<String> nodes = workerGroupNodes.get(workerGroup);
if (CollectionUtils.isNotEmpty(nodes)) {
- return Collections.unmodifiableSet(nodes);
+ // avoid ConcurrentModificationException
+ return
Collections.unmodifiableSet(nodes.stream().collect(Collectors.toSet()));
}
return nodes;
} finally {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
new file mode 100644
index 0000000..81d02a9
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class FailoverExecuteThread extends Thread {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FailoverExecuteThread.class);
+
+ @Autowired
+ private MasterRegistryClient masterRegistryClient;
+
+ @Autowired
+ private RegistryClient registryClient;
+
+ @Autowired
+ private MasterConfig masterConfig;
+
+ /**
+ * process service
+ */
+ @Autowired
+ private ProcessService processService;
+
+ @Override
+ public synchronized void start() {
+ super.setName("FailoverExecuteThread");
+ super.start();
+ }
+
+ @Override
+ public void run() {
+ while (Stopper.isRunning()) {
+ logger.info("failover execute started");
+ try {
+ List<String> hosts =
processService.queryNeedFailoverProcessInstanceHost();
+ if (CollectionUtils.isEmpty(hosts)) {
+ continue;
+ }
+ for (String host : hosts) {
+ String failoverPath =
masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
+ try {
+ registryClient.getLock(failoverPath);
+ masterRegistryClient.failoverMaster(host);
+ } catch (Exception e) {
+ logger.error("{} server failover failed, host:{}",
NodeType.MASTER, host, e);
+ } finally {
+ registryClient.releaseLock(failoverPath);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("failover execute error", e);
+ } finally {
+ ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS *
masterConfig.getFailoverInterval() * 60);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index ca44ead..33c84b3 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -234,6 +234,7 @@ public class MasterSchedulerService extends Thread {
if (ServerNodeManager.MASTER_SIZE == 0) {
return null;
}
+ logger.debug("master size:{}",ServerNodeManager.MASTER_SIZE);
List<Command> commandList =
processService.findCommandPage(ServerNodeManager.MASTER_SIZE, pageNumber);
if (commandList.size() == 0) {
return null;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 640abb7..bc6159d 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -409,9 +409,10 @@ public class WorkflowExecuteThread implements Runnable {
private boolean checkStateEvent(StateEvent stateEvent) {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId())
{
- logger.error("mismatch process instance id: {}, state event:{}",
+ logger.error("mismatch process instance id: {}, state event:{},
task instance id:{}",
this.processInstance.getId(),
- stateEvent.toString());
+ stateEvent.toString(),
+ stateEvent.getTaskInstanceId());
return false;
}
return true;
@@ -482,6 +483,7 @@ public class WorkflowExecuteThread implements Runnable {
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA,
processInstance.getScheduleTime()));
processInstance.setStartTime(new Date());
+ processInstance.setRestartTime(processInstance.getStartTime());
processInstance.setEndTime(null);
processService.saveProcessInstance(processInstance);
this.taskInstanceHashMap.clear();
@@ -876,11 +878,11 @@ public class WorkflowExecuteThread implements Runnable {
}
if
(completeTaskList.containsKey(Long.toString(task.getTaskCode()))) {
- logger.info("task {} has already run success", task.getName());
+ logger.info("task {} has already run success, task id:{}",
task.getName(), task.getId());
continue;
}
if (task.getState().typeIsPause() ||
task.getState().typeIsCancel()) {
- logger.info("task {} stopped, the state is {}",
task.getName(), task.getState());
+ logger.info("task {} stopped, the state is {}, task id:{}",
task.getName(), task.getState(), task.getId());
} else {
addTaskToStandByList(task);
}
@@ -1167,13 +1169,13 @@ public class WorkflowExecuteThread implements Runnable {
* @param taskInstance task instance
*/
private void addTaskToStandByList(TaskInstance taskInstance) {
- logger.info("add task to stand by list: {}", taskInstance.getName());
+ logger.info("add task to stand by list, task name: {} , task id:{}",
taskInstance.getName(), taskInstance.getId());
try {
if (!readyToSubmitTaskQueue.contains(taskInstance)) {
readyToSubmitTaskQueue.put(taskInstance);
}
} catch (Exception e) {
- logger.error("add task instance to readyToSubmitTaskQueue error,
taskName: {}", taskInstance.getName(), e);
+ logger.error("add task instance to readyToSubmitTaskQueue,
taskName: {}, task id: {}", taskInstance.getName(), taskInstance.getId(), e);
}
}
@@ -1253,7 +1255,7 @@ public class WorkflowExecuteThread implements Runnable {
TaskInstance retryTask =
processService.findTaskInstanceById(task.getId());
if (retryTask != null &&
retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
task.setState(retryTask.getState());
- logger.info("task: {} has been forced success, put it
into complete task list and stop retrying", task.getName());
+ logger.info("task name: {} has been forced success,
put it into complete task list and stop retrying, task id:{}", task.getName(),
task.getId());
removeTaskFromStandbyList(task);
completeTaskList.put(Long.toString(task.getTaskCode()), task);
submitPostNode(Long.toString(task.getTaskCode()));
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ee1c548..23988f9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -70,8 +70,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
if (this.taskInstance == null) {
return false;
}
- dispatchTask(taskInstance, processInstance);
- return true;
+ return dispatchTask(taskInstance, processInstance);
}
@Override
@@ -127,7 +126,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
- logger.info(String.format("master submit success, task : %s",
taskInstance.getName()));
+ logger.info("master submit success, task id:{}, task name:{},
process id:{}",
+ taskInstance.getId(), taskInstance.getName(),
taskInstance.getProcessInstanceId());
return true;
} catch (Exception e) {
logger.error("submit task Exception: ", e);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 66f4e53..c66718f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -21,7 +21,6 @@ import static
org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRI
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
@@ -29,12 +28,19 @@ import
org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
-import org.apache.dolphinscheduler.server.worker.processor.*;
+import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
+import
org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
+import
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import
org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import javax.annotation.PostConstruct;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -45,9 +51,6 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.transaction.annotation.EnableTransactionManagement;
-import javax.annotation.PostConstruct;
-import java.util.Set;
-
/**
* worker server
*/
@@ -146,14 +149,14 @@ public class WorkerServer implements IStoppable {
try {
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
- Set<String> workerZkPaths =
this.workerRegistryClient.getWorkerZkPaths();
-
- this.workerRegistryClient.handleDeadServer(workerZkPaths,
NodeType.WORKER, Constants.DELETE_OP);
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.error("worker registry error", e);
throw new RuntimeException(e);
}
+ // solve dead lock
+
logger.info(org.apache.dolphinscheduler.spi.utils.PropertyUtils.dumpProperties());
+
// task execute manager
this.workerManagerThread.start();
@@ -194,8 +197,17 @@ public class WorkerServer implements IStoppable {
this.workerRegistryClient.unRegistry();
this.alertClientService.close();
this.springApplicationContext.close();
+ logger.info("springApplicationContext close");
} catch (Exception e) {
logger.error("worker server stop exception ", e);
+ } finally {
+ try {
+ // thread sleep 60 seconds for quietly stop
+ Thread.sleep(60000L);
+ } catch (Exception e) {
+ logger.warn("thread sleep exception ", e);
+ }
+ System.exit(1);
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index a3feb77..57119a7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -65,6 +65,17 @@ public class WorkerConfig {
@Value("${task.plugin.binding:}")
private String taskPluginBinding;
+ @Value("${worker.retry.report.task.statues.interval:10}")
+ private int retryReportTaskStatusInterval;
+
+ public int getRetryReportTaskStatusInterval() {
+ return retryReportTaskStatusInterval;
+ }
+
+ public void setRetryReportTaskStatusInterval(int
retryReportTaskStatusInterval) {
+ this.retryReportTaskStatusInterval = retryReportTaskStatusInterval;
+ }
+
public int getListenPort() {
return listenPort;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
index a340ad7..3aac840 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
@@ -17,17 +17,21 @@
package org.apache.dolphinscheduler.server.worker.processor;
-import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.remote.command.*;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+
/**
* db task ack processor
*/
@@ -50,6 +54,7 @@ public class DBTaskAckProcessor implements
NettyRequestProcessor {
if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
+ logger.debug("removeAckCache: taskinstance id:{}",
taskAckCommand.getTaskInstanceId());
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
index 97a9cf5..6da9fdd 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
-import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
@@ -25,11 +24,14 @@ import
org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+
/**
* db task response processor
*/
@@ -51,6 +53,7 @@ public class DBTaskResponseProcessor implements
NettyRequestProcessor {
if (taskResponseCommand.getStatus() ==
ExecutionStatus.SUCCESS.getCode()){
ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
+ logger.debug("removeResponseCache: taskinstance id:{}",
taskResponseCommand.getTaskInstanceId());
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 54ec49c..326fcb2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -104,6 +107,14 @@ public class WorkerRegistryClient {
logger.info("worker node : {} registry to ZK {} successfully",
address, workerZKPath);
}
+ while (!this.checkNodeExists()) {
+ ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+ }
+
+ this.handleDeadServer(workerZkPaths, NodeType.WORKER,
Constants.DELETE_OP);
+
+ registryClient.addConnectionStateListener(this::handleConnectionState);
+
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
workerConfig.getWorkerMaxCpuloadAvg(),
workerConfig.getWorkerReservedMemory(),
@@ -119,6 +130,32 @@ public class WorkerRegistryClient {
logger.info("worker node : {} heartbeat interval {} s", address,
workerHeartbeatInterval);
}
+ public void handleConnectionState(ConnectionState state) {
+ switch (state) {
+ case CONNECTED:
+ logger.info("registry connection state is {}", state);
+ break;
+ case SUSPENDED:
+ logger.info("registry connection state is {}, ready to stop
myself", state);
+ registryClient.getStoppable().stop("registry connection state
is SUSPENDED, stop myself");
+ break;
+ case RECONNECTED:
+ logger.info("registry connection state is {}, clean the node
info", state);
+ String address =
NetUtils.getAddr(workerConfig.getListenPort());
+ Set<String> workerZkPaths = getWorkerZkPaths();
+ for (String workerZKPath : workerZkPaths) {
+ registryClient.persistEphemeral(workerZKPath, "");
+ logger.info("worker node : {} reconnect to ZK {}
successfully", address, workerZKPath);
+ }
+ break;
+ case DISCONNECTED:
+ logger.info("registry connection state is {}, ready to stop
myself", state);
+ registryClient.getStoppable().stop("registry connection state
is DISCONNECTED, stop myself");
+ break;
+ default:
+ }
+ }
+
/**
* remove registry info
*/
@@ -177,4 +214,11 @@ public class WorkerRegistryClient {
registryClient.setStoppable(stoppable);
}
+ public boolean checkNodeExists() {
+ boolean result = registryClient.checkNodeExists(NetUtils.getHost(),
NodeType.WORKER);
+ if (result) {
+ logger.info("check worker, node exist success, host:{}",
NetUtils.getHost());
+ }
+ return result;
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index b2d0031..f52be9d 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -18,18 +18,20 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.Stopper;
-
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.Map;
-
/**
* Retry Report Task Status Thread
*/
@@ -38,10 +40,8 @@ public class RetryReportTaskStatusThread implements Runnable
{
private final Logger logger =
LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
- /**
- * every 5 minutes
- */
- private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L;
+ @Autowired
+ WorkerConfig workerConfig;
/**
* task callback service
@@ -68,7 +68,7 @@ public class RetryReportTaskStatusThread implements Runnable {
while (Stopper.isRunning()){
// sleep 5 minutes
- ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
+ ThreadUtils.sleep(workerConfig.getRetryReportTaskStatusInterval()
* 1000);
try {
if (!responceCache.getAckCache().isEmpty()){
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index 65d6b89..7148105 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -114,9 +114,9 @@ public class MasterRegistryClientTest {
@Test
public void removeNodePathTest() {
- masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
- masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);
+ masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER,
false);
+ masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER,
true);
//Cannot mock static methods
- masterRegistryClient.removeNodePath("/path", NodeType.WORKER, true);
+ masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER,
true);
}
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index df7549c..cc968f9 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.process;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -27,8 +28,6 @@ import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static java.util.stream.Collectors.toSet;
-
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -579,6 +578,7 @@ public class ProcessService {
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
+ processInstance.setRestartTime(processInstance.getStartTime());
processInstance.setRunTimes(1);
processInstance.setMaxTryTimes(0);
processInstance.setCommandParam(command.getCommandParam());
@@ -775,6 +775,7 @@ public class ProcessService {
processInstance.setScheduleTime(command.getScheduleTime());
}
processInstance.setHost(host);
+ processInstance.setRestartTime(new Date());
ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION;
int runTime = processInstance.getRunTimes();
switch (commandType) {
@@ -844,6 +845,7 @@ public class ProcessService {
updateTaskInstance(taskInstance);
}
processInstance.setStartTime(new Date());
+ processInstance.setRestartTime(processInstance.getStartTime());
processInstance.setEndTime(null);
processInstance.setRunTimes(runTime + 1);
initComplementDataParam(processDefinition, processInstance,
cmdParam);
@@ -1015,6 +1017,7 @@ public class ProcessService {
}
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
+ logger.debug("update task instance, task instance id:{}",
taskInstance.getId());
}
/**
@@ -1317,9 +1320,6 @@ public class ProcessService {
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
taskInstance.setState(getSubmitTaskState(taskInstance,
processInstanceState));
- if (taskInstance.getSubmitTime() == null) {
- taskInstance.setSubmitTime(new Date());
- }
if (taskInstance.getFirstSubmitTime() == null) {
taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
}
@@ -1435,6 +1435,11 @@ public class ProcessService {
}
}
+ public boolean updateHostAndSubmitTimeById(int id, String host, Date date)
{
+ int count = taskInstanceMapper.updateHostAndSubmitTimeById(id, host,
date);
+ return count > 0;
+ }
+
/**
* insert task instance
*
@@ -1454,6 +1459,7 @@ public class ProcessService {
*/
public boolean updateTaskInstance(TaskInstance taskInstance) {
int count = taskInstanceMapper.updateById(taskInstance);
+ logger.debug("updateTaskInstance, task instance id:{}, state;{}",
taskInstance.getId(), taskInstance.getState());
return count > 0;
}
@@ -1691,8 +1697,9 @@ public class ProcessService {
* @param executePath executePath
* @param logPath logPath
* @param taskInstId taskInstId
+ * @reutrn
*/
- public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state, Date startTime, String host,
+ public boolean changeTaskState(TaskInstance taskInstance, ExecutionStatus
state, Date startTime, String host,
String executePath,
String logPath,
int taskInstId) {
@@ -1701,7 +1708,7 @@ public class ProcessService {
taskInstance.setHost(host);
taskInstance.setExecutePath(executePath);
taskInstance.setLogPath(logPath);
- saveTaskInstance(taskInstance);
+ return saveTaskInstance(taskInstance);
}
/**
@@ -1721,8 +1728,9 @@ public class ProcessService {
* @param endTime endTime
* @param taskInstId taskInstId
* @param varPool varPool
+ * @return
*/
- public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state,
+ public boolean changeTaskState(TaskInstance taskInstance, ExecutionStatus
state,
Date endTime,
int processId,
String appIds,
@@ -1734,7 +1742,7 @@ public class ProcessService {
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
changeOutParam(taskInstance);
- saveTaskInstance(taskInstance);
+ return saveTaskInstance(taskInstance);
}
/**
@@ -1819,6 +1827,10 @@ public class ProcessService {
return processInstanceMapper.queryByHostAndStatus(host, stateArray);
}
+ public List<String> queryNeedFailoverProcessInstanceHost() {
+ return
processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray);
+ }
+
/**
* process need failover process instance
*
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
index 622206a..86d73b6 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
@@ -17,39 +17,6 @@
package org.apache.dolphinscheduler.service.quartz;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.service.exceptions.ServiceException;
-import org.quartz.CronTrigger;
-import org.quartz.Job;
-import org.quartz.JobDetail;
-import org.quartz.JobKey;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
-import org.quartz.TriggerKey;
-import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.jdbcjobstore.JobStoreTX;
-import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
-import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
-import org.quartz.impl.matchers.GroupMatcher;
-import org.quartz.simpl.SimpleThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static
org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS;
import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK;
@@ -61,6 +28,7 @@ import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_I
import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD;
import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX;
import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT;
import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID;
import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME;
import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON;
@@ -70,6 +38,7 @@ import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL
import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY;
import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID;
import static
org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK;
+import static
org.apache.dolphinscheduler.common.Constants.QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT;
import static
org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID;
@@ -91,6 +60,42 @@ import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.jdbcjobstore.JobStoreTX;
+import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
+import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.quartz.simpl.SimpleThreadPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* single Quartz executors instance
*/
@@ -170,6 +175,8 @@ public class QuartzExecutors {
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,
conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,
QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE,
conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,
conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,
HikariConnectionProvider.class.getName()));
+
properties.setProperty(ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT,
+
conf.getString(ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT,
QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT));
schedulerFactory.initialize(properties);
scheduler = schedulerFactory.getScheduler();
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index 8d630be..d9168d4 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -32,7 +32,7 @@ public class TaskPriorityQueueImpl implements
TaskPriorityQueue<TaskPriority> {
/**
* queue size
*/
- private static final Integer QUEUE_MAX_SIZE = 3000;
+ private static final Integer QUEUE_MAX_SIZE = 10000;
/**
* queue
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index f384678..775fc10 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -330,7 +330,7 @@ public class RegistryClient {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS +
SINGLE_SLASH + serverPath;
remove(server);
- logger.info("{} server {} deleted from zk dead server path
success", serverType, host);
+ logger.info("{} server {} deleted from zk dead server path:{}
success", serverType, host,server);
}
}
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java
index 2b1adff..8a4068c 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java
@@ -181,4 +181,7 @@ public class PropertyUtils {
properties.setProperty(key, value);
}
+ public static String dumpProperties() {
+ return properties.toString();
+ }
}