This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 8aaee39 task priority refactor (#2094)
8aaee39 is described below
commit 8aaee39aa28e9d61eac25aee181fd48e790b929c
Author: qiaozhanwei <[email protected]>
AuthorDate: Thu Mar 5 18:25:48 2020 +0800
task priority refactor (#2094)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replace
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task priority refactor
* remove ITaskQueue
---
.../apache/dolphinscheduler/common/Constants.java | 2 +
.../dolphinscheduler/dao/entity/TaskInstance.java | 14 +
.../server/entity/TaskPriority.java | 146 ++++++++
.../master/consumer/TaskUpdateQueueConsumer.java | 169 ++++++++++
.../server/master/processor/TaskAckProcessor.java | 5 +-
.../master/runner/MasterBaseTaskExecThread.java | 152 +++------
.../server/master/runner/MasterTaskExecThread.java | 5 +-
.../server/worker/WorkerServer.java | 8 +-
.../service/process/ProcessService.java | 42 +--
.../dolphinscheduler/service/queue/ITaskQueue.java | 102 ------
.../service/queue/TaskQueueFactory.java | 4 +-
.../service/queue/TaskQueueZkImpl.java | 375 ---------------------
.../service/queue/TaskUpdateQueue.java} | 50 ++-
.../service/queue/TaskUpdateQueueImpl.java | 115 +++++++
.../src/test/java/queue/TaskQueueZKImplTest.java | 229 -------------
.../src/test/java/queue/TaskUpdateQueueTest.java | 59 ++++
16 files changed, 588 insertions(+), 889 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 2aff56e..67ce5fd 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -1004,4 +1004,6 @@ public final class Constants {
* default worker group
*/
public static final String DEFAULT_WORKER_GROUP = "default";
+
+ /**
+ * task info length
+ * NOTE(review): presumably the number of underscore-separated fields in a
+ * task priority info string (five in the TaskPriority format) - confirm
+ */
+ public static final Integer TASK_INFO_LENGTH = 5;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index e444ad2..dc463fe 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -191,6 +191,11 @@ public class TaskInstance implements Serializable {
*/
private int workerGroupId;
+ /**
+ * workerGroup
+ */
+ private String workerGroup;
+
public ProcessInstance getProcessInstance() {
return processInstance;
}
@@ -460,6 +465,14 @@ public class TaskInstance implements Serializable {
this.dependentResult = dependentResult;
}
+ /**
+ * get workerGroup
+ *
+ * @return the current value of the workerGroup field
+ */
+ public String getWorkerGroup() {
+ return workerGroup;
+ }
+
+ /**
+ * set workerGroup
+ *
+ * @param workerGroup value stored in the workerGroup field
+ */
+ public void setWorkerGroup(String workerGroup) {
+ this.workerGroup = workerGroup;
+ }
+
@Override
public String toString() {
return "TaskInstance{" +
@@ -492,6 +505,7 @@ public class TaskInstance implements Serializable {
", processInstancePriority=" + processInstancePriority +
", dependentResult='" + dependentResult + '\'' +
", workerGroupId=" + workerGroupId +
+ ", workerGroup='" + workerGroup + '\'' +
'}';
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
new file mode 100644
index 0000000..7db5f45
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.entity;
+
+import static org.apache.dolphinscheduler.common.Constants.*;
+
+/**
+ * task priority info
+ */
+public class TaskPriority {
+
+    /**
+     * process instance priority code
+     */
+    private int processInstancePriority;
+
+    /**
+     * process instance id
+     */
+    private int processInstanceId;
+
+    /**
+     * task instance priority code
+     */
+    private int taskInstancePriority;
+
+    /**
+     * task instance id
+     */
+    private int taskId;
+
+    /**
+     * worker group name
+     */
+    private String groupName;
+
+    /**
+     * serialized form:
+     * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
+     *
+     * built once by the five-argument constructor; the field setters below do not refresh it
+     */
+    private String taskPriorityInfo;
+
+    /**
+     * create an empty TaskPriority; every field keeps its default value
+     */
+    public TaskPriority(){}
+
+    /**
+     * create a TaskPriority and derive its serialized taskPriorityInfo
+     *
+     * @param processInstancePriority processInstancePriority
+     * @param processInstanceId processInstanceId
+     * @param taskInstancePriority taskInstancePriority
+     * @param taskId taskId
+     * @param groupName groupName
+     */
+    public TaskPriority(int processInstancePriority,
+                        int processInstanceId,
+                        int taskInstancePriority,
+                        int taskId, String groupName) {
+        this.processInstancePriority = processInstancePriority;
+        this.processInstanceId = processInstanceId;
+        this.taskInstancePriority = taskInstancePriority;
+        this.taskId = taskId;
+        this.groupName = groupName;
+        this.taskPriorityInfo = this.processInstancePriority +
+                UNDERLINE +
+                this.processInstanceId +
+                UNDERLINE +
+                this.taskInstancePriority +
+                UNDERLINE +
+                this.taskId +
+                UNDERLINE +
+                this.groupName;
+    }
+
+    public int getProcessInstancePriority() {
+        return processInstancePriority;
+    }
+
+    public void setProcessInstancePriority(int processInstancePriority) {
+        this.processInstancePriority = processInstancePriority;
+    }
+
+    public int getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public void setProcessInstanceId(int processInstanceId) {
+        this.processInstanceId = processInstanceId;
+    }
+
+    public int getTaskInstancePriority() {
+        return taskInstancePriority;
+    }
+
+    public void setTaskInstancePriority(int taskInstancePriority) {
+        this.taskInstancePriority = taskInstancePriority;
+    }
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    public String getTaskPriorityInfo() {
+        return taskPriorityInfo;
+    }
+
+    public void setTaskPriorityInfo(String taskPriorityInfo) {
+        this.taskPriorityInfo = taskPriorityInfo;
+    }
+
+    /**
+     * parse a serialized task priority string back into a TaskPriority
+     *
+     * the string is split on the first TASK_INFO_LENGTH - 1 separators only, so a
+     * group name that itself contains the separator is kept intact
+     *
+     * @param taskPriorityInfo ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
+     * @return TaskPriority
+     * @throws IllegalArgumentException if taskPriorityInfo is null or does not contain TASK_INFO_LENGTH fields
+     * @throws NumberFormatException if one of the four leading fields is not an integer
+     */
+    public static TaskPriority of(String taskPriorityInfo){
+        // the limit keeps everything after the fourth separator as the group name
+        String[] parts = taskPriorityInfo == null
+                ? new String[0]
+                : taskPriorityInfo.split(UNDERLINE, TASK_INFO_LENGTH);
+        // parts[4] is read below, so exactly TASK_INFO_LENGTH (5) fields are required
+        if (parts.length != TASK_INFO_LENGTH) {
+            throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo));
+        }
+        return new TaskPriority(
+                Integer.parseInt(parts[0]),
+                Integer.parseInt(parts[1]),
+                Integer.parseInt(parts[2]),
+                Integer.parseInt(parts[3]),
+                parts[4]);
+    }
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
new file mode 100644
index 0000000..cccc700
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.consumer;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskPriority;
+import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
+import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * TaskUpdateQueue consumer
+ */
+@Component
+public class TaskUpdateQueueConsumer extends Thread{
+
+    /**
+     * logger of TaskUpdateQueueConsumer
+     */
+    private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueConsumer.class);
+
+    /**
+     * pause between two polls while the queue is empty, in milliseconds
+     */
+    private static final long EMPTY_QUEUE_SLEEP_MILLIS = 100L;
+
+    /**
+     * taskUpdateQueue
+     */
+    @Autowired
+    private TaskUpdateQueue taskUpdateQueue;
+
+    /**
+     * processService
+     */
+    @Autowired
+    private ProcessService processService;
+
+    /**
+     * executor dispatcher
+     */
+    @Autowired
+    private ExecutorDispatcher dispatcher;
+
+    /**
+     * take task priority entries from the queue and dispatch the matching task
+     * instance until Stopper reports that the server is no longer running
+     */
+    @Override
+    public void run() {
+        while (Stopper.isRunning()){
+            try {
+                if (taskUpdateQueue.size() == 0){
+                    // back off instead of spinning at full speed on an empty queue
+                    Thread.sleep(EMPTY_QUEUE_SLEEP_MILLIS);
+                    continue;
+                }
+                String taskPriorityInfo = taskUpdateQueue.take();
+                TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
+                dispatch(taskPriority.getTaskId());
+            } catch (InterruptedException e) {
+                // keep the interrupt flag and leave the loop; retrying would fail immediately again
+                Thread.currentThread().interrupt();
+                logger.warn("task update queue consumer interrupted, stop consuming", e);
+                break;
+            }catch (Exception e){
+                // one failing entry must not stop the consumer
+                logger.error("dispatcher task error",e);
+            }
+        }
+    }
+
+    /**
+     * TODO dispatch task
+     *
+     * @param taskInstanceId taskInstanceId
+     * @return result of the dispatcher, or false when no execution context could be
+     *         built or the dispatcher threw an ExecuteException
+     */
+    private Boolean dispatch(int taskInstanceId){
+        TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
+        if (context == null) {
+            // getTaskExecutionContext returns null after marking the task FAILURE (tenant missing)
+            logger.error("task execution context is null, skip dispatch, task instance id : {}", taskInstanceId);
+            return false;
+        }
+        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
+        try {
+            return dispatcher.dispatch(executionContext);
+        } catch (ExecuteException e) {
+            logger.error("execute exception", e);
+            return false;
+        }
+    }
+
+    /**
+     * get TaskExecutionContext
+     *
+     * loads the task instance detail, resolves its tenant and fills in queue,
+     * tenant code and execute path before building the context
+     *
+     * @param taskInstanceId taskInstanceId
+     * @return TaskExecutionContext, or null when the tenant does not exist (the task
+     *         state is changed to FAILURE in that case)
+     */
+    protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId){
+        TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId);
+
+        // NOTE(review): getProcessDefine() is null-checked here but dereferenced without
+        // a check in getExecLocalPath - confirm whether it can really be null
+        Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
+        Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
+
+        // verify tenant is null
+        if (verifyTenantIsNull(tenant, taskInstance)) {
+            processService.changeTaskState(ExecutionStatus.FAILURE,
+                    taskInstance.getStartTime(),
+                    taskInstance.getHost(),
+                    null,
+                    null,
+                    taskInstance.getId());
+            return null;
+        }
+        // set queue for process instance, user-specified queue takes precedence over tenant queue
+        String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+        taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
+        taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+        taskInstance.setExecutePath(getExecLocalPath(taskInstance));
+
+        return TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
+                .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+                .create();
+    }
+
+    /**
+     * get execute local path
+     *
+     * @param taskInstance taskInstance
+     * @return execute local path
+     */
+    private String getExecLocalPath(TaskInstance taskInstance){
+        return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+                taskInstance.getProcessDefine().getId(),
+                taskInstance.getProcessInstance().getId(),
+                taskInstance.getId());
+    }
+
+    /**
+     * whether tenant is null; logs an error when it is
+     *
+     * @param tenant tenant
+     * @param taskInstance taskInstance
+     * @return true if tenant is null
+     */
+    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
+        if(tenant == null){
+            logger.error("tenant not exists,process instance id : {},task instance id : {}",
+                    taskInstance.getProcessInstance().getId(),
+                    taskInstance.getId());
+            return true;
+        }
+        return false;
+    }
+
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index ef2cb67..8f0b731 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import
org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import
org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
@@ -66,12 +67,14 @@ public class TaskAckProcessor implements
NettyRequestProcessor {
logger.info("taskAckCommand : {}", taskAckCommand);
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
+
+ String workerAddress = ChannelUtils.toAddress(channel).getAddress();
/**
* change Task state
*/
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
- taskAckCommand.getHost(),
+ workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 71bb8f8..c8b7b0e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -17,26 +17,18 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
-import
org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
-import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.dolphinscheduler.common.Constants.*;
import java.util.concurrent.Callable;
@@ -72,11 +64,6 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
protected TaskInstance taskInstance;
/**
- * task queue
- */
- protected ITaskQueue taskQueue;
-
- /**
* whether need cancel
*/
protected boolean cancel;
@@ -86,12 +73,10 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
*/
private MasterConfig masterConfig;
-
/**
- * executor dispatcher
+ * taskUpdateQueue
*/
- private ExecutorDispatcher dispatcher;
-
+ private TaskUpdateQueue taskUpdateQueue;
/**
* constructor of MasterBaseTaskExecThread
* @param taskInstance task instance
@@ -101,11 +86,10 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
this.processService =
SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.processInstance = processInstance;
- this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig =
SpringApplicationContext.getBean(MasterConfig.class);
- this.dispatcher =
SpringApplicationContext.getBean(ExecutorDispatcher.class);
+ this.taskUpdateQueue = new TaskUpdateQueueImpl();
}
/**
@@ -124,87 +108,6 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
}
/**
- * TODO 分发任务
- * dispatch task to worker
- * @param taskInstance
- */
- private Boolean dispatch(TaskInstance taskInstance){
- TaskExecutionContext context = getTaskExecutionContext(taskInstance);
- ExecutionContext executionContext = new
ExecutionContext(context.toCommand(), ExecutorType.WORKER,
context.getWorkerGroup());
- try {
- return dispatcher.dispatch(executionContext);
- } catch (ExecuteException e) {
- logger.error("execute exception", e);
- return false;
- }
-
- }
-
- /**
- * get TaskExecutionContext
- *
- * @param taskInstance taskInstance
- * @return TaskExecutionContext
- */
- protected TaskExecutionContext getTaskExecutionContext(TaskInstance
taskInstance){
- taskInstance =
processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
-
- Integer userId = taskInstance.getProcessDefine() == null ? 0 :
taskInstance.getProcessDefine().getUserId();
- Tenant tenant =
processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
userId);
-
- // verify tenant is null
- if (verifyTenantIsNull(tenant, taskInstance)) {
- processService.changeTaskState(ExecutionStatus.FAILURE,
- taskInstance.getStartTime(),
- taskInstance.getHost(),
- null,
- null,
- taskInstance.getId());
- return null;
- }
- // set queue for process instance, user-specified queue takes
precedence over tenant queue
- String userQueue =
processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
-
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ?
tenant.getQueue() : userQueue);
-
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
- taskInstance.setExecutePath(getExecLocalPath(taskInstance));
-
- return TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
-
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
-
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
- .create();
- }
-
-
- /**
- * get execute local path
- *
- * @return execute local path
- */
- private String getExecLocalPath(TaskInstance taskInstance){
- return
FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
- taskInstance.getProcessDefine().getId(),
- taskInstance.getProcessInstance().getId(),
- taskInstance.getId());
- }
-
-
- /**
- * whehter tenant is null
- * @param tenant tenant
- * @param taskInstance taskInstance
- * @return result
- */
- private boolean verifyTenantIsNull(Tenant tenant, TaskInstance
taskInstance) {
- if(tenant == null){
- logger.error("tenant not exists,process instance id : {},task
instance id : {}",
- taskInstance.getProcessInstance().getId(),
- taskInstance.getId());
- return true;
- }
- return false;
- }
- /**
* submit master base task exec thread
* @return TaskInstance
*/
@@ -268,10 +171,20 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
logger.info(String.format("submit to task, but task [%s] state
already be running. ", taskInstance.getName()));
return true;
}
- logger.info("task ready to submit: {}" , taskInstance);
- boolean submitTask = dispatch(taskInstance);
+ logger.info("task ready to submit: {}", taskInstance);
+
+ /**
+ * taskPriorityInfo
+ */
+ String taskPriorityInfo =
buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(),
+ processInstance.getId(),
+ taskInstance.getProcessInstancePriority().getCode(),
+ taskInstance.getId(),
+ taskInstance.getWorkerGroup());
+
+ taskUpdateQueue.put(taskPriorityInfo);
logger.info(String.format("master submit success, task : %s",
taskInstance.getName()) );
- return submitTask;
+ return true;
}catch (Exception e){
logger.error("submit task Exception: ", e);
logger.error("task error : %s", JSONUtils.toJson(taskInstance));
@@ -279,6 +192,33 @@ public class MasterBaseTaskExecThread implements
Callable<Boolean> {
}
}
+
+    /**
+     * assemble the queue entry for a task in the form
+     * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${workerGroup}
+     *
+     * @param processInstancePriority processInstancePriority
+     * @param processInstanceId processInstanceId
+     * @param taskInstancePriority taskInstancePriority
+     * @param taskInstanceId taskInstanceId
+     * @param workerGroup workerGroup
+     * @return TaskPriorityInfo
+     */
+    private String buildTaskPriorityInfo(int processInstancePriority,
+                                         int processInstanceId,
+                                         int taskInstancePriority,
+                                         int taskInstanceId,
+                                         String workerGroup){
+        StringBuilder info = new StringBuilder();
+        info.append(processInstancePriority).append(UNDERLINE);
+        info.append(processInstanceId).append(UNDERLINE);
+        info.append(taskInstancePriority).append(UNDERLINE);
+        info.append(taskInstanceId).append(UNDERLINE);
+        // a null workerGroup is appended as "null", exactly like string concatenation
+        info.append(workerGroup);
+        return info.toString();
+    }
+
/**
* submit wait complete
* @return true
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index fb3f8e9..1197dc2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -182,7 +182,10 @@ public class MasterTaskExecThread extends
MasterBaseTaskExecThread {
}
alreadyKilled = true;
- TaskExecutionContext taskExecutionContext =
super.getTaskExecutionContext(taskInstance);
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+ taskExecutionContext.setTaskInstanceId(taskInstance.getId());
+ taskExecutionContext.setProcessId(taskInstance.getPid());
+
ExecutionContext executionContext = new
ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER,
taskExecutionContext.getWorkerGroup());
Host host = Host.of(taskInstance.getHost());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index cfb94b5..fb35f47 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -31,7 +31,6 @@ import
org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,10 +60,7 @@ public class WorkerServer implements IStoppable {
@Autowired
private ZKWorkerClient zkWorkerClient = null;
- /**
- * task queue impl
- */
- protected ITaskQueue taskQueue;
+
/**
* fetch task executor service
@@ -136,7 +132,7 @@ public class WorkerServer implements IStoppable {
this.zkWorkerClient.init();
- this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
+
this.fetchTaskExecutorService =
ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 55cd634..ca0ed79 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,11 +98,6 @@ public class ProcessService {
private ProjectMapper projectMapper;
/**
- * task queue impl
- */
- @Autowired
- private ITaskQueue taskQueue;
- /**
* handle Command (construct ProcessInstance from Command) , wrapped in
transaction
* @param logger logger
* @param host host
@@ -960,40 +954,7 @@ public class ProcessService {
return taskInstance;
}
- /**
- * submit task to queue
- * @param taskInstance taskInstance
- * @return whether submit task to queue success
- */
- public Boolean submitTaskToQueue(TaskInstance taskInstance) {
- try{
- if(taskInstance.isSubProcess()){
- return true;
- }
- if(taskInstance.getState().typeIsFinished()){
- logger.info(String.format("submit to task queue, but task [%s]
state [%s] is already finished. ", taskInstance.getName(),
taskInstance.getState().toString()));
- return true;
- }
- // task cannot submit when running
- if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
- logger.info(String.format("submit to task queue, but task [%s]
state already be running. ", taskInstance.getName()));
- return true;
- }
- if(checkTaskExistsInTaskQueue(taskInstance)){
- logger.info(String.format("submit to task queue, but task [%s]
already exists in the queue.", taskInstance.getName()));
- return true;
- }
- logger.info("task ready to queue: {}" , taskInstance);
- boolean insertQueueResult =
taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
- logger.info(String.format("master insert into queue success, task
: %s", taskInstance.getName()) );
- return insertQueueResult;
- }catch (Exception e){
- logger.error("submit task to queue Exception: ", e);
- logger.error("task queue error : %s",
JSONUtils.toJson(taskInstance));
- return false;
- }
- }
/**
*
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${task
executed by ip1},${ip2}...
@@ -1127,7 +1088,8 @@ public class ProcessService {
String taskZkInfo = taskZkInfo(taskInstance);
- return taskQueue.checkTaskExists(DOLPHINSCHEDULER_TASKS_QUEUE,
taskZkInfo);
+// return taskQueue.checkTaskExists(DOLPHINSCHEDULER_TASKS_QUEUE,
taskZkInfo);
+ return false;
}
/**
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
deleted file mode 100644
index bed8a11..0000000
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.queue;
-
-import java.util.List;
-import java.util.Set;
-
-public interface ITaskQueue {
-
- /**
- * take out all the elements
- *
- *
- * @param key
- * @return
- */
- List<String> getAllTasks(String key);
-
- /**
- * check if has a task
- * @param key queue name
- * @return true if has; false if not
- */
- boolean hasTask(String key);
-
- /**
- * check task exists in the task queue or not
- *
- * @param key queue name
- * @param task
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
- * @return true if exists in the queue
- */
- boolean checkTaskExists(String key, String task);
-
- /**
- * add an element to the queue
- *
- * @param key queue name
- * @param value
- */
- boolean add(String key, String value);
-
- /**
- * an element pops out of the queue
- *
- * @param key queue name
- * @param n how many elements to poll
- * @return
- */
- List<String> poll(String key, int n);
-
- /**
- * remove a element from queue
- * @param key
- * @param value
- */
- void removeNode(String key, String value);
-
- /**
- * add an element to the set
- *
- * @param key
- * @param value
- */
- void sadd(String key, String value);
-
- /**
- * delete the value corresponding to the key in the set
- *
- * @param key
- * @param value
- */
- void srem(String key, String value);
-
- /**
- * gets all the elements of the set based on the key
- *
- * @param key
- * @return
- */
- Set<String> smembers(String key);
-
-
- /**
- * clear the task queue for use by junit tests only
- */
- void delete();
-}
\ No newline at end of file
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
index 6be419f..3ea3195 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
@@ -40,11 +40,11 @@ public class TaskQueueFactory {
*
* @return instance
*/
- public static ITaskQueue getTaskQueueInstance() {
+ public static TaskUpdateQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
logger.info("task queue impl use zookeeper ");
- return SpringApplicationContext.getBean(TaskQueueZkImpl.class);
+ return SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
}else{
logger.error("property dolphinscheduler.queue.impl can't be blank,
system will exit ");
System.exit(-1);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
deleted file mode 100644
index 874512c..0000000
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.queue;
-
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.IpUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.service.zk.ZookeeperOperator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.*;
-
-/**
- * A singleton of a task queue implemented with zookeeper
- * tasks queue implementation
- */
-@Service
-public class TaskQueueZkImpl implements ITaskQueue {
-
- private static final Logger logger =
LoggerFactory.getLogger(TaskQueueZkImpl.class);
-
- private final ZookeeperOperator zookeeperOperator;
-
- @Autowired
- public TaskQueueZkImpl(ZookeeperOperator zookeeperOperator) {
- this.zookeeperOperator = zookeeperOperator;
-
- try {
- String tasksQueuePath =
getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
- String tasksKillPath =
getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
-
- for (String key : new String[]{tasksQueuePath,tasksKillPath}){
- if (!zookeeperOperator.isExisted(key)){
- zookeeperOperator.persist(key, "");
- logger.info("create tasks queue parent node success : {}",
key);
- }
- }
- } catch (Exception e) {
- logger.error("create tasks queue parent node failure", e);
- }
- }
-
-
- /**
- * get all tasks from tasks queue
- * @param key task queue name
- * @return
- */
- @Override
- public List<String> getAllTasks(String key) {
- try {
- List<String> list =
zookeeperOperator.getChildrenKeys(getTasksPath(key));
- return list;
- } catch (Exception e) {
- logger.error("get all tasks from tasks queue exception",e);
- }
- return Collections.emptyList();
- }
-
- /**
- * check if has a task
- * @param key queue name
- * @return true if has; false if not
- */
- @Override
- public boolean hasTask(String key) {
- try {
- return zookeeperOperator.hasChildren(getTasksPath(key));
- } catch (Exception e) {
- logger.error("check has task in tasks queue exception",e);
- }
- return false;
- }
-
- /**
- * check task exists in the task queue or not
- *
- * @param key queue name
- * @param task
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
- * @return true if exists in the queue
- */
- @Override
- public boolean checkTaskExists(String key, String task) {
- String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task;
-
- return zookeeperOperator.isExisted(taskPath);
-
- }
-
-
- /**
- * add task to tasks queue
- *
- * @param key task queue name
- * @param value
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
- */
- @Override
- public boolean add(String key, String value){
- try {
- String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH +
value;
- zookeeperOperator.persist(taskIdPath, value);
- return true;
- } catch (Exception e) {
- logger.error("add task to tasks queue exception",e);
- return false;
- }
-
- }
-
-
- /**
- * An element pops out of the queue <p>
- * note:
- *
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
- * The tasks with the highest priority are selected by comparing the
priorities of the above four levels from high to low.
- *
- * @param key task queue name
- * @param tasksNum how many elements to poll
- * @return the task ids to be executed
- */
- @Override
- public List<String> poll(String key, int tasksNum) {
- try{
- List<String> list =
zookeeperOperator.getChildrenKeys(getTasksPath(key));
-
- if(list != null && list.size() > 0){
-
- String workerIp = OSUtils.getHost();
- String workerIpLongStr =
String.valueOf(IpUtils.ipToLong(workerIp));
-
- int size = list.size();
-
- Set<String> taskTreeSet = new TreeSet<>(new
Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
-
- String s1 = o1;
- String s2 = o2;
- String[] s1Array = s1.split(Constants.UNDERLINE);
- if(s1Array.length>4){
- // warning: if this length > 5, need to be changed
- s1 = s1.substring(0,
s1.lastIndexOf(Constants.UNDERLINE) );
- }
-
- String[] s2Array = s2.split(Constants.UNDERLINE);
- if(s2Array.length>4){
- // warning: if this length > 5, need to be changed
- s2 = s2.substring(0,
s2.lastIndexOf(Constants.UNDERLINE) );
- }
-
- return s1.compareTo(s2);
- }
- });
-
- for (int i = 0; i < size; i++) {
-
- String taskDetail = list.get(i);
- String[] taskDetailArrs =
taskDetail.split(Constants.UNDERLINE);
-
- //forward compatibility
- if(taskDetailArrs.length >= 4){
-
- //format
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
- String formatTask = String.format("%s_%010d_%s_%010d",
taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2],
Long.parseLong(taskDetailArrs[3]));
- if(taskDetailArrs.length > 4){
- String taskHosts = taskDetailArrs[4];
-
- //task can assign to any worker host if equals
default ip value of worker server
-
if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){
- String[] taskHostsArr =
taskHosts.split(Constants.COMMA);
-
if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
- continue;
- }
- }
- formatTask += Constants.UNDERLINE +
taskDetailArrs[4];
- }
- taskTreeSet.add(formatTask);
- }
- }
-
- List<String> tasksList = getTasksListFromTreeSet(tasksNum,
taskTreeSet);
-
- logger.info("consume tasks: {},there still have {} tasks need
to be executed", Arrays.toString(tasksList.toArray()), size - tasksList.size());
-
- return tasksList;
- }else{
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- }
-
- } catch (Exception e) {
- logger.error("add task to tasks queue exception",e);
- }
- return Collections.emptyList();
- }
-
-
- /**
- * get task list from tree set
- *
- * @param tasksNum
- * @param taskTreeSet
- */
- public List<String> getTasksListFromTreeSet(int tasksNum, Set<String>
taskTreeSet) {
- Iterator<String> iterator = taskTreeSet.iterator();
- int j = 0;
- List<String> tasksList = new ArrayList<>(tasksNum);
- while(iterator.hasNext()){
- if(j++ >= tasksNum){
- break;
- }
- String task = iterator.next();
- tasksList.add(getOriginTaskFormat(task));
- }
- return tasksList;
- }
-
- /**
- * format
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
- * processInstanceId and task id need to be convert to int.
- * @param formatTask
- * @return
- */
- private String getOriginTaskFormat(String formatTask){
- String[] taskArray = formatTask.split(Constants.UNDERLINE);
- if(taskArray.length< 4){
- return formatTask;
- }
- int processInstanceId = Integer.parseInt(taskArray[1]);
- int taskId = Integer.parseInt(taskArray[3]);
-
- StringBuilder sb = new StringBuilder(50);
- String destTask = String.format("%s_%s_%s_%s", taskArray[0],
processInstanceId, taskArray[2], taskId);
-
- sb.append(destTask);
-
- if(taskArray.length > 4){
- for(int index = 4; index < taskArray.length; index++){
- sb.append(Constants.UNDERLINE).append(taskArray[index]);
- }
- }
- return sb.toString();
- }
-
- @Override
- public void removeNode(String key, String nodeValue){
-
- String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
- String taskIdPath = tasksQueuePath + nodeValue;
- logger.info("removeNode task {}", taskIdPath);
- try{
- zookeeperOperator.remove(taskIdPath);
-
- }catch(Exception e){
- logger.error(String.format("delete task:%s from zookeeper fail,
exception:" ,nodeValue) ,e);
- }
-
- }
-
-
-
- /**
- * In order to be compatible with redis implementation
- *
- * To be compatible with the redis implementation, add an element to the
set
- * @param key The key is the kill/cancel queue path name
- * @param value host-taskId The name of the zookeeper node
- */
- @Override
- public void sadd(String key,String value) {
- try {
-
- if(value != null && value.trim().length() > 0){
- String path = getTasksPath(key) + Constants.SINGLE_SLASH;
- if(!zookeeperOperator.isExisted(path + value)){
- zookeeperOperator.persist(path + value,value);
- logger.info("add task:{} to tasks set ",value);
- } else{
- logger.info("task {} exists in tasks set ",value);
- }
-
- }else{
- logger.warn("add host-taskId:{} to tasks set is empty ",value);
- }
-
- } catch (Exception e) {
- logger.error("add task to tasks set exception",e);
- }
- }
-
-
- /**
- * delete the value corresponding to the key in the set
- * @param key The key is the kill/cancel queue path name
- * @param value host-taskId-taskType The name of the zookeeper node
- */
- @Override
- public void srem(String key, String value) {
- try{
- String path = getTasksPath(key) + Constants.SINGLE_SLASH;
- zookeeperOperator.remove(path + value);
-
- }catch(Exception e){
- logger.error(String.format("delete task:" + value + "
exception"),e);
- }
- }
-
-
- /**
- * Gets all the elements of the set based on the key
- * @param key The key is the kill/cancel queue path name
- * @return
- */
- @Override
- public Set<String> smembers(String key) {
- try {
- List<String> list =
zookeeperOperator.getChildrenKeys(getTasksPath(key));
- return new HashSet<>(list);
- } catch (Exception e) {
- logger.error("get all tasks from tasks queue exception",e);
- }
- return Collections.emptySet();
- }
-
- /**
- * Clear the task queue of zookeeper node
- */
- @Override
- public void delete(){
- try {
- String tasksQueuePath =
getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
- String tasksKillPath =
getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
-
- for (String key : new String[]{tasksQueuePath,tasksKillPath}){
- if (zookeeperOperator.isExisted(key)){
- List<String> list = zookeeperOperator.getChildrenKeys(key);
- for (String task : list) {
- zookeeperOperator.remove(key + Constants.SINGLE_SLASH
+ task);
- logger.info("delete task from tasks queue : {}/{} ",
key, task);
- }
- }
- }
-
- } catch (Exception e) {
- logger.error("delete all tasks in tasks queue failure", e);
- }
- }
-
- /**
- * Get the task queue path
- * @param key task queue name
- * @return
- */
- public String getTasksPath(String key){
- return zookeeperOperator.getZookeeperConfig().getDsRoot() +
Constants.SINGLE_SLASH + key;
- }
-
-}
diff --git
a/dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
similarity index 52%
rename from dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java
rename to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
index a0cc457..48f510e 100644
--- a/dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
@@ -14,35 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package queue;
+package org.apache.dolphinscheduler.service.queue;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
-import org.junit.*;
-/**
- * base task queue test for only start zk server once
- */
-@Ignore
-public class BaseTaskQueueTest {
+public interface TaskUpdateQueue {
- protected static ITaskQueue tasksQueue = null;
+ /**
+ * put task info into the queue
+ *
+ * @param taskInfo task info
+ * @throws Exception
+ */
+ void put(String taskInfo) throws Exception;
- @BeforeClass
- public static void setup() {
- ZKServer.start();
- tasksQueue = TaskQueueFactory.getTaskQueueInstance();
- //clear all data
- tasksQueue.delete();
- }
+ /**
+ * take task info from the queue
+ * @return task info
+ * @throws Exception
+ */
+ String take()throws Exception;
- @AfterClass
- public static void tearDown() {
- tasksQueue.delete();
- ZKServer.stop();
- }
- @Test
- public void tasksQueueNotNull(){
- Assert.assertNotNull(tasksQueue);
- }
-}
+ /**
+ * size of the queue
+ *
+ * @return size
+ * @throws Exception
+ */
+ int size() throws Exception;
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
new file mode 100644
index 0000000..fda5a4c
--- /dev/null
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.queue;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import static org.apache.dolphinscheduler.common.Constants.*;
+
+/**
+ * In-memory task priority queue backed by a PriorityBlockingQueue;
+ * task info strings are ordered by TaskInfoComparator
+ */
+@Service
+public class TaskUpdateQueueImpl implements TaskUpdateQueue {
+
+ private static final Logger logger =
LoggerFactory.getLogger(TaskUpdateQueueImpl.class);
+
+ /**
+ * initial capacity of the queue, and the size at which put() drops new entries
+ */
+ private static final Integer QUEUE_MAX_SIZE = 100;
+
+ /**
+ * priority queue holding the task info strings
+ */
+ private PriorityBlockingQueue<String> queue = new
PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+
+ /**
+ * put task priority info into the queue
+ * (when queue.size() equals QUEUE_MAX_SIZE an error is logged and the info is dropped)
+ * @param taskPriorityInfo task priority info
+ * @throws Exception
+ */
+ @Override
+ public void put(String taskPriorityInfo) throws Exception {
+
+ if (QUEUE_MAX_SIZE.equals(queue.size())){
+ //TODO need persist db , then load from db to queue when queue
size is zero
+ logger.error("queue is full...");
+ return;
+ }
+ queue.put(taskPriorityInfo);
+ }
+
+ /**
+ * take the head task info of the queue, waiting until one is available
+ * @return task info
+ * @throws Exception
+ */
+ @Override
+ public String take() throws Exception {
+ return queue.take();
+ }
+
+ /**
+ * number of task infos currently in the queue
+ * @return size
+ * @throws Exception
+ */
+ @Override
+ public int size() throws Exception {
+ return queue.size();
+ }
+
+ /**
+ * TaskInfoComparator : orders task infos by String comparison
+ */
+ private class TaskInfoComparator implements Comparator<String>{
+
+ /**
+ * compare o1 and o2 with String.compareTo; an info with more than TASK_INFO_LENGTH fields is compared without its last field
+ * @param o1 first task info
+ * @param o2 second task info
+ * @return compare result; NOTE(review): lexicographic, so numeric ids of different widths (e.g. 10 vs 2) are not ordered numerically - confirm this is intended
+ */
+ @Override
+ public int compare(String o1, String o2) {
+ String s1 = o1;
+ String s2 = o2;
+ String[] s1Array = s1.split(UNDERLINE);
+ if(s1Array.length > TASK_INFO_LENGTH){
+ // warning: if this length > 5, need to be changed
+ s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) );
+ }
+
+ String[] s2Array = s2.split(UNDERLINE);
+ if(s2Array.length > TASK_INFO_LENGTH){
+ // warning: if this length > 5, need to be changed
+ s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) );
+ }
+
+ return s1.compareTo(s2);
+ }
+ }
+}
diff --git
a/dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
b/dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
deleted file mode 100644
index d29c5aa..0000000
--- a/dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package queue;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.IpUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-/**
- * task queue test
- */
-@Ignore
-public class TaskQueueZKImplTest extends BaseTaskQueueTest {
-
- @Before
- public void before(){
-
- //clear all data
- tasksQueue.delete();
- }
-
- @After
- public void after(){
- //clear all data
- tasksQueue.delete();
- }
-
- /**
- * test take out all the elements
- */
- @Test
- public void getAllTasks(){
-
- //add
- init();
- // get all
- List<String> allTasks =
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
- assertEquals(allTasks.size(),2);
- //delete all
- tasksQueue.delete();
- allTasks =
tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
- assertEquals(allTasks.size(),0);
- }
- @Test
- public void hasTask(){
- init();
- boolean hasTask =
tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
- assertTrue(hasTask);
- //delete all
- tasksQueue.delete();
- hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
- assertFalse(hasTask);
- }
- /**
- * test check task exists in the task queue or not
- */
- @Test
- public void checkTaskExists(){
-
- String task= "1_0_1_1_-1";
- //add
- init();
- // check Exist true
- boolean taskExists =
tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
- assertTrue(taskExists);
-
- //remove task
- tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
- // check Exist false
- taskExists =
tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
- assertFalse(taskExists);
- }
-
- /**
- * test add element to the queue
- */
- @Test
- public void add(){
-
- //add
- tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
- tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
- tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_0_0_1_" +
IpUtils.ipToLong(OSUtils.getHost()));
- tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_2_1_1_" +
IpUtils.ipToLong(OSUtils.getHost()) + 10);
-
- List<String> tasks =
tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1);
-
- if(tasks.size() <= 0){
- return;
- }
-
- //pop
- String node1 = tasks.get(0);
- assertEquals(node1,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost()));
- }
-
- /**
- * test element pops out of the queue
- */
- @Test
- public void poll(){
-
- //add
- init();
- List<String> taskList =
tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 2);
- assertEquals(taskList.size(),2);
-
- assertEquals(taskList.get(0),"0_1_1_1_-1");
- assertEquals(taskList.get(1),"1_0_1_1_-1");
- }
-
- /**
- * test remove element from queue
- */
- @Test
- public void removeNode(){
- String task = "1_0_1_1_-1";
- //add
- init();
- tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-
assertFalse(tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task));
- }
-
- /**
- * test add an element to the set
- */
- @Test
- public void sadd(){
-
- String task = "1_0_1_1_-1";
- tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
- //check size
-
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
- }
-
-
- /**
- * test delete the value corresponding to the key in the set
- */
- @Test
- public void srem(){
-
- String task = "1_0_1_1_-1";
- tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
- //check size
-
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
- //remove and get size
- tasksQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
-
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0);
- }
-
- /**
- * test gets all the elements of the set based on the key
- */
- @Test
- public void smembers(){
-
- //first init
-
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0);
- //add
- String task = "1_0_1_1_-1";
- tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
- //check size
-
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
- //add
- task = "0_1_1_1_";
- tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
- //check size
-
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),2);
- }
-
-
- /**
- * init data
- */
- private void init(){
- //add
- tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
- tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
- }
-
-
-
- /**
- * test one million data from zookeeper queue
- */
- @Ignore
- @Test
- public void extremeTest(){
- int total = 30 * 10000;
-
- for(int i = 0; i < total; i++) {
- for(int j = 0; j < total; j++) {
-
//${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
- //format
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
- String formatTask = String.format("%s_%d_%s_%d", i, i + 1, j,
j == 0 ? 0 : j + new Random().nextInt(100));
- tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,
formatTask);
- }
- }
-
- String node1 = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,
1).get(0);
- assertEquals(node1,"0");
-
- }
-
-}
diff --git
a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
new file mode 100644
index 0000000..a0e4fad
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package queue;
+
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TaskUpdateQueueTest {
+
+ /**
+ * test that take() returns task infos in comparator order, not in put order
+ */
+ @Test
+ public void testQueue() throws Exception{
+
+ // task info format:
${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
+
+ /**
+ * 1_1_2_1_default
+ * 1_1_2_2_default
+ * 1_1_0_3_default
+ * 1_1_0_4_default
+ */
+
+ String taskInfo1 = "1_1_2_1_default";
+ String taskInfo2 = "1_1_2_2_default";
+ String taskInfo3 = "1_1_0_3_default";
+ String taskInfo4 = "1_1_0_4_default";
+
+ TaskUpdateQueue queue = new TaskUpdateQueueImpl();
+ queue.put(taskInfo1);
+ queue.put(taskInfo2);
+ queue.put(taskInfo3);
+ queue.put(taskInfo4);
+
+ assertEquals("1_1_0_3_default", queue.take());
+ assertEquals("1_1_0_4_default", queue.take());
+ assertEquals("1_1_2_1_default",queue.take());
+ assertEquals("1_1_2_2_default",queue.take());
+ }
+}