This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 076ccbd sleep when resource in not satisfy. fix #1522 (#1523)
076ccbd is described below
commit 076ccbdb9b3657f286198caf23f500341269dff5
Author: Tboy <[email protected]>
AuthorDate: Fri Dec 20 15:43:34 2019 +0800
sleep when resource in not satisfy. fix #1522 (#1523)
* fix #1515
* sleep when resource in not satisfy. fix #1522
* add sleep 1s for no command
---
.../master/runner/MasterSchedulerThread.java | 60 +++++++++++-----------
1 file changed, 31 insertions(+), 29 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
index 69c2304..8d7d5a0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
@@ -98,39 +98,41 @@ public class MasterSchedulerThread implements Runnable {
InterProcessMutex mutex = null;
try {
-
if(OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory())){
- if (zkMasterClient.getZkClient().getState() ==
CuratorFrameworkState.STARTED) {
-
- // create distributed lock with the root node path of
the lock space as /dolphinscheduler/lock/masters
- String znodeLock = zkMasterClient.getMasterLockPath();
-
- mutex = new
InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
- mutex.acquire();
-
- ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor)
masterExecService;
- int activeCount = poolExecutor.getActiveCount();
- // make sure to scan and delete command table in one
transaction
- Command command = processDao.findOneCommand();
- if (command != null) {
- logger.info(String.format("find one command: id:
%d, type: %s", command.getId(),command.getCommandType().toString()));
-
- try{
- processInstance =
processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum -
activeCount, command);
- if (processInstance != null) {
- logger.info("start master exec thread ,
split DAG ...");
- masterExecService.execute(new
MasterExecThread(processInstance,processDao));
- }
- }catch (Exception e){
- logger.error("scan command error ", e);
- processDao.moveToErrorCommand(command,
e.toString());
+ boolean runCheckFlag =
OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory());
+ if(!runCheckFlag) {
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ continue;
+ }
+ if (zkMasterClient.getZkClient().getState() ==
CuratorFrameworkState.STARTED) {
+
+ // create distributed lock with the root node path of the
lock space as /dolphinscheduler/lock/masters
+ String znodeLock = zkMasterClient.getMasterLockPath();
+
+ mutex = new
InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
+ mutex.acquire();
+
+ ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor)
masterExecService;
+ int activeCount = poolExecutor.getActiveCount();
+ // make sure to scan and delete command table in one
transaction
+ Command command = processDao.findOneCommand();
+ if (command != null) {
+ logger.info(String.format("find one command: id: %d,
type: %s", command.getId(),command.getCommandType().toString()));
+
+ try{
+ processInstance = processDao.handleCommand(logger,
OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
+ if (processInstance != null) {
+ logger.info("start master exec thread , split
DAG ...");
+ masterExecService.execute(new
MasterExecThread(processInstance,processDao));
}
+ }catch (Exception e){
+ logger.error("scan command error ", e);
+ processDao.moveToErrorCommand(command,
e.toString());
}
+ } else{
+ //indicate that no command ,sleep for 1s
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
-
- // accessing the command table every SLEEP_TIME_MILLIS
milliseconds
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-
}catch (Exception e){
logger.error("master scheduler thread exception : " +
e.getMessage(),e);
}finally{