This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 88b022c ITaskQueue should have "hasTask" function (#1744)
88b022c is described below
commit 88b022cb571c76bf6687998bde30d7608f19e1a8
Author: gabry.wu <[email protected]>
AuthorDate: Thu Jan 9 10:41:48 2020 +0800
ITaskQueue should have "hasTask" function (#1744)
* issue https://github.com/apache/incubator-dolphinscheduler/issues/1742
* remove unnecessary catch
* fix exception type
* fix bad code smell
* remove log exceptions
* catch exception
---
.../apache/dolphinscheduler/common/queue/ITaskQueue.java | 7 +++++++
.../dolphinscheduler/common/queue/TaskQueueZkImpl.java | 15 +++++++++++++++
.../dolphinscheduler/common/zk/ZookeeperOperator.java | 11 +++++++++++
.../common/queue/TaskQueueZKImplTest.java | 11 ++++++++++-
.../server/worker/runner/FetchTaskThread.java | 5 +++--
5 files changed, 46 insertions(+), 3 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
index 6e937f0..5beb811 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
@@ -31,6 +31,13 @@ public interface ITaskQueue {
List<String> getAllTasks(String key);
/**
+ * Check whether the given queue currently holds at least one task.
+ * @param key queue name
+ * @return true if the queue has at least one task; false otherwise
+ */
+ boolean hasTask(String key);
+
+ /**
* check task exists in the task queue or not
*
* @param key queue name
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
index 8f5677d..eb9e047 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
@@ -76,6 +76,21 @@ public class TaskQueueZkImpl implements ITaskQueue {
}
/**
+ * check if has a task
+ * @param key queue name
+ * @return true if has; false if not
+ */
+ @Override
+ public boolean hasTask(String key) {
+ try {
+ return zookeeperOperator.hasChildren(key);
+ } catch (Exception e) {
+ logger.error("check has task in tasks queue exception",e);
+ }
+ return false;
+ }
+
+ /**
* check task exists in the task queue or not
*
* @param key queue name
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
index c6faec2..9442afd 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
@@ -27,6 +27,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -139,6 +140,16 @@ public class ZookeeperOperator implements InitializingBean
{
}
}
+ /**
+  * check whether the given node has at least one child
+  *
+  * @param key zookeeper node path
+  * @return true if the node exists and has one or more children;
+  *         false if the node does not exist or has no children
+  * @throws IllegalStateException if the zookeeper lookup itself fails
+  */
+ public boolean hasChildren(final String key){
+     try {
+         // checkExists() yields null for a missing node (isExisted relies on the same contract);
+         // answer "no children" directly instead of dereferencing null and having the
+         // resulting NullPointerException wrapped as an IllegalStateException below
+         final Stat stat = zkClient.checkExists().forPath(key);
+         return stat != null && stat.getNumChildren() >= 1;
+     } catch (Exception ex) {
+         throw new IllegalStateException(ex);
+     }
+ }
+
public boolean isExisted(final String key) {
try {
return zkClient.checkExists().forPath(key) != null;
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
index 1b44673..b34a7d6 100644
---
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
+++
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
@@ -64,7 +64,16 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest {
allTasks =
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
assertEquals(allTasks.size(),0);
}
-
+ /**
+  * test whether the queue reports having tasks before and after deleting them
+  */
+ @Test
+ public void hasTask(){
+     // init() is expected to leave tasks in the queue -- the first assertion depends on it
+     init();
+     assertTrue(tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE));
+     //delete all
+     tasksQueue.delete();
+     // once everything is deleted the queue must report that it has no task
+     assertFalse(tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE));
+ }
/**
* test check task exists in the task queue or not
*/
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
index 6a25931..4899f4b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
@@ -150,8 +150,9 @@ public class FetchTaskThread implements Runnable{
}
//whether have tasks, if no tasks , no need lock //get all
tasks
- List<String> tasksQueueList =
taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
- if (CollectionUtils.isEmpty(tasksQueueList)){
+ boolean hasTask =
taskQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
+
+ if (!hasTask){
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}