Have other opinions ? If not, the master code will be modified to manually guarantee transactions
――――――――――――― DolphinScheduler(Incubator) PPMC Zhanwei Qiao 乔占卫 [email protected] 发件人: [email protected]<mailto:[email protected]> 发送时间: 2019-12-11 10:17 收件人: dev<mailto:[email protected]> 主题: Re: Re: worker deletes queue task, if the db task insert is slow agree to ensure data consistency on the master side ――――――――――――― DolphinScheduler(Incubator) PPMC Zhanwei Qiao 乔占卫 [email protected] From: guo jiwei<mailto:[email protected]> Date: 2019-12-10 23:40 To: dev<mailto:[email protected]> Subject: Re: worker deletes queue task, if the db task insert is slow Hi zhanwei, what do you mean "another situation is db task insert is slow" . Task should save into db and then save to zk queue orderly by MasterServer. otherwise , it may occur inaccurate task status or data inconsistency. and we have to keep more code to keep it right like the above way On Tue, Dec 10, 2019 at 2:47 PM qiao zhanwei <[email protected]> wrote: > Currently the communication between master and worker is through zk as a > queue > > 1.master inserts tasks into zk and db as transactions > > @Transactional(rollbackFor = Exception.class) > public TaskInstance submitTask(TaskInstance taskInstance, > ProcessInstance processInstance){ > logger.info("start submit task : {}, instance id:{}, state: {}, ", > taskInstance.getName(), processInstance.getId(), > processInstance.getState() ); > processInstance = > this.findProcessInstanceDetailById(processInstance.getId()); > //submit to mysql > TaskInstance task= submitTaskInstanceToMysql(taskInstance, > processInstance); > if(task.isSubProcess() && !task.getState().typeIsFinished()){ > ProcessInstanceMap processInstanceMap = > setProcessInstanceMap(processInstance, task); > > TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), > TaskNode.class); > Map<String, String> subProcessParam = > JSONUtils.toMap(taskNode.getParams()); > Integer defineId = > Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); > createSubWorkProcessCommand(processInstance, > processInstanceMap, defineId, task); > }else if(!task.getState().typeIsFinished()){ > //submit to task queue > > task.setProcessInstancePriority(processInstance.getProcessInstancePriority()); > submitTaskToQueue(task); > } > logger.info("submit task :{} state:{} complete, instance id:{} > state: {} ", > taskInstance.getName(), task.getState(), > processInstance.getId(), processInstance.getState()); > return task; > } > > 2.worker fetch task from zk and query db.if both exist,submit task to > execute > another situation is db task insert is slow . then > > /** > * wait for task instance exists, because of db action would be > delayed. > * > * @throws Exception exception > */ > private void waitForTaskInstance()throws Exception{ > int retryTimes = 30; > while (taskInstance == null && retryTimes > 0) { > Thread.sleep(Constants.SLEEP_TIME_MILLIS); > taskInstance = processDao.findTaskInstanceById(taskInstId); > retryTimes--; > } > } > > The worker waits for the db 30s to insert the task instance. If it exceeds > 30s, the task corresponding to the zk queue is deleted. > This will cause the task to be in the task submited state and the process > instance to be running. > > > > In this case : > master to ensure data consistency or other ? > > Welcome to discuss > > thx > > ――――――――――――― > DolphinScheduler(Incubator) PPMC > Zhanwei Qiao 乔占卫 > > [email protected] >
