Hi zhanwei,
   What do you mean by "another situation is db task insert is slow"?
   The MasterServer should save the task into the db first and only then
put it into the zk queue, in that order. Otherwise the task status may
become inaccurate or the data inconsistent, and we would have to keep
extra code such as waitForTaskInstance to compensate.
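
   A minimal sketch of that ordering, using in-memory stand-ins for the db
and the zk queue. The class and method names here are hypothetical and are
not DolphinScheduler code; the point is only that a task id reaches the
queue after its row has been stored, so the worker never has to wait for it.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

class OrderedSubmitSketch {
    // Hypothetical stand-ins: a map for the task table, a deque for the zk queue.
    private final Map<Integer, String> db = new HashMap<>();
    private final Queue<Integer> queue = new ArrayDeque<>();

    /** Store the task first; enqueue its id only after the store succeeded. */
    boolean submit(int taskId, String name, boolean simulateDbFailure) {
        try {
            if (simulateDbFailure) {
                throw new IllegalStateException("simulated db failure");
            }
            db.put(taskId, name);
        } catch (RuntimeException e) {
            return false; // nothing was enqueued, so no orphan entry in the queue
        }
        queue.add(taskId);
        return true;
    }

    /** Worker side: any id taken from the queue already has a stored row. */
    String poll() {
        Integer id = queue.poll();
        return id == null ? null : db.get(id);
    }

    public static void main(String[] args) {
        OrderedSubmitSketch s = new OrderedSubmitSketch();
        System.out.println("submitted: " + s.submit(1, "task-a", false));
        System.out.println("worker sees: " + s.poll());
    }
}
```

   With a real db, the same effect needs the enqueue to happen after the
transaction has committed, not merely after the insert statement has run.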


On Tue, Dec 10, 2019 at 2:47 PM qiao zhanwei <[email protected]>
wrote:

> Currently, master and worker communicate through zk, which is used as a
> queue.
>
> 1. The master inserts the task into zk and the db within one transaction:
>
>     @Transactional(rollbackFor = Exception.class)
>     public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
>         logger.info("start submit task : {}, instance id:{}, state: {}, ",
>                 taskInstance.getName(), processInstance.getId(), processInstance.getState() );
>         processInstance = this.findProcessInstanceDetailById(processInstance.getId());
>         //submit to mysql
>         TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
>         if(task.isSubProcess() && !task.getState().typeIsFinished()){
>             ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
>
>             TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
>             Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
>             Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
>             createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
>         }else if(!task.getState().typeIsFinished()){
>             //submit to task queue
>             task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
>             submitTaskToQueue(task);
>         }
>         logger.info("submit task :{} state:{} complete, instance id:{} state: {}  ",
>                 taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
>         return task;
>     }
>
> 2. The worker fetches the task from zk and queries the db. If the task
> exists in both, the worker submits it for execution.
> Another situation is that the db insert of the task is slow. In that case:
>
>     /**
>      * Wait for the task instance to exist, because the db write may be
>      * delayed.
>      *
>      * @throws Exception exception
>      */
>     private void waitForTaskInstance() throws Exception {
>         int retryTimes = 30;
>         while (taskInstance == null && retryTimes > 0) {
>             Thread.sleep(Constants.SLEEP_TIME_MILLIS);
>             taskInstance = processDao.findTaskInstanceById(taskInstId);
>             retryTimes--;
>         }
>     }
>
> The worker waits up to 30s for the task instance to be inserted into the
> db. If that takes longer than 30s, the corresponding task is deleted from
> the zk queue.
> This leaves the task stuck in the submitted state while the process
> instance stays in the running state.
>
>
>
> In this case:
> should the master ensure data consistency, or is there another way?
>
> Discussion is welcome.
>
> thx
>
> —————————————
> DolphinScheduler(Incubator)  PPMC
> Zhanwei Qiao 乔占卫
>
> [email protected]
>
