This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9856453cd7 [DSIP-82][Master/Worker] Use FAILOVER_FINISH_NODES to avoid
duplicate workflow/task when failover (#16821)
9856453cd7 is described below
commit 9856453cd7a9a51ce6934c8b89630f9d025e74a7
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Nov 20 16:04:11 2024 +0800
[DSIP-82][Master/Worker] Use FAILOVER_FINISH_NODES to avoid duplicate
workflow/task when failover (#16821)
---
docs/docs/en/architecture/configuration.md | 1 -
docs/docs/en/guide/upgrade/incompatible.md | 1 +
docs/docs/zh/architecture/configuration.md | 47 +++---
docs/docs/zh/guide/upgrade/incompatible.md | 1 +
.../server/master/cluster/BaseServerMetadata.java | 5 +
.../master/cluster/ClusterStateMonitors.java | 12 +-
.../server/master/cluster/IClusters.java | 3 +
.../server/master/cluster/MasterClusters.java | 16 +-
.../master/cluster/MasterServerMetadata.java | 7 +-
.../server/master/cluster/WorkerClusters.java | 6 +
.../master/cluster/WorkerServerMetadata.java | 3 +-
.../server/master/config/MasterConfig.java | 4 -
.../engine/system/event/AbstractSystemEvent.java | 6 +
.../engine/system/event/MasterFailoverEvent.java | 24 ++-
.../engine/system/event/WorkerFailoverEvent.java | 24 ++-
.../master/failover/FailoverCoordinator.java | 130 +++++++++++---
.../registry/MasterConnectionStateListener.java | 13 +-
.../master/registry/MasterHeartBeatTask.java | 20 ++-
.../master/registry/MasterRegistryClient.java | 5 +-
.../server/master/registry/MasterStopStrategy.java | 58 -------
.../master/registry/MasterWaitingStrategy.java | 116 -------------
.../src/main/resources/application.yaml | 3 -
.../src/test/resources/application.yaml | 3 -
.../registry/api/ConnectStrategy.java | 2 -
.../registry/api/ConnectStrategyProperties.java | 31 ----
.../registry/api/RegistryClient.java | 33 ++++
.../registry/api/StrategyType.java | 25 ---
.../registry/api/enums/RegistryNodeType.java | 2 +
.../registry/api/utils/RegistryUtils.java | 15 +-
.../server/worker/config/WorkerConfig.java | 3 -
.../worker/registry/WorkerConnectStrategy.java | 24 ---
.../registry/WorkerConnectionStateListener.java | 16 +-
.../worker/registry/WorkerRegistryClient.java | 8 +-
.../server/worker/registry/WorkerStopStrategy.java | 52 ------
.../worker/registry/WorkerWaitingStrategy.java | 131 ---------------
.../server/worker/task/WorkerHeartBeatTask.java | 16 +-
.../src/main/resources/application.yaml | 3 -
.../WorkerConnectionStateListenerTest.java | 21 +--
.../worker/registry/WorkerRegistryClientTest.java | 2 -
.../server/worker/registry/WorkerStrategyTest.java | 187 ---------------------
40 files changed, 304 insertions(+), 775 deletions(-)
diff --git a/docs/docs/en/architecture/configuration.md
b/docs/docs/en/architecture/configuration.md
index 86d4357e1b..fb8613d388 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -294,7 +294,6 @@ Location: `master-server/conf/application.yaml`
| master.server-load-protection.max-disk-usage-percentage-thresholds
| 0.7 | Master max disk usage , when the master's disk
usage is smaller then this value, master server can execute workflow.
|
| master.failover-interval
| 10 | failover interval, the unit is minute
|
| master.kill-application-when-task-failover
| true | whether to kill yarn/k8s application when
failover taskInstance
|
-| master.registry-disconnect-strategy.strategy
| stop | Used when the master disconnect from registry,
default value: stop. Optional values include stop, waiting
|
| master.registry-disconnect-strategy.max-waiting-time
| 100s | Used when the master disconnect from registry,
and the disconnect strategy is waiting, this config means the master will
waiting to reconnect to registry in given times, and after the waiting times,
if the master still cannot connect to registry, will stop itself, if the value
is 0s, the Master will wait infinitely |
| master.worker-group-refresh-interval
| 10s | The interval to refresh worker group from db
to memory
|
| master.command-fetch-strategy.type
| ID_SLOT_BASED | The command fetch strategy, only support
`ID_SLOT_BASED`
|
diff --git a/docs/docs/en/guide/upgrade/incompatible.md
b/docs/docs/en/guide/upgrade/incompatible.md
index c263d97ac4..91abd04811 100644
--- a/docs/docs/en/guide/upgrade/incompatible.md
+++ b/docs/docs/en/guide/upgrade/incompatible.md
@@ -33,4 +33,5 @@ This document records the incompatible updates between each
version. You need to
* Uniformly name `process` in code as `workflow`
([#16515])(https://github.com/apache/dolphinscheduler/pull/16515)
* Deprecated upgrade code of 1.x and 2.x
([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* Remove the `Data Quality` module
([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
+* Remove the `registry-disconnect-strategy` configuration from `application.yaml`
([#16821](https://github.com/apache/dolphinscheduler/pull/16821))
diff --git a/docs/docs/zh/architecture/configuration.md
b/docs/docs/zh/architecture/configuration.md
index 8eed519f78..c76e869f72 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -276,30 +276,28 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
位置:`master-server/conf/application.yaml`
-| 参数
| 默认值 |
描述
|
-|-----------------------------------------------------------------------------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
-| master.listen-port
| 5678 | master监听端口
|
-| master.pre-exec-threads
| 10 | master准备执行任务的数量,用于限制并行的command
|
-| master.exec-threads
| 100 | master工作线程数量,用于限制并行的流程实例数量
|
-| master.dispatch-task-number
| 3 | master每个批次的派发任务数量
|
-| master.worker-load-balancer-configuration-properties.type
| DYNAMIC_WEIGHTED_ROUND_ROBIN | Master
将会使用Worker的动态CPU/Memory/线程池使用率来计算Worker的负载,负载越低的worker将会有更高的机会被分发任务
|
-| master.max-heartbeat-interval
| 10s | master最大心跳间隔
|
-| master.task-commit-retry-times
| 5 | 任务重试次数
|
-| master.task-commit-interval
| 1000 | 任务提交间隔,单位为毫秒
|
-| master.state-wheel-interval
| 5 | 轮询检查状态时间
|
-| master.server-load-protection.enabled
| true | 是否开启系统保护策略
|
-| master.server-load-protection.max-system-cpu-usage-percentage-thresholds
| 0.7 |
master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7:
会使用70%的操作系统CPU |
-| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds
| 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM
cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU
|
-| master.server-load-protection.max-system-memory-usage-percentage-thresholds
| 0.7 | master最大系统
内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存
|
-| master.server-load-protection.max-disk-usage-percentage-thresholds
| 0.7 |
master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间
|
-| master.failover-interval
| 10 | failover间隔,单位为分钟
|
-| master.kill-application-when-task-failover
| true | 当任务实例failover时,是否kill掉yarn或k8s application
|
-| master.registry-disconnect-strategy.strategy
| stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括:
stop, waiting
|
-| master.registry-disconnect-strategy.max-waiting-time
| 100s | 当Master与注册中心失联之后重连时间,
之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连,
在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
-| master.master.worker-group-refresh-interval
| 10s | 定期将workerGroup从数据库中同步到内存的时间间隔
|
-| master.command-fetch-strategy.type
| ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED`
|
-| master.command-fetch-strategy.config.id-step
| 1 | 数据库中t_ds_command的id自增步长
|
-| master.command-fetch-strategy.config.fetch-size
| 10 | master拉取command数量
|
+| 参数
| 默认值 | 描述
|
+|-----------------------------------------------------------------------------|------------------------------|-----------------------------------------------------------------------------------------|
+| master.listen-port
| 5678 | master监听端口
|
+| master.pre-exec-threads
| 10 | master准备执行任务的数量,用于限制并行的command
|
+| master.exec-threads
| 100 | master工作线程数量,用于限制并行的流程实例数量
|
+| master.dispatch-task-number
| 3 | master每个批次的派发任务数量
|
+| master.worker-load-balancer-configuration-properties.type
| DYNAMIC_WEIGHTED_ROUND_ROBIN | Master
将会使用Worker的动态CPU/Memory/线程池使用率来计算Worker的负载,负载越低的worker将会有更高的机会被分发任务
|
+| master.max-heartbeat-interval
| 10s | master最大心跳间隔
|
+| master.task-commit-retry-times
| 5 | 任务重试次数
|
+| master.task-commit-interval
| 1000 | 任务提交间隔,单位为毫秒
|
+| master.state-wheel-interval
| 5 | 轮询检查状态时间
|
+| master.server-load-protection.enabled
| true | 是否开启系统保护策略
|
+| master.server-load-protection.max-system-cpu-usage-percentage-thresholds
| 0.7 |
master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7:
会使用70%的操作系统CPU |
+| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds
| 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM
cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU |
+| master.server-load-protection.max-system-memory-usage-percentage-thresholds
| 0.7 | master最大系统
内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 |
+| master.server-load-protection.max-disk-usage-percentage-thresholds
| 0.7 |
master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间
|
+| master.failover-interval
| 10 | failover间隔,单位为分钟
|
+| master.kill-application-when-task-failover
| true | 当任务实例failover时,是否kill掉yarn或k8s application
|
+| master.worker-group-refresh-interval
| 10s | 定期将workerGroup从数据库中同步到内存的时间间隔
|
+| master.command-fetch-strategy.type
| ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED`
|
+| master.command-fetch-strategy.config.id-step
| 1 | 数据库中t_ds_command的id自增步长
|
+| master.command-fetch-strategy.config.fetch-size
| 10 | master拉取command数量
|
## Worker Server相关配置
@@ -319,7 +317,6 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
| worker.server-load-protection.max-disk-usage-percentage-thresholds
| 0.7 | worker最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,worker服务才能接收任务. 默认值为0.7:
会使用70%的操作系统磁盘空间 |
| worker.alert-listen-host
| localhost | alert监听host
|
| worker.alert-listen-port
| 50052 | alert监听端口
|
-| worker.registry-disconnect-strategy.strategy
| stop | 当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting
|
| worker.registry-disconnect-strategy.max-waiting-time
| 100s | 当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。
该值表示当Worker与注册中心失联时会在给定时间之内进行重连,
在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 |
| worker.task-execute-threads-full-policy
| REJECT | 如果是 REJECT, 当Worker中等待队列中的任务数达到exec-threads时,
Worker将会拒绝接下来新接收的任务,Master将会重新分发该任务; 如果是 CONTINUE,
Worker将会接收任务,放入等待队列中等待空闲线程去执行该任务 |
| worker.tenant-config.auto-create-tenant-enabled
| true |
租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。
|
diff --git a/docs/docs/zh/guide/upgrade/incompatible.md
b/docs/docs/zh/guide/upgrade/incompatible.md
index f1fb24d9c9..4ba7d4bf95 100644
--- a/docs/docs/zh/guide/upgrade/incompatible.md
+++ b/docs/docs/zh/guide/upgrade/incompatible.md
@@ -31,4 +31,5 @@
* 统一代码中的 `process` 为 `workflow`
([#16515])(https://github.com/apache/dolphinscheduler/pull/16515)
* 废弃从 1.x 至 2.x 的升级代码
([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* 移除 `数据质量` 模块
([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
+* 在`application.yaml`中移除`registry-disconnect-strategy`配置
([#16821](https://github.com/apache/dolphinscheduler/pull/16821))
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
index 002cd2903f..c9b89eee70 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
@@ -20,12 +20,17 @@ package org.apache.dolphinscheduler.server.master.cluster;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import lombok.Data;
+import lombok.ToString;
import lombok.experimental.SuperBuilder;
@Data
+@ToString
@SuperBuilder
public abstract class BaseServerMetadata implements IClusters.IServerMetadata {
+ // The server startup time in milliseconds.
+ private final long serverStartupTime;
+
private final String address;
private final double cpuUsage;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterStateMonitors.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterStateMonitors.java
index bc78cb80a9..06705a9f39 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterStateMonitors.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterStateMonitors.java
@@ -46,12 +46,16 @@ public class ClusterStateMonitors {
log.info("ClusterStateMonitors started...");
}
- void masterRemoved(MasterServerMetadata masterServer) {
-
systemEventBus.publish(MasterFailoverEvent.of(masterServer.getAddress(), new
Date()));
+ void masterRemoved(final MasterServerMetadata masterServer) {
+ // We set a delay of 30 seconds for the master failover event
+ // If the master can reconnect to registry within 30 seconds, the
master will skip failover.
+ systemEventBus.publish(MasterFailoverEvent.of(masterServer, new
Date(), 30_000));
}
- void workerRemoved(WorkerServerMetadata workerServer) {
-
systemEventBus.publish(WorkerFailoverEvent.of(workerServer.getAddress(), new
Date()));
+ void workerRemoved(final WorkerServerMetadata workerServer) {
+ // We set a delay of 30 seconds for the worker failover event
+ // If the worker can reconnect to registry within 30 seconds, the
worker will skip failover.
+ systemEventBus.publish(WorkerFailoverEvent.of(workerServer, new
Date(), 30_000));
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IClusters.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IClusters.java
index 5bcc85324e..3b97ea8529 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IClusters.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IClusters.java
@@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.master.cluster;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import java.util.List;
+import java.util.Optional;
public interface IClusters<S extends IClusters.IServerMetadata> {
List<S> getServers();
+ Optional<S> getServer(final String address);
+
void registerListener(IClustersChangeListener<S> listener);
interface IServerMetadata {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterClusters.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterClusters.java
index 568e8f1dc8..bb7d205c4c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterClusters.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterClusters.java
@@ -26,6 +26,7 @@ import org.apache.commons.collections4.list.UnmodifiableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@@ -50,6 +51,11 @@ public class MasterClusters extends
AbstractClusterSubscribeListener<MasterServe
return UnmodifiableList.unmodifiableList(new
ArrayList<>(masterServerMap.values()));
}
+ @Override
+ public Optional<MasterServerMetadata> getServer(final String address) {
+ return Optional.ofNullable(masterServerMap.get(address));
+ }
+
public List<MasterServerMetadata> getNormalServers() {
List<MasterServerMetadata> normalMasterServers =
masterServerMap.values()
.stream()
@@ -59,12 +65,12 @@ public class MasterClusters extends
AbstractClusterSubscribeListener<MasterServe
}
@Override
- public void registerListener(IClustersChangeListener<MasterServerMetadata>
listener) {
+ public void registerListener(final
IClustersChangeListener<MasterServerMetadata> listener) {
masterClusterChangeListeners.add(listener);
}
@Override
- MasterServerMetadata parseServerFromHeartbeat(String masterHeartBeatJson) {
+ MasterServerMetadata parseServerFromHeartbeat(final String
masterHeartBeatJson) {
MasterHeartBeat masterHeartBeat =
JSONUtils.parseObject(masterHeartBeatJson, MasterHeartBeat.class);
if (masterHeartBeat == null) {
return null;
@@ -73,7 +79,7 @@ public class MasterClusters extends
AbstractClusterSubscribeListener<MasterServe
}
@Override
- public void onServerAdded(MasterServerMetadata masterServer) {
+ public void onServerAdded(final MasterServerMetadata masterServer) {
masterServerMap.put(masterServer.getAddress(), masterServer);
for (IClustersChangeListener<MasterServerMetadata> listener :
masterClusterChangeListeners) {
listener.onServerAdded(masterServer);
@@ -81,7 +87,7 @@ public class MasterClusters extends
AbstractClusterSubscribeListener<MasterServe
}
@Override
- public void onServerRemove(MasterServerMetadata masterServer) {
+ public void onServerRemove(final MasterServerMetadata masterServer) {
masterServerMap.remove(masterServer.getAddress());
for (IClustersChangeListener<MasterServerMetadata> listener :
masterClusterChangeListeners) {
listener.onServerRemove(masterServer);
@@ -89,7 +95,7 @@ public class MasterClusters extends
AbstractClusterSubscribeListener<MasterServe
}
@Override
- public void onServerUpdate(MasterServerMetadata masterServer) {
+ public void onServerUpdate(final MasterServerMetadata masterServer) {
masterServerMap.put(masterServer.getAddress(), masterServer);
for (IClustersChangeListener<MasterServerMetadata> listener :
masterClusterChangeListeners) {
listener.onServerUpdate(masterServer);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
index 32dd0cad9e..1005de4c50 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
@@ -21,15 +21,18 @@ import
org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import lombok.ToString;
import lombok.experimental.SuperBuilder;
@Data
+@ToString(callSuper = true)
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
public class MasterServerMetadata extends BaseServerMetadata implements
Comparable<MasterServerMetadata> {
- public static MasterServerMetadata parseFromHeartBeat(MasterHeartBeat
masterHeartBeat) {
+ public static MasterServerMetadata parseFromHeartBeat(final
MasterHeartBeat masterHeartBeat) {
return MasterServerMetadata.builder()
+ .serverStartupTime(masterHeartBeat.getStartupTime())
.address(masterHeartBeat.getHost() + ":" +
masterHeartBeat.getPort())
.cpuUsage(masterHeartBeat.getCpuUsage())
.memoryUsage(masterHeartBeat.getMemoryUsage())
@@ -39,7 +42,7 @@ public class MasterServerMetadata extends BaseServerMetadata
implements Comparab
// Use the master address to sort the master server
@Override
- public int compareTo(MasterServerMetadata o) {
+ public int compareTo(final MasterServerMetadata o) {
return this.getAddress().compareTo(o.getAddress());
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
index 363dee0222..291b2f1758 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
@@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@@ -53,6 +54,11 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
return UnmodifiableList.unmodifiableList(new
ArrayList<>(workerMapping.values()));
}
+ @Override
+ public Optional<WorkerServerMetadata> getServer(final String address) {
+ return Optional.ofNullable(workerMapping.get(address));
+ }
+
public List<String> getWorkerServerAddressByGroup(String workerGroup) {
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
return UnmodifiableList.unmodifiableList(new
ArrayList<>(workerMapping.keySet()));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
index 61cef98a00..de9a03506d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
@@ -38,8 +38,9 @@ public class WorkerServerMetadata extends BaseServerMetadata {
private final double taskThreadPoolUsage;
- public static WorkerServerMetadata parseFromHeartBeat(WorkerHeartBeat
workerHeartBeat) {
+ public static WorkerServerMetadata parseFromHeartBeat(final
WorkerHeartBeat workerHeartBeat) {
return WorkerServerMetadata.builder()
+ .serverStartupTime(workerHeartBeat.getStartupTime())
.address(workerHeartBeat.getHost() + ":" +
workerHeartBeat.getPort())
.cpuUsage(workerHeartBeat.getCpuUsage())
.memoryUsage(workerHeartBeat.getMemoryUsage())
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 9ff59e0cdb..bc1cfef2ec 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.config;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import
org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerConfigurationProperties;
@@ -60,8 +59,6 @@ public class MasterConfig implements Validator {
private MasterServerLoadProtection serverLoadProtection = new
MasterServerLoadProtection();
- private ConnectStrategyProperties registryDisconnectStrategy = new
ConnectStrategyProperties();
-
private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
private CommandFetchStrategy commandFetchStrategy = new
CommandFetchStrategy();
@@ -120,7 +117,6 @@ public class MasterConfig implements Validator {
"\n workflow-event-bus-fire-thread-count -> " +
workflowEventBusFireThreadCount +
"\n max-heartbeat-interval -> " +
maxHeartbeatInterval +
"\n server-load-protection -> " +
serverLoadProtection +
- "\n registry-disconnect-strategy -> " +
registryDisconnectStrategy +
"\n master-address -> " + masterAddress +
"\n master-registry-path: " + masterRegistryPath +
"\n worker-group-refresh-interval: " +
workerGroupRefreshInterval +
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/AbstractSystemEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/AbstractSystemEvent.java
index 0df2a87cdf..17c1dfb296 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/AbstractSystemEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/AbstractSystemEvent.java
@@ -27,8 +27,14 @@ public abstract class AbstractSystemEvent extends
AbstractDelayEvent {
super(delayTime);
}
+ /**
+ * The event happen time.
+ */
public abstract Date getEventTime();
+ /**
+ * The event type.
+ */
public abstract SystemEventType getEventType();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
index 9eab103f72..f2086a6569 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
@@ -19,6 +19,8 @@ package
org.apache.dolphinscheduler.server.master.engine.system.event;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.dolphinscheduler.server.master.cluster.MasterServerMetadata;
+
import java.util.Date;
import lombok.Getter;
@@ -26,20 +28,23 @@ import lombok.Getter;
@Getter
public class MasterFailoverEvent extends AbstractSystemEvent {
- private final String masterAddress;
+ private final MasterServerMetadata masterServerMetadata;
private final Date eventTime;
- private MasterFailoverEvent(final String masterAddress,
- final Date eventTime) {
- super(eventTime.getTime());
- this.masterAddress = masterAddress;
+ private MasterFailoverEvent(final MasterServerMetadata
masterServerMetadata,
+ final Date eventTime,
+ final long delayTime) {
+ super(delayTime);
+ this.masterServerMetadata = masterServerMetadata;
this.eventTime = eventTime;
}
- public static MasterFailoverEvent of(final String masterAddress, final
Date eventTime) {
- checkNotNull(masterAddress);
+ public static MasterFailoverEvent of(final MasterServerMetadata
masterServerMetadata,
+ final Date eventTime,
+ final long delayTime) {
+ checkNotNull(masterServerMetadata);
checkNotNull(eventTime);
- return new MasterFailoverEvent(masterAddress, eventTime);
+ return new MasterFailoverEvent(masterServerMetadata, eventTime,
delayTime);
}
@Override
@@ -50,8 +55,9 @@ public class MasterFailoverEvent extends AbstractSystemEvent {
@Override
public String toString() {
return "MasterFailoverEvent{" +
- "masterAddress='" + masterAddress + '\'' +
+ "masterServerMetadata='" + masterServerMetadata + '\'' +
", eventTime=" + eventTime +
+ ", delayTime=" + delayTime +
'}';
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/WorkerFailoverEvent.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/WorkerFailoverEvent.java
index d985f0546b..4dd33a2bc2 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/WorkerFailoverEvent.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/WorkerFailoverEvent.java
@@ -19,6 +19,8 @@ package
org.apache.dolphinscheduler.server.master.engine.system.event;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.dolphinscheduler.server.master.cluster.WorkerServerMetadata;
+
import java.util.Date;
import lombok.Getter;
@@ -26,20 +28,23 @@ import lombok.Getter;
@Getter
public class WorkerFailoverEvent extends AbstractSystemEvent {
- private final String workerAddress;
+ private final WorkerServerMetadata workerServerMetadata;
private final Date eventTime;
- private WorkerFailoverEvent(final String workerAddress,
- final Date eventTime) {
- super(eventTime.getTime());
- this.workerAddress = workerAddress;
+ private WorkerFailoverEvent(final WorkerServerMetadata
workerServerMetadata,
+ final Date eventTime,
+ final long delayTime) {
+ super(delayTime);
+ this.workerServerMetadata = workerServerMetadata;
this.eventTime = eventTime;
}
- public static WorkerFailoverEvent of(final String workerAddress, final
Date eventTime) {
- checkNotNull(workerAddress);
+ public static WorkerFailoverEvent of(final WorkerServerMetadata
workerServerMetadata,
+ final Date eventTime,
+ final long delayTime) {
+ checkNotNull(workerServerMetadata);
checkNotNull(eventTime);
- return new WorkerFailoverEvent(workerAddress, eventTime);
+ return new WorkerFailoverEvent(workerServerMetadata, eventTime,
delayTime);
}
@Override
@@ -50,8 +55,9 @@ public class WorkerFailoverEvent extends AbstractSystemEvent {
@Override
public String toString() {
return "WorkerFailoverEvent{" +
- "workerAddress='" + workerAddress + '\'' +
+ "workerServerMetadata='" + workerServerMetadata + '\'' +
", eventTime=" + eventTime +
+ ", delayTime=" + delayTime +
'}';
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
index 81b1e1e151..7bb24907ca 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
@@ -22,6 +22,10 @@ import
org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
+import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
+import org.apache.dolphinscheduler.server.master.cluster.MasterServerMetadata;
+import org.apache.dolphinscheduler.server.master.cluster.WorkerServerMetadata;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
import
org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
import
org.apache.dolphinscheduler.server.master.engine.system.event.MasterFailoverEvent;
@@ -29,11 +33,11 @@ import
org.apache.dolphinscheduler.server.master.engine.system.event.WorkerFailo
import
org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import
org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
-import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -49,6 +53,9 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
@Autowired
private RegistryClient registryClient;
+ @Autowired
+ private ClusterManager clusterManager;
+
@Autowired
private IWorkflowRepository workflowRepository;
@@ -65,17 +72,23 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
private WorkflowFailover workflowFailover;
@Override
- public void globalMasterFailover(GlobalMasterFailoverEvent
globalMasterFailoverEvent) {
+ public void globalMasterFailover(final GlobalMasterFailoverEvent
globalMasterFailoverEvent) {
final StopWatch failoverTimeCost = StopWatch.createStarted();
log.info("Global master failover starting");
- final List<MasterFailoverEvent> masterFailoverEvents =
workflowInstanceDao.queryNeedFailoverMasters()
- .stream()
- .map(masterAddress -> MasterFailoverEvent.of(masterAddress,
globalMasterFailoverEvent.getEventTime()))
- .collect(Collectors.toList());
-
- if (CollectionUtils.isNotEmpty(masterFailoverEvents)) {
- log.info("There are {} masters need to failover",
masterFailoverEvents.size());
- masterFailoverEvents.forEach(this::failoverMaster);
+ final List<String> masterAddressWhichContainsUnFinishedWorkflow =
+ workflowInstanceDao.queryNeedFailoverMasters();
+ for (final String masterAddress :
masterAddressWhichContainsUnFinishedWorkflow) {
+ final Optional<MasterServerMetadata> aliveMasterOptional =
+
clusterManager.getMasterClusters().getServer(masterAddress);
+ if (aliveMasterOptional.isPresent()) {
+ final MasterServerMetadata aliveMasterServerMetadata =
aliveMasterOptional.get();
+ log.info("The master[{}] is alive, do global master failover
on it", aliveMasterServerMetadata);
+ doMasterFailover(aliveMasterServerMetadata.getAddress(),
+ aliveMasterServerMetadata.getServerStartupTime());
+ } else {
+ log.info("The master[{}] is not alive, do global master
failover on it", masterAddress);
+ doMasterFailover(masterAddress,
globalMasterFailoverEvent.getEventTime().getTime());
+ }
}
failoverTimeCost.stop();
@@ -84,16 +97,55 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
@Override
public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) {
- final StopWatch failoverTimeCost = StopWatch.createStarted();
- final String masterAddress = masterFailoverEvent.getMasterAddress();
- log.info("Master[{}] failover starting", masterAddress);
+ final MasterServerMetadata masterServerMetadata =
masterFailoverEvent.getMasterServerMetadata();
+ log.info("Master[{}] failover starting", masterServerMetadata);
+
+ final Optional<MasterServerMetadata> aliveMasterOptional =
+
clusterManager.getMasterClusters().getServer(masterServerMetadata.getAddress());
+ if (aliveMasterOptional.isPresent()) {
+ final MasterServerMetadata aliveMasterServerMetadata =
aliveMasterOptional.get();
+ if (aliveMasterServerMetadata.getServerStartupTime() ==
masterServerMetadata.getServerStartupTime()) {
+ log.info("The master[{}] is alive, maybe it reconnect to
registry skip failover", masterServerMetadata);
+ } else {
+ log.info("The master[{}] is alive, but the startup time is
different, will failover on {}",
+ masterServerMetadata,
+ aliveMasterServerMetadata);
+ doMasterFailover(aliveMasterServerMetadata.getAddress(),
+ aliveMasterServerMetadata.getServerStartupTime());
+ }
+ } else {
+ log.info("The master[{}] is not alive, will failover",
masterServerMetadata);
+ doMasterFailover(masterServerMetadata.getAddress(),
masterServerMetadata.getServerStartupTime());
+ }
+ }
+ /**
+ * Do master failover.
+ * <p> Will failover the workflow which is scheduled by the master and the
workflow's fire time is before the maxWorkflowFireTime.
+ */
+ private void doMasterFailover(final String masterAddress, final long
masterStartupTime) {
+ // We use lock to avoid multiple master failover at the same time.
+ // Once the workflow has been failovered, then it's state will be
changed to FAILOVER
+ // Once the FAILOVER workflow has been refired, then it's host will be
changed to the new master and have a new
+ // start time.
+ // So if a master has been failovered multiple times, there is no
problem.
+ final StopWatch failoverTimeCost = StopWatch.createStarted();
registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath());
try {
- final List<WorkflowInstance> needFailoverWorkflows =
getFailoverWorkflowsForMaster(masterFailoverEvent);
+ final String failoverFinishedNodePath =
+ RegistryUtils.getFailoverFinishedNodePath(masterAddress,
masterStartupTime);
+ if (registryClient.exists(failoverFinishedNodePath)) {
+ log.error("The master[{}-{}] is exist at: {}, means it has
already been failovered, skip failover",
+ masterAddress,
+ masterStartupTime,
+ failoverFinishedNodePath);
+ return;
+ }
+ final List<WorkflowInstance> needFailoverWorkflows =
+ getFailoverWorkflowsForMaster(masterAddress, new
Date(masterStartupTime));
needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow);
-
failoverTimeCost.stop();
+ registryClient.persist(failoverFinishedNodePath,
String.valueOf(System.currentTimeMillis()));
log.info("Master[{}] failover {} workflows finished, cost: {}/ms",
masterAddress,
needFailoverWorkflows.size(),
@@ -103,10 +155,11 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
}
}
- private List<WorkflowInstance> getFailoverWorkflowsForMaster(final
MasterFailoverEvent masterFailoverEvent) {
+ private List<WorkflowInstance> getFailoverWorkflowsForMaster(final String
masterAddress,
+ final Date
masterCrashTime) {
// todo: use page query
- final List<WorkflowInstance> workflowInstances =
workflowInstanceDao.queryNeedFailoverWorkflowInstances(
- masterFailoverEvent.getMasterAddress());
+ final List<WorkflowInstance> workflowInstances =
+
workflowInstanceDao.queryNeedFailoverWorkflowInstances(masterAddress);
return workflowInstances.stream()
.filter(workflowInstance -> {
@@ -117,25 +170,49 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
// todo: If the first time run workflow have the
restartTime, then we can only check this
final Date restartTime = workflowInstance.getRestartTime();
if (restartTime != null) {
- return
restartTime.before(masterFailoverEvent.getEventTime());
+ return restartTime.before(masterCrashTime);
}
final Date startTime = workflowInstance.getStartTime();
- return
startTime.before(masterFailoverEvent.getEventTime());
+ return startTime.before(masterCrashTime);
})
.collect(Collectors.toList());
}
@Override
public void failoverWorker(final WorkerFailoverEvent workerFailoverEvent) {
- final StopWatch failoverTimeCost = StopWatch.createStarted();
+ final WorkerServerMetadata workerServerMetadata =
workerFailoverEvent.getWorkerServerMetadata();
+ log.info("Worker[{}] failover starting", workerServerMetadata);
+
+ final Optional<WorkerServerMetadata> aliveWorkerOptional =
+
clusterManager.getWorkerClusters().getServer(workerServerMetadata.getAddress());
+ if (aliveWorkerOptional.isPresent()) {
+ final WorkerServerMetadata aliveWorkerServerMetadata =
aliveWorkerOptional.get();
+ if (aliveWorkerServerMetadata.getServerStartupTime() ==
workerServerMetadata.getServerStartupTime()) {
+ log.info("The worker[{}] is alive, maybe it reconnect to
registry skip failover", workerServerMetadata);
+ } else {
+ log.info("The worker[{}] is alive, but the startup time is
different, will failover on {}",
+ workerServerMetadata,
+ aliveWorkerServerMetadata);
+ doWorkerFailover(aliveWorkerServerMetadata.getAddress(),
+ aliveWorkerServerMetadata.getServerStartupTime());
+ }
+ } else {
+ log.info("The worker[{}] is not alive, will failover",
workerServerMetadata);
+ doWorkerFailover(workerServerMetadata.getAddress(),
workerServerMetadata.getServerStartupTime());
+ }
+ }
- final String workerAddress = workerFailoverEvent.getWorkerAddress();
- log.info("Worker[{}] failover starting", workerAddress);
+ private void doWorkerFailover(final String workerAddress, final long
workerCrashTime) {
+ final StopWatch failoverTimeCost = StopWatch.createStarted();
- final List<ITaskExecutionRunnable> needFailoverTasks =
getFailoverTaskForWorker(workerFailoverEvent);
+ final List<ITaskExecutionRunnable> needFailoverTasks =
+ getFailoverTaskForWorker(workerAddress, new
Date(workerCrashTime));
needFailoverTasks.forEach(taskFailover::failoverTask);
+ registryClient.persist(
+ RegistryUtils.getFailoverFinishedNodePath(workerAddress,
workerCrashTime),
+ String.valueOf(System.currentTimeMillis()));
failoverTimeCost.stop();
log.info("Worker[{}] failover {} tasks finished, cost: {}/ms",
workerAddress,
@@ -143,9 +220,8 @@ public class FailoverCoordinator implements
IFailoverCoordinator {
failoverTimeCost.getTime());
}
- private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final
WorkerFailoverEvent workerFailoverEvent) {
- final String workerAddress = workerFailoverEvent.getWorkerAddress();
- final Date workerCrashTime = workerFailoverEvent.getEventTime();
+ private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String
workerAddress,
+ final Date
workerCrashTime) {
return workflowRepository.getAll()
.stream()
.map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
index 0a141b69e8..684482b7d2 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
@@ -20,17 +20,17 @@ package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MasterConnectionStateListener implements ConnectionListener {
- private final MasterConnectStrategy masterConnectStrategy;
+ private final RegistryClient registryClient;
- public MasterConnectionStateListener(@NonNull MasterConnectStrategy
masterConnectStrategy) {
- this.masterConnectStrategy = masterConnectStrategy;
+ public MasterConnectionStateListener(final RegistryClient registryClient) {
+ this.registryClient = registryClient;
}
@Override
@@ -43,12 +43,13 @@ public class MasterConnectionStateListener implements
ConnectionListener {
case SUSPENDED:
break;
case RECONNECTED:
- masterConnectStrategy.reconnect();
+ log.warn("Master reconnect to registry");
break;
case DISCONNECTED:
- masterConnectStrategy.disconnect();
+ registryClient.getStoppable().stop("Master disconnected from
registry, will stop myself");
break;
default:
+ log.warn("Unknown connection state: {}", state);
}
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index 23cf638208..88306bfa6c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
+import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
@@ -78,16 +79,27 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
}
@Override
- public void writeHeartBeat(MasterHeartBeat masterHeartBeat) {
+ public void writeHeartBeat(final MasterHeartBeat masterHeartBeat) {
+ final String failoverNodePath =
RegistryUtils.getFailoverFinishedNodePath(masterHeartBeat);
+ if (registryClient.exists(failoverNodePath)) {
+ log.warn("The master: {} is under {}, means it has been failover
will close myself",
+ masterHeartBeat,
+ failoverNodePath);
+ registryClient
+ .getStoppable()
+ .stop("The master exist: " + failoverNodePath + ", means
it has been failover will close myself");
+ return;
+ }
String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat);
registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson);
MasterServerMetrics.incMasterHeartbeatCount();
log.debug("Success write master heartBeatInfo into registry,
masterRegistryPath: {}, heartBeatInfo: {}",
- heartBeatPath, masterHeartBeatJson);
+ heartBeatPath,
+ masterHeartBeatJson);
}
- private ServerStatus getServerStatus(SystemMetrics systemMetrics,
- MasterServerLoadProtection
masterServerLoadProtection) {
+ private ServerStatus getServerStatus(final SystemMetrics systemMetrics,
+ final MasterServerLoadProtection
masterServerLoadProtection) {
return masterServerLoadProtection.isOverload(systemMetrics) ?
ServerStatus.BUSY : ServerStatus.NORMAL;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index ea7da82e2b..94e2dc0513 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -53,9 +53,6 @@ public class MasterRegistryClient implements AutoCloseable {
@Autowired
private MetricsProvider metricsProvider;
- @Autowired
- private MasterConnectStrategy masterConnectStrategy;
-
private MasterHeartBeatTask masterHeartBeatTask;
public void start() {
@@ -63,7 +60,7 @@ public class MasterRegistryClient implements AutoCloseable {
this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig,
metricsProvider, registryClient);
// master registry
registry();
- registryClient.addConnectionStateListener(new
MasterConnectionStateListener(masterConnectStrategy));
+ registryClient.addConnectionStateListener(new
MasterConnectionStateListener(registryClient));
} catch (Exception e) {
throw new RegistryException("Master registry client start up
error", e);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
deleted file mode 100644
index bbc38d35fe..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.registry;
-
-import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import org.apache.dolphinscheduler.registry.api.StrategyType;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Service;
-
-/**
- * This strategy will stop the master server, when disconnected from {@link
org.apache.dolphinscheduler.registry.api.Registry}.
- */
-@Service
-@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name =
"strategy", havingValue = "stop", matchIfMissing = true)
-@Slf4j
-public class MasterStopStrategy implements MasterConnectStrategy {
-
- @Autowired
- private RegistryClient registryClient;
- @Autowired
- private MasterConfig masterConfig;
-
- @Override
- public void disconnect() {
- registryClient.getStoppable()
- .stop("Master disconnected from registry, will stop myself due
to the stop strategy");
- }
-
- @Override
- public void reconnect() {
- log.warn("The current connect strategy is stop, so the master will not
reconnect to registry");
- }
-
- @Override
- public StrategyType getStrategyType() {
- return StrategyType.STOP;
- }
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
deleted file mode 100644
index a18e823e09..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.registry;
-
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.common.lifecycle.ServerStatus;
-import org.apache.dolphinscheduler.registry.api.Registry;
-import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.registry.api.StrategyType;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
-
-import java.time.Duration;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Service;
-
-/**
- * This strategy will change the server status to {@link ServerStatus#WAITING}
when disconnect from {@link Registry}.
- */
-@Service
-@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name =
"strategy", havingValue = "waiting")
-@Slf4j
-public class MasterWaitingStrategy implements MasterConnectStrategy {
-
- @Autowired
- private MasterConfig masterConfig;
- @Autowired
- private RegistryClient registryClient;
- @Autowired
- private IWorkflowRepository IWorkflowRepository;
-
- @Override
- public void disconnect() {
- try {
- ServerLifeCycleManager.toWaiting();
- clearMasterResource();
- Duration maxWaitingTime =
masterConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
- try {
- log.info("Master disconnect from registry will try to
reconnect in {} s",
- maxWaitingTime.getSeconds());
- registryClient.connectUntilTimeout(maxWaitingTime);
- } catch (RegistryException ex) {
- throw new ServerLifeCycleException(
- String.format("Waiting to reconnect to registry in %s
failed", maxWaitingTime), ex);
- }
- } catch (ServerLifeCycleException e) {
- String errorMessage = String.format(
- "Disconnect from registry and change the current status to
waiting error, the current server state is %s, will stop the current server",
- ServerLifeCycleManager.getServerStatus());
- log.error(errorMessage, e);
- registryClient.getStoppable().stop(errorMessage);
- } catch (RegistryException ex) {
- String errorMessage = "Disconnect from registry and waiting to
reconnect failed, will stop the server";
- log.error(errorMessage, ex);
- registryClient.getStoppable().stop(errorMessage);
- } catch (Exception ex) {
- String errorMessage = "Disconnect from registry and get an unknown
exception, will stop the server";
- log.error(errorMessage, ex);
- registryClient.getStoppable().stop(errorMessage);
- }
- }
-
- @Override
- public void reconnect() {
- if (ServerLifeCycleManager.isRunning()) {
- log.info("no need to reconnect, as the current server status is
running");
- } else {
- try {
- ServerLifeCycleManager.recoverFromWaiting();
- log.info("Recover from waiting success, the current server
status is {}",
- ServerLifeCycleManager.getServerStatus());
- } catch (Exception e) {
- String errorMessage =
- String.format(
- "Recover from waiting failed, the current
server status is %s, will stop the server",
- ServerLifeCycleManager.getServerStatus());
- log.error(errorMessage, e);
- registryClient.getStoppable().stop(errorMessage);
- }
- }
- }
-
- @Override
- public StrategyType getStrategyType() {
- return StrategyType.WAITING;
- }
-
- private void clearMasterResource() {
- log.warn("Master clear workflow event queue due to lost registry
connection");
- IWorkflowRepository.clear();
- log.warn("Master clear workflow instance cache due to lost registry
connection");
-
- }
-
-}
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml
b/dolphinscheduler-master/src/main/resources/application.yaml
index 964fca836c..5521105f44 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -102,9 +102,6 @@ master:
max-system-memory-usage-percentage-thresholds: 0.7
# Master max disk usage , when the master's disk usage is smaller then
this value, master server can execute workflow.
max-disk-usage-percentage-thresholds: 0.7
- registry-disconnect-strategy:
- # The disconnect strategy: stop, waiting
- strategy: stop
worker-group-refresh-interval: 10s
command-fetch-strategy:
type: ID_SLOT_BASED
diff --git a/dolphinscheduler-master/src/test/resources/application.yaml
b/dolphinscheduler-master/src/test/resources/application.yaml
index 8ab6e7de0b..7f8f1594a6 100644
--- a/dolphinscheduler-master/src/test/resources/application.yaml
+++ b/dolphinscheduler-master/src/test/resources/application.yaml
@@ -67,9 +67,6 @@ master:
memory-usage-weight: 40
cpu-usage-weight: 30
task-thread-pool-usage-weight: 30
- registry-disconnect-strategy:
- # The disconnect strategy: stop, waiting
- strategy: stop
worker-group-refresh-interval: 10s
command-fetch-strategy:
type: ID_SLOT_BASED
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java
index 448a46841e..4617a4869f 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java
@@ -26,6 +26,4 @@ public interface ConnectStrategy {
void reconnect();
- StrategyType getStrategyType();
-
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java
deleted file mode 100644
index 5457a9e44a..0000000000
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.registry.api;
-
-import java.time.Duration;
-
-import lombok.Data;
-
-@Data
-public class ConnectStrategyProperties {
-
- private StrategyType strategy = StrategyType.STOP;
-
- private Duration maxWaitingTime = Duration.ofSeconds(0);
-
-}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
index 3ddce0eee8..df0587019a 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -67,6 +68,10 @@ public class RegistryClient {
if (!registry.exists(RegistryNodeType.ALERT_SERVER.getRegistryPath()))
{
registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(),
EMPTY, false);
}
+ if
(!registry.exists(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath())) {
+
registry.put(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath(), EMPTY,
false);
+ }
+ cleanHistoryFailoverFinishedNodes();
}
public boolean isConnected() {
@@ -168,6 +173,11 @@ public class RegistryClient {
registry.put(key, value, true);
}
+ public void persist(String key, String value) {
+ log.info("persist key: {}, value: {}", key, value);
+ registry.put(key, value, false);
+ }
+
public void remove(String key) {
registry.delete(key);
}
@@ -222,4 +232,27 @@ public class RegistryClient {
private Collection<String> getServerNodes(RegistryNodeType nodeType) {
return getChildrenKeys(nodeType.getRegistryPath());
}
+
+ private void cleanHistoryFailoverFinishedNodes() {
+ // Clean the history failover finished nodes
+ // which failover is before the current time minus 1 week
+ final Collection<String> failoverFinishedNodes =
+
registry.children(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath());
+ if (CollectionUtils.isEmpty(failoverFinishedNodes)) {
+ return;
+ }
+ for (final String failoverFinishedNode : failoverFinishedNodes) {
+ try {
+ final String failoverFinishTime =
registry.get(failoverFinishedNode);
+ if (System.currentTimeMillis() -
Long.parseLong(failoverFinishTime) > TimeUnit.DAYS.toMillis(7)) {
+ registry.delete(failoverFinishedNode);
+ log.info(
+ "Clear the failover finished node: {} which
failover time is before the current time minus 1 week",
+ failoverFinishedNode);
+ }
+ } catch (Exception ex) {
+ log.error("Failed to clean the failoverFinishedNode: {}",
failoverFinishedNode, ex);
+ }
+ }
+ }
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java
deleted file mode 100644
index 214177ec44..0000000000
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.registry.api;
-
-public enum StrategyType {
-
- STOP,
- WAITING,
- ;
-}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
index 31a0f8d5aa..d39e95a84f 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
@@ -24,6 +24,8 @@ import lombok.Getter;
@AllArgsConstructor
public enum RegistryNodeType {
+ FAILOVER_FINISH_NODES("FailoverFinishNodes",
"/nodes/failover-finish-nodes"),
+
MASTER("Master", "/nodes/master"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock",
"/lock/master-task-group-coordinator"),
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
similarity index 53%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java
rename to
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
index 4cecadce16..25ef976ed5 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
@@ -15,10 +15,19 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.registry;
+package org.apache.dolphinscheduler.registry.api.utils;
-import org.apache.dolphinscheduler.registry.api.ConnectStrategy;
+import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
+import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
-public interface MasterConnectStrategy extends ConnectStrategy {
+public class RegistryUtils {
+ public static String getFailoverFinishedNodePath(final BaseHeartBeat
baseHeartBeat) {
+ return getFailoverFinishedNodePath(baseHeartBeat.getHost() + ":" +
baseHeartBeat.getPort(),
+ baseHeartBeat.getStartupTime());
+ }
+
+ public static String getFailoverFinishedNodePath(final String
masterAddress, final long masterStartupTime) {
+ return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/"
+ masterAddress + "-" + masterStartupTime;
+ }
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index d901effbd1..e391916a09 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.config;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.commons.lang3.StringUtils;
@@ -46,7 +45,6 @@ public class WorkerConfig implements Validator {
private Duration maxHeartbeatInterval = Duration.ofSeconds(10);
private int hostWeight = 100;
private WorkerServerLoadProtection serverLoadProtection = new
WorkerServerLoadProtection();
- private ConnectStrategyProperties registryDisconnectStrategy = new
ConnectStrategyProperties();
/**
* This field doesn't need to set at config file, it will be calculated by
workerIp:listenPort
@@ -90,7 +88,6 @@ public class WorkerConfig implements Validator {
"\n host-weight -> " + hostWeight +
"\n tenantConfig -> " + tenantConfig +
"\n server-load-protection -> " +
serverLoadProtection +
- "\n registry-disconnect-strategy -> " +
registryDisconnectStrategy +
"\n task-execute-threads-full-policy: " +
taskExecuteThreadsFullPolicy +
"\n address -> " + workerAddress +
"\n registry-path: " + workerRegistryPath +
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java
deleted file mode 100644
index 260b7e0390..0000000000
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.registry;
-
-import org.apache.dolphinscheduler.registry.api.ConnectStrategy;
-
-public interface WorkerConnectStrategy extends ConnectStrategy {
-
-}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
index d070a90b34..af046b2f57 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
@@ -20,21 +20,17 @@ package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WorkerConnectionStateListener implements ConnectionListener {
- private final WorkerConfig workerConfig;
- private final WorkerConnectStrategy workerConnectStrategy;
+ private final RegistryClient registryClient;
- public WorkerConnectionStateListener(@NonNull WorkerConfig workerConfig,
- @NonNull WorkerConnectStrategy
workerConnectStrategy) {
- this.workerConfig = workerConfig;
- this.workerConnectStrategy = workerConnectStrategy;
+ public WorkerConnectionStateListener(final RegistryClient registryClient) {
+ this.registryClient = registryClient;
}
@Override
@@ -47,10 +43,10 @@ public class WorkerConnectionStateListener implements
ConnectionListener {
case SUSPENDED:
break;
case RECONNECTED:
- workerConnectStrategy.reconnect();
+ log.warn("Worker reconnect to registry");
break;
case DISCONNECTED:
- workerConnectStrategy.disconnect();
+ registryClient.getStoppable().stop("Worker disconnected from
registry, will stop myself");
default:
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 3fca5caee1..dd0375caf8 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -46,7 +46,6 @@ import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@Slf4j
@@ -62,10 +61,6 @@ public class WorkerRegistryClient implements AutoCloseable {
@Autowired
private RegistryClient registryClient;
- @Autowired
- @Lazy
- private WorkerConnectStrategy workerConnectStrategy;
-
@Autowired
private MetricsProvider metricsProvider;
@@ -83,8 +78,7 @@ public class WorkerRegistryClient implements AutoCloseable {
public void start() {
try {
registry();
- registryClient.addConnectionStateListener(
- new WorkerConnectionStateListener(workerConfig,
workerConnectStrategy));
+ registryClient.addConnectionStateListener(new
WorkerConnectionStateListener(registryClient));
} catch (Exception ex) {
throw new RegistryException("Worker registry client start up
error", ex);
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
deleted file mode 100644
index 4c575a29e8..0000000000
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.registry;
-
-import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import org.apache.dolphinscheduler.registry.api.StrategyType;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Service;
-
-@Service
-@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name =
"strategy", havingValue = "stop", matchIfMissing = true)
-@Slf4j
-public class WorkerStopStrategy implements WorkerConnectStrategy {
-
- @Autowired
- public RegistryClient registryClient;
-
- @Override
- public void disconnect() {
- registryClient.getStoppable()
- .stop("Worker disconnected from registry, will stop myself due
to the stop strategy");
- }
-
- @Override
- public void reconnect() {
- log.warn("The current connect strategy is stop, so the worker will not
reconnect to registry");
- }
-
- @Override
- public StrategyType getStrategyType() {
- return StrategyType.STOP;
- }
-}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
deleted file mode 100644
index 2be156b5df..0000000000
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.registry;
-
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.registry.api.StrategyType;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
-import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
-
-import java.time.Duration;
-
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Service;
-
-@Service
-@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name =
"strategy", havingValue = "waiting")
-@Slf4j
-public class WorkerWaitingStrategy implements WorkerConnectStrategy {
-
- @Autowired
- private WorkerConfig workerConfig;
-
- @Autowired
- private RegistryClient registryClient;
-
- @Autowired
- private MessageRetryRunner messageRetryRunner;
-
- @Autowired
- private WorkerTaskExecutorThreadPool workerManagerThread;
-
- public WorkerWaitingStrategy(@NonNull WorkerConfig workerConfig,
- @NonNull RegistryClient registryClient,
- @NonNull MessageRetryRunner
messageRetryRunner,
- @NonNull WorkerTaskExecutorThreadPool
workerManagerThread) {
- this.workerConfig = workerConfig;
- this.registryClient = registryClient;
- this.messageRetryRunner = messageRetryRunner;
- this.workerManagerThread = workerManagerThread;
- }
-
- @Override
- public void disconnect() {
- try {
- ServerLifeCycleManager.toWaiting();
- clearWorkerResource();
- Duration maxWaitingTime =
workerConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
- try {
- log.info("Worker disconnect from registry will try to
reconnect in {} s",
- maxWaitingTime.getSeconds());
- registryClient.connectUntilTimeout(maxWaitingTime);
- } catch (RegistryException ex) {
- throw new ServerLifeCycleException(
- String.format("Waiting to reconnect to registry in %s
failed", maxWaitingTime), ex);
- }
-
- } catch (ServerLifeCycleException e) {
- String errorMessage = String.format(
- "Disconnect from registry and change the current status to
waiting error, the current server state is %s, will stop the current server",
- ServerLifeCycleManager.getServerStatus());
- log.error(errorMessage, e);
- registryClient.getStoppable().stop(errorMessage);
- } catch (RegistryException ex) {
- String errorMessage = "Disconnect from registry and waiting to
reconnect failed, will stop the server";
- log.error(errorMessage, ex);
- registryClient.getStoppable().stop(errorMessage);
- } catch (Exception ex) {
- String errorMessage = "Disconnect from registry and get an unknown
exception, will stop the server";
- log.error(errorMessage, ex);
- registryClient.getStoppable().stop(errorMessage);
- }
- }
-
- @Override
- public void reconnect() {
- if (ServerLifeCycleManager.isRunning()) {
- log.info("no need to reconnect, as the current server status is
running");
- } else {
- try {
- ServerLifeCycleManager.recoverFromWaiting();
- log.info("Recover from waiting success, the current server
status is {}",
- ServerLifeCycleManager.getServerStatus());
- } catch (Exception e) {
- String errorMessage =
- String.format(
- "Recover from waiting failed, the current
server status is %s, will stop the server",
- ServerLifeCycleManager.getServerStatus());
- log.error(errorMessage, e);
- registryClient.getStoppable().stop(errorMessage);
- }
- }
- }
-
- @Override
- public StrategyType getStrategyType() {
- return StrategyType.WAITING;
- }
-
- private void clearWorkerResource() {
- workerManagerThread.clearTask();
- WorkerTaskExecutorHolder.clear();
- log.warn("Worker server clear the tasks due to lost connection from
registry");
- messageRetryRunner.clearMessage();
- log.warn("Worker server clear the retry message due to lost connection
from registry");
- }
-
-}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 13f585d92a..a2e3f37825 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
+import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import
org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
@@ -81,14 +82,25 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
}
@Override
- public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) {
+ public void writeHeartBeat(final WorkerHeartBeat workerHeartBeat) {
+ final String failoverNodePath =
RegistryUtils.getFailoverFinishedNodePath(workerHeartBeat);
+ if (registryClient.exists(failoverNodePath)) {
+ log.warn("The worker: {} is under {}, means it has been failover
will close myself",
+ workerHeartBeat,
+ failoverNodePath);
+ registryClient
+ .getStoppable()
+ .stop("The worker exist: " + failoverNodePath + ", means
it has been failover will close myself");
+ return;
+ }
String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
String workerRegistryPath = workerConfig.getWorkerRegistryPath();
registryClient.persistEphemeral(workerRegistryPath,
workerHeartBeatJson);
WorkerServerMetrics.incWorkerHeartbeatCount();
log.debug(
"Success write worker group heartBeatInfo into registry,
workerRegistryPath: {} workerHeartBeatInfo: {}",
- workerRegistryPath, workerHeartBeatJson);
+ workerRegistryPath,
+ workerHeartBeatJson);
}
private ServerStatus getServerStatus(SystemMetrics systemMetrics,
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml
b/dolphinscheduler-worker/src/main/resources/application.yaml
index 5cac4c29e5..c4077e623d 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -59,9 +59,6 @@ worker:
max-system-memory-usage-percentage-thresholds: 0.7
# Worker max disk usage , when the worker's disk usage is smaller then
this value, worker server can be dispatched tasks.
max-disk-usage-percentage-thresholds: 0.7
- registry-disconnect-strategy:
- # The disconnect strategy: stop, waiting
- strategy: stop
task-execute-threads-full-policy: REJECT
tenant-config:
# tenant corresponds to the user of the system, which is used by the
worker to submit the job. If system does not have this user, it will be
automatically created after the parameter worker.tenant.auto.create is true.
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java
index 14d75779b9..c04ec14430 100644
---
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.mockito.Mockito.times;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -28,8 +28,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* worker registry test
@@ -37,24 +35,23 @@ import org.slf4j.LoggerFactory;
@ExtendWith(MockitoExtension.class)
public class WorkerConnectionStateListenerTest {
- private static final Logger log =
LoggerFactory.getLogger(WorkerConnectionStateListenerTest.class);
@InjectMocks
private WorkerConnectionStateListener workerConnectionStateListener;
+
@Mock
- private WorkerConfig workerConfig;
- @Mock
- private WorkerConnectStrategy workerConnectStrategy;
+ private RegistryClient registryClient;
@Test
public void testWorkerConnectionStateListener() {
- workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED);
+ Mockito.when(registryClient.getStoppable()).thenReturn(cause -> {
+ // do nothing
+ });
+ workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED);
workerConnectionStateListener.onUpdate(ConnectionState.RECONNECTED);
- Mockito.verify(workerConnectStrategy, times(1)).reconnect();
-
workerConnectionStateListener.onUpdate(ConnectionState.SUSPENDED);
-
+ Mockito.verify(registryClient, times(0)).getStoppable();
workerConnectionStateListener.onUpdate(ConnectionState.DISCONNECTED);
- Mockito.verify(workerConnectStrategy, times(1)).disconnect();
+ Mockito.verify(registryClient, times(1)).getStoppable();
}
}
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index 8c7071cf60..54f9033e4e 100644
---
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -63,8 +63,6 @@ public class WorkerRegistryClientTest {
@Mock
private WorkerTaskExecutorThreadPool workerManagerThread;
@Mock
- private WorkerConnectStrategy workerConnectStrategy;
- @Mock
private IStoppable stoppable;
@Test
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java
deleted file mode 100644
index 671fac5277..0000000000
---
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.registry;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.doNothing;
-
-import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
-import org.apache.dolphinscheduler.registry.api.RegistryClient;
-import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.registry.api.StrategyType;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
-import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
-
-import java.time.Duration;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * worker registry test
- */
-@ExtendWith(MockitoExtension.class)
-public class WorkerStrategyTest {
-
- private static final Logger log =
LoggerFactory.getLogger(WorkerStrategyTest.class);
- @Mock
- private RegistryClient registryClient;
- @Mock
- private IStoppable stoppable;
- @Mock
- private WorkerConfig workerConfig;
- @Mock
- private WorkerRpcServer workerRpcServer;
- @Mock
- private MessageRetryRunner messageRetryRunner;
- @Mock
- private WorkerTaskExecutorThreadPool workerManagerThread;
- @Mock
- private ConnectStrategyProperties connectStrategyProperties;
-
- @Test
- public void testWorkerStopStrategy() {
- given(registryClient.getStoppable())
- .willReturn(stoppable);
- WorkerStopStrategy workerStopStrategy = new WorkerStopStrategy();
- workerStopStrategy.registryClient = registryClient;
- workerStopStrategy.reconnect();
- workerStopStrategy.disconnect();
- Assertions.assertEquals(workerStopStrategy.getStrategyType(),
StrategyType.STOP);
- }
-
- @Test
- public void testWorkerWaitingStrategyreconnect() {
- WorkerWaitingStrategy workerWaitingStrategy = new
WorkerWaitingStrategy(
- workerConfig,
- registryClient,
- messageRetryRunner,
- workerManagerThread);
- Assertions.assertEquals(workerWaitingStrategy.getStrategyType(),
StrategyType.WAITING);
-
- try (
- MockedStatic<ServerLifeCycleManager>
serverLifeCycleManagerMockedStatic =
- Mockito.mockStatic(ServerLifeCycleManager.class)) {
- serverLifeCycleManagerMockedStatic
- .when(() -> ServerLifeCycleManager.isRunning())
- .thenReturn(true);
- workerWaitingStrategy.reconnect();
-
- }
-
- try (
- MockedStatic<ServerLifeCycleManager>
serverLifeCycleManagerMockedStatic =
- Mockito.mockStatic(ServerLifeCycleManager.class)) {
- doNothing().when(stoppable).stop(anyString());
- given(registryClient.getStoppable())
- .willReturn(stoppable);
- serverLifeCycleManagerMockedStatic
- .when(() -> ServerLifeCycleManager.recoverFromWaiting())
- .thenThrow(new ServerLifeCycleException(""));
- workerWaitingStrategy.reconnect();
- }
-
- try (
- MockedStatic<ServerLifeCycleManager>
serverLifeCycleManagerMockedStatic =
- Mockito.mockStatic(ServerLifeCycleManager.class)) {
- serverLifeCycleManagerMockedStatic
- .when(() -> ServerLifeCycleManager.recoverFromWaiting())
- .thenAnswer(invocation -> null);
- workerWaitingStrategy.reconnect();
- }
- }
-
- @Test
- public void testWorkerWaitingStrategydisconnect() {
- WorkerWaitingStrategy workerWaitingStrategy = new
WorkerWaitingStrategy(
- workerConfig,
- registryClient,
- messageRetryRunner,
- workerManagerThread);
- Assertions.assertEquals(workerWaitingStrategy.getStrategyType(),
StrategyType.WAITING);
-
- try (
- MockedStatic<ServerLifeCycleManager>
serverLifeCycleManagerMockedStatic =
- Mockito.mockStatic(ServerLifeCycleManager.class)) {
- doNothing().when(stoppable).stop(anyString());
- given(registryClient.getStoppable())
- .willReturn(stoppable);
- serverLifeCycleManagerMockedStatic
- .when(() -> ServerLifeCycleManager.toWaiting())
- .thenThrow(new ServerLifeCycleException(""));
- workerWaitingStrategy.disconnect();
- }
-
- try (
- MockedStatic<ServerLifeCycleManager>
serverLifeCycleManagerMockedStatic =
- Mockito.mockStatic(ServerLifeCycleManager.class)) {
-
given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
-
given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
- Mockito.reset(registryClient);
- doNothing().when(registryClient).connectUntilTimeout(any());
- serverLifeCycleManagerMockedStatic
- .when(() -> ServerLifeCycleManager.toWaiting())
- .thenAnswer(invocation -> null);
- workerWaitingStrategy.disconnect();
- }
-
- try (
- MockedStatic<ServerLifeCycleManager>
serverLifeCycleManagerMockedStatic =
- Mockito.mockStatic(ServerLifeCycleManager.class)) {
-
given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
-
given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
- Mockito.reset(registryClient);
- doNothing().when(stoppable).stop(anyString());
- given(registryClient.getStoppable())
- .willReturn(stoppable);
- Mockito.doThrow(new
RegistryException("TEST")).when(registryClient).connectUntilTimeout(any());
- serverLifeCycleManagerMockedStatic
- .when(() -> ServerLifeCycleManager.toWaiting())
- .thenAnswer(invocation -> null);
- workerWaitingStrategy.disconnect();
- }
-
- try (
- MockedStatic<ServerLifeCycleManager>
serverLifeCycleManagerMockedStatic =
- Mockito.mockStatic(ServerLifeCycleManager.class)) {
- Mockito.reset(workerConfig);
- given(workerConfig.getRegistryDisconnectStrategy()).willThrow(new
NullPointerException(""));
- doNothing().when(stoppable).stop(anyString());
- given(registryClient.getStoppable())
- .willReturn(stoppable);
- serverLifeCycleManagerMockedStatic
- .when(() -> ServerLifeCycleManager.toWaiting())
- .thenAnswer(invocation -> null);
- workerWaitingStrategy.disconnect();
- }
- }
-}