This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ae608e0 [Fix-5037][Server] Fix that both the master and the worker is
hanging after restarting and stopping (#5038)
ae608e0 is described below
commit ae608e024a6e0f7c8777324736a8b22685789882
Author: Shiwen Cheng <[email protected]>
AuthorDate: Fri Mar 12 09:33:30 2021 +0800
[Fix-5037][Server] Fix that both the master and the worker is hanging after
restarting and stopping (#5038)
* [Fix-5037][Server] Fix that both the master and the worker is hanging
after restarting and stopping
* [Improvement][*] Replace commons.lang.StringUtils with
common.utils.StringUtils
---
.../api/interceptor/LoginHandlerInterceptor.java | 13 +++++++----
.../apache/dolphinscheduler/common/IStoppable.java | 14 ++++++-----
.../common/task/http/HttpParameters.java | 5 ++--
.../common/task/procedure/ProcedureParameters.java | 4 ++--
.../server/master/MasterServer.java | 27 ++++++++--------------
.../server/master/registry/MasterRegistry.java | 2 ++
.../master/runner/MasterSchedulerService.java | 25 +++++++++++---------
.../server/registry/HeartBeatTask.java | 5 ++--
.../server/registry/ZookeeperRegistryCenter.java | 8 ++-----
.../server/worker/WorkerServer.java | 21 ++++++-----------
.../server/worker/registry/WorkerRegistry.java | 3 ++-
.../server/worker/task/AbstractTask.java | 4 ++--
.../dolphinscheduler/server/zk/ZKMasterClient.java | 2 +-
.../server/worker/EnvFileTest.java | 9 ++++----
.../service/zk/ZookeeperOperator.java | 2 +-
15 files changed, 68 insertions(+), 76 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java
index 83eb4fe..e727d79 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java
@@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api.interceptor;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
+package org.apache.dolphinscheduler.api.interceptor;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.security.Authenticator;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
+
+import org.apache.commons.httpclient.HttpStatus;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
index 0f6f40b..af54a2a 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
@@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common;
/**
* server stop interface.
*/
public interface IStoppable {
- /**
- * Stop this service.
- * @param cause why stopping
- */
- void stop(String cause);
-}
\ No newline at end of file
+ /**
+ * Stop this service.
+ * @param cause why stopping
+ */
+ void stop(String cause);
+
+}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
index 7e4cf74..f439dc1 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.task.http;
import org.apache.dolphinscheduler.common.enums.HttpCheckCondition;
@@ -21,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.process.HttpProperty;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
@@ -70,7 +71,7 @@ public class HttpParameters extends AbstractParameters {
@Override
public boolean checkParameters() {
- return StringUtils.isNotEmpty(url);
+ return StringUtils.isNotEmpty(url);
}
@Override
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
index 2811f10..30ee349 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
@@ -14,16 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.task.procedure;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
-
/**
* procedure parameter
*/
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 5e00784..e03e8e8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -85,9 +85,7 @@ public class MasterServer implements IStoppable {
private MasterSchedulerService masterSchedulerService;
/**
- * master server startup
- * <p>
- * master server not use web service
+ * master server startup; does not use a web service
*
* @param args arguments
*/
@@ -131,14 +129,11 @@ public class MasterServer implements IStoppable {
}
/**
- * register hooks, which are called before the process exits
+ * register hooks, which are called before the process exits
*/
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- if (Stopper.isRunning()) {
- close("shutdownHook");
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (Stopper.isRunning()) {
+ close("shutdownHook");
}
}));
@@ -152,7 +147,7 @@ public class MasterServer implements IStoppable {
public void close(String cause) {
try {
- //execute only once
+ // execute only once
if (Stopper.isStopped()) {
return;
}
@@ -163,27 +158,24 @@ public class MasterServer implements IStoppable {
Stopper.stop();
try {
- //thread sleep 3 seconds for thread quietly stop
+ // thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L);
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
- //close
+ // close
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
this.zkMasterClient.close();
- //close quartz
+ // close quartz
try {
QuartzExecutors.getInstance().shutdown();
logger.info("Quartz service stopped");
} catch (Exception e) {
logger.warn("Quartz service stopped exception:{}",
e.getMessage());
}
-
} catch (Exception e) {
logger.error("master server stop exception ", e);
- } finally {
- System.exit(-1);
}
}
@@ -192,4 +184,3 @@ public class MasterServer implements IStoppable {
close(cause);
}
}
-
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index b492395..e54fc84 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -116,6 +116,8 @@ public class MasterRegistry {
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
logger.info("master node : {} unRegistry to ZK.", address);
+ heartBeatExecutor.shutdown();
+ logger.info("heartbeat executor shutdown");
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index b2659ba..6a3cc60 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -14,15 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.PostConstruct;
+package org.apache.dolphinscheduler.server.master.runner;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -36,6 +30,15 @@ import
org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -90,14 +93,14 @@ public class MasterSchedulerService extends Thread {
* constructor of MasterSchedulerService
*/
@PostConstruct
- public void init(){
+ public void init() {
this.masterExecService =
(ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",
masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
@Override
- public synchronized void start(){
+ public synchronized void start() {
super.setName("MasterSchedulerService");
super.start();
}
@@ -110,7 +113,7 @@ public class MasterSchedulerService extends Thread {
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
- if(!terminated){
+ if (!terminated) {
logger.warn("masterExecService shutdown without terminated,
increase await time");
}
nettyRemotingClient.close();
@@ -123,7 +126,7 @@ public class MasterSchedulerService extends Thread {
@Override
public void run() {
logger.info("master scheduler started");
- while (Stopper.isRunning()){
+ while (Stopper.isRunning()) {
try {
boolean runCheckFlag =
OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory());
if (!runCheckFlag) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index a12583b..d5f7a6e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -43,9 +43,8 @@ public class HeartBeatTask implements Runnable {
private Set<String> heartBeatPaths;
private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
- /**
- * server stop or not
- */
+
+ // server stop or not
protected IStoppable stoppable = null;
public HeartBeatTask(String startTime,
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 9017a13..591333b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -227,18 +227,14 @@ public class ZookeeperRegistryCenter implements
InitializingBean {
* @throws Exception errors
*/
protected boolean checkIsDeadServer(String zNode, String serverType)
throws Exception {
- //ip_sequenceno
+ // ip_sequence_no
String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX :
WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type
+ UNDERLINE + ipSeqNo;
- if (!registerOperator.isExisted(zNode) ||
registerOperator.isExisted(deadServerPath)) {
- return true;
- }
-
- return false;
+ return !registerOperator.isExisted(zNode) ||
registerOperator.isExisted(deadServerPath);
}
public RegisterOperator getRegisterOperator() {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 98c58ac..1278ae9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -95,9 +95,8 @@ public class WorkerServer implements IStoppable {
private WorkerManagerThread workerManagerThread;
/**
- * worker server startup
+ * worker server startup; does not use a web service
*
- * worker server not use web service
* @param args arguments
*/
public static void main(String[] args) {
@@ -143,12 +142,9 @@ public class WorkerServer implements IStoppable {
/**
* register hooks, which are called before the process exits
*/
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- if (Stopper.isRunning()) {
- close("shutdownHook");
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (Stopper.isRunning()) {
+ close("shutdownHook");
}
}));
}
@@ -156,7 +152,7 @@ public class WorkerServer implements IStoppable {
public void close(String cause) {
try {
- //execute only once
+ // execute only once
if (Stopper.isStopped()) {
return;
}
@@ -167,21 +163,18 @@ public class WorkerServer implements IStoppable {
Stopper.stop();
try {
- //thread sleep 3 seconds for thread quitely stop
+ // thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L);
} catch (Exception e) {
logger.warn("thread sleep exception", e);
}
+ // close
this.nettyRemotingServer.close();
this.workerRegistry.unRegistry();
-
this.alertClientService.close();
-
} catch (Exception e) {
logger.error("worker server stop exception ", e);
- } finally {
- System.exit(-1);
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 87c6af6..06b72a5 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -47,7 +47,6 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
-
/**
* worker registry
*/
@@ -115,6 +114,7 @@ public class WorkerRegistry {
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath,
"");
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ",
address);
+
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath,
"");
}
});
logger.info("worker node : {} registry to ZK {} successfully",
address, workerZKPath);
@@ -142,6 +142,7 @@ public class WorkerRegistry {
logger.info("worker node : {} unRegistry from ZK {}.", address,
workerZkPath);
}
this.heartBeatExecutor.shutdownNow();
+ logger.info("heartbeat executor shutdown");
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index 68152e2..bf36b24 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
@@ -25,13 +26,12 @@ import
org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
-import org.apache.commons.lang.StringUtils;
-
import java.util.List;
import java.util.Map;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index b77b84d..cf8e23f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -110,8 +110,8 @@ public class ZKMasterClient extends AbstractZKClient {
@Override
public void close() {
- super.close();
masterRegistry.unRegistry();
+ super.close();
}
/**
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java
index 7ed1522..dc2a600 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java
@@ -14,18 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker;
-import org.apache.commons.lang.StringUtils;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class EnvFileTest {
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index e441986..6652f87 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -58,7 +58,7 @@ public class ZookeeperOperator implements InitializingBean {
protected CuratorFramework zkClient;
@Override
- public void afterPropertiesSet() throws Exception {
+ public void afterPropertiesSet() {
this.zkClient = buildClient();
initStateListener();
treeCacheStart();