This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 88b022c  ITaskQueue should have "hasTask" function (#1744)
88b022c is described below

commit 88b022cb571c76bf6687998bde30d7608f19e1a8
Author: gabry.wu <[email protected]>
AuthorDate: Thu Jan 9 10:41:48 2020 +0800

    ITaskQueue should have "hasTask" function (#1744)
    
    * issue https://github.com/apache/incubator-dolphinscheduler/issues/1742
    
    * remove unnecessary catch
    
    * fix exception type
    
    * fix bad code smell
    
    * remove log exceptions
    
    * catch exception
---
 .../apache/dolphinscheduler/common/queue/ITaskQueue.java  |  7 +++++++
 .../dolphinscheduler/common/queue/TaskQueueZkImpl.java    | 15 +++++++++++++++
 .../dolphinscheduler/common/zk/ZookeeperOperator.java     | 11 +++++++++++
 .../common/queue/TaskQueueZKImplTest.java                 | 11 ++++++++++-
 .../server/worker/runner/FetchTaskThread.java             |  5 +++--
 5 files changed, 46 insertions(+), 3 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
index 6e937f0..5beb811 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
@@ -31,6 +31,13 @@ public interface ITaskQueue {
     List<String> getAllTasks(String key);
 
     /**
+     * Checks whether the given queue holds a task.
+     * @param key queue name
+     * @return true if the queue has a task; false if it does not
+     */
+    boolean hasTask(String key);
+
+    /**
      * check task exists in the task queue or not
      *
      * @param key queue name
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
index 8f5677d..eb9e047 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
@@ -76,6 +76,21 @@ public class TaskQueueZkImpl implements ITaskQueue {
     }
 
     /**
+     * check if has a task
+     * @param key queue name
+     * @return true if has; false if not
+     */
+    @Override
+    public boolean hasTask(String key) {
+        try {
+            return zookeeperOperator.hasChildren(key);
+        } catch (Exception e) {
+            logger.error("check has task in tasks queue exception",e);
+        }
+        return false;
+    }
+
+    /**
      * check task exists in the task queue or not
      *
      * @param key       queue name
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
index c6faec2..9442afd 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
@@ -27,6 +27,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -139,6 +140,16 @@ public class ZookeeperOperator implements InitializingBean 
{
         }
     }
 
+    /** Tells whether znode {@code key} has a child; a missing node (null Stat from checkExists) counts as none. */
+    public boolean hasChildren(final String key){
+        try {
+            Stat stat = zkClient.checkExists().forPath(key);
+            return stat != null && stat.getNumChildren() >= 1;
+        } catch (Exception ex) {
+            throw new IllegalStateException(ex);
+        }
+    }
+
     public boolean isExisted(final String key) {
         try {
             return zkClient.checkExists().forPath(key) != null;
diff --git 
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
index 1b44673..b34a7d6 100644
--- 
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
+++ 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
@@ -64,7 +64,16 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest  {
         allTasks = 
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
         assertEquals(allTasks.size(),0);
     }
-
+    /** test hasTask before and after the queue is cleared */
+    @Test
+    public void hasTask(){
+        init();
+        // presumably init() enqueues tasks (its body is not visible here), so a task is expected
+        assertTrue(tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE));
+        // clear the queue, then the same check is expected to come back false
+        tasksQueue.delete();
+        assertFalse(tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE));
+    }
     /**
      * test check task exists in the task queue or not
      */
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
index 6a25931..4899f4b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
@@ -150,8 +150,9 @@ public class FetchTaskThread implements Runnable{
                 }
 
                 //whether have tasks, if no tasks , no need lock  //get all 
tasks
-                List<String> tasksQueueList = 
taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
-                if (CollectionUtils.isEmpty(tasksQueueList)){
+                boolean hasTask = 
taskQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
+
+                if (!hasTask){
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                     continue;
                 }

Reply via email to