This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.0.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.1-prepare by this push:
new ab8f878631 fix heartBeatTaskCount bug (#12024)
ab8f878631 is described below
commit ab8f87863127485b986c656fa1c532a43e46b852
Author: JinYong Li <[email protected]>
AuthorDate: Sun Sep 18 11:49:21 2022 +0800
fix heartBeatTaskCount bug (#12024)
---
.../master/registry/MasterHeartBeatTask.java | 59 +++++-----------------
.../master/registry/MasterRegistryClient.java | 19 +++----
.../worker/registry/WorkerHeartBeatTask.java | 47 ++++-------------
.../worker/registry/WorkerRegistryClient.java | 21 ++++----
4 files changed, 39 insertions(+), 107 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
similarity index 50%
copy from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index c84abb4182..2bcbad2616 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.registry;
+package org.apache.dolphinscheduler.server.master.registry;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -27,52 +28,25 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Heart beat task
+ * Master heart beat task
*/
-public class HeartBeatTask implements Runnable {
+public class MasterHeartBeatTask implements Runnable {
- private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
+ private final Logger logger =
LoggerFactory.getLogger(MasterHeartBeatTask.class);
private final Set<String> heartBeatPaths;
private final RegistryClient registryClient;
- private int workerWaitingTaskCount;
- private final String serverType;
private final HeartBeat heartBeat;
-
- private final int heartBeatErrorThreshold;
-
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
- public HeartBeatTask(long startupTime,
- double maxCpuloadAvg,
- double reservedMemory,
- Set<String> heartBeatPaths,
- String serverType,
- RegistryClient registryClient,
- int heartBeatErrorThreshold) {
+ public MasterHeartBeatTask(long startupTime,
+ double maxCpuloadAvg,
+ double reservedMemory,
+ Set<String> heartBeatPaths,
+ RegistryClient registryClient) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
- this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg,
reservedMemory);
- this.heartBeatErrorThreshold = heartBeatErrorThreshold;
- }
-
- public HeartBeatTask(long startupTime,
- double maxCpuloadAvg,
- double reservedMemory,
- int hostWeight,
- Set<String> heartBeatPaths,
- String serverType,
- RegistryClient registryClient,
- int workerThreadCount,
- int workerWaitingTaskCount,
- int heartBeatErrorThreshold) {
- this.heartBeatPaths = heartBeatPaths;
- this.registryClient = registryClient;
- this.workerWaitingTaskCount = workerWaitingTaskCount;
- this.serverType = serverType;
- this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg,
reservedMemory, hostWeight, workerThreadCount);
- this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public String getHeartBeatInfo() {
@@ -82,27 +56,18 @@ public class HeartBeatTask implements Runnable {
@Override
public void run() {
try {
- // check dead or not in zookeeper
for (String heartBeatPath : heartBeatPaths) {
- if (registryClient.checkIsDeadServer(heartBeatPath,
serverType)) {
+ if (registryClient.checkIsDeadServer(heartBeatPath,
Constants.MASTER_TYPE)) {
registryClient.getStoppable().stop("i was judged to death,
release resources and stop myself");
return;
}
}
-
- // update waiting task count
- heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
-
for (String heartBeatPath : heartBeatPaths) {
registryClient.persistEphemeral(heartBeatPath,
heartBeat.encodeHeartBeat());
}
heartBeatErrorTimes.set(0);
} catch (Throwable ex) {
- logger.error("HeartBeat task execute failed", ex);
- if (heartBeatErrorTimes.incrementAndGet() >=
heartBeatErrorThreshold) {
- registryClient.getStoppable()
- .stop("HeartBeat task connect to zk failed too
much times: " + heartBeatErrorTimes);
- }
+ logger.error("HeartBeat task execute failed, errorTimes: {}",
heartBeatErrorTimes.get(), ex);
}
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index a59b712828..56167f65b4 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -30,7 +30,6 @@ import
org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang3.StringUtils;
@@ -50,7 +49,7 @@ import com.google.common.collect.Sets;
/**
* <p>DolphinScheduler master register client, used to connect to registry and
hand the registry events.
- * <p>When the Master node startup, it will register in registry center. And
schedule a {@link HeartBeatTask} to update its metadata in registry.
+ * <p>When the Master node startup, it will register in registry center. And
schedule a {@link MasterHeartBeatTask} to update its metadata in registry.
*/
@Component
public class MasterRegistryClient implements AutoCloseable {
@@ -97,7 +96,7 @@ public class MasterRegistryClient implements AutoCloseable {
// master registry
registry();
registryClient.addConnectionStateListener(new
MasterConnectionStateListener(getCurrentNodePath(),
-
registryClient));
+ registryClient));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new
MasterRegistryDataListener());
} catch (Exception e) {
throw new RegistryException("Master registry client start up
error", e);
@@ -190,13 +189,11 @@ public class MasterRegistryClient implements
AutoCloseable {
logger.info("Master node : {} registering to registry center",
masterAddress);
String localNodePath = getCurrentNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
- HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-
masterConfig.getMaxCpuLoadAvg(),
-
masterConfig.getReservedMemory(),
-
Sets.newHashSet(localNodePath),
- Constants.MASTER_TYPE,
- registryClient,
-
masterConfig.getHeartbeatErrorThreshold());
+ MasterHeartBeatTask heartBeatTask = new
MasterHeartBeatTask(startupTime,
+ masterConfig.getMaxCpuLoadAvg(),
+ masterConfig.getReservedMemory(),
+ Sets.newHashSet(localNodePath),
+ registryClient);
// remove before persist
registryClient.remove(localNodePath);
@@ -247,4 +244,4 @@ public class MasterRegistryClient implements AutoCloseable {
return NetUtils.getAddr(masterConfig.getListenPort());
}
-}
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
similarity index 63%
rename from
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
rename to
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
index c84abb4182..72315a67b6 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
@@ -15,64 +15,43 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.registry;
+package org.apache.dolphinscheduler.server.worker.registry;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Heart beat task
*/
-public class HeartBeatTask implements Runnable {
+public class WorkerHeartBeatTask implements Runnable {
- private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
+ private final Logger logger =
LoggerFactory.getLogger(WorkerHeartBeatTask.class);
private final Set<String> heartBeatPaths;
private final RegistryClient registryClient;
private int workerWaitingTaskCount;
- private final String serverType;
private final HeartBeat heartBeat;
- private final int heartBeatErrorThreshold;
-
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
- public HeartBeatTask(long startupTime,
- double maxCpuloadAvg,
- double reservedMemory,
- Set<String> heartBeatPaths,
- String serverType,
- RegistryClient registryClient,
- int heartBeatErrorThreshold) {
- this.heartBeatPaths = heartBeatPaths;
- this.registryClient = registryClient;
- this.serverType = serverType;
- this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg,
reservedMemory);
- this.heartBeatErrorThreshold = heartBeatErrorThreshold;
- }
-
- public HeartBeatTask(long startupTime,
+ public WorkerHeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
int hostWeight,
Set<String> heartBeatPaths,
- String serverType,
RegistryClient registryClient,
int workerThreadCount,
- int workerWaitingTaskCount,
- int heartBeatErrorThreshold) {
+ int workerWaitingTaskCount) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.workerWaitingTaskCount = workerWaitingTaskCount;
- this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg,
reservedMemory, hostWeight, workerThreadCount);
- this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public String getHeartBeatInfo() {
@@ -82,14 +61,12 @@ public class HeartBeatTask implements Runnable {
@Override
public void run() {
try {
- // check dead or not in zookeeper
for (String heartBeatPath : heartBeatPaths) {
- if (registryClient.checkIsDeadServer(heartBeatPath,
serverType)) {
+ if (registryClient.checkIsDeadServer(heartBeatPath,
Constants.WORKER_TYPE)) {
registryClient.getStoppable().stop("i was judged to death,
release resources and stop myself");
return;
}
}
-
// update waiting task count
heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
@@ -98,11 +75,7 @@ public class HeartBeatTask implements Runnable {
}
heartBeatErrorTimes.set(0);
} catch (Throwable ex) {
- logger.error("HeartBeat task execute failed", ex);
- if (heartBeatErrorTimes.incrementAndGet() >=
heartBeatErrorThreshold) {
- registryClient.getStoppable()
- .stop("HeartBeat task connect to zk failed too
much times: " + heartBeatErrorTimes);
- }
+ logger.error("HeartBeat task execute failed, errorTimes: {}",
heartBeatErrorTimes.get(), ex);
}
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 99d18acc30..6aff4dfb19 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -101,16 +100,14 @@ public class WorkerRegistryClient implements
AutoCloseable {
Set<String> workerZkPaths = getWorkerZkPaths();
long workerHeartbeatInterval =
workerConfig.getHeartbeatInterval().getSeconds();
- HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-
workerConfig.getMaxCpuLoadAvg(),
-
workerConfig.getReservedMemory(),
-
workerConfig.getHostWeight(),
- workerZkPaths,
- Constants.WORKER_TYPE,
- registryClient,
-
workerConfig.getExecThreads(),
-
workerManagerThread.getThreadPoolQueueSize(),
-
workerConfig.getHeartbeatErrorThreshold());
+ WorkerHeartBeatTask heartBeatTask = new
WorkerHeartBeatTask(startupTime,
+ workerConfig.getMaxCpuLoadAvg(),
+ workerConfig.getReservedMemory(),
+ workerConfig.getHostWeight(),
+ workerZkPaths,
+ registryClient,
+ workerConfig.getExecThreads(),
+ workerManagerThread.getThreadPoolQueueSize());
for (String workerZKPath : workerZkPaths) {
// remove before persist
@@ -199,4 +196,4 @@ public class WorkerRegistryClient implements AutoCloseable {
unRegistry();
}
-}
+}
\ No newline at end of file