ruanwenjun commented on a change in pull request #5572:
URL: https://github.com/apache/dolphinscheduler/pull/5572#discussion_r643830987



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
##########
@@ -47,6 +58,33 @@
     @Autowired
     private ProcessService processService;
 
+    @PostConstruct
+    public void init() {
+        super.setName("TaskInstanceCacheRefreshThread");
+        super.start();
+    }
+
+    /**
+     * issue#5539 add thread to fetch task state from database in a fixed rate
+     */
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {
+            try {
+                for (Entry<Integer, TaskInstance> taskInstanceEntry : taskInstanceCache.entrySet()) {
+                    TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey());
+                    if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+                        logger.debug("task {} need fault tolerance, update instance cache", taskInstance.getId());
+                        taskInstanceCache.put(taskInstanceEntry.getKey(), taskInstance);

Review comment:
       @blackberrier
   This is not an atomic operation.
   
![image](https://user-images.githubusercontent.com/22415594/120464902-05a14100-c3d0-11eb-8329-ab4d6be3198e.png)
   If MasterTaskExecThread removes the task from the cache while this thread
   is still querying the database, the put here adds the task back to the
   cache after it has been removed.
   Right?
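
   To illustrate the concern, here is a minimal sketch of one way to make the refresh step atomic with respect to a concurrent remove. It assumes the cache is a `ConcurrentHashMap`; the class name `CacheRefreshSketch`, the method `refreshIfPresent`, and the use of plain strings in place of `TaskInstance` are hypothetical and not taken from the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CacheRefreshSketch {

    // Hypothetical stand-in for taskInstanceCache: task id -> state name.
    public static final ConcurrentMap<Integer, String> cache = new ConcurrentHashMap<>();

    /**
     * Replaces the cached value only if the key is still present.
     * On ConcurrentHashMap, computeIfPresent is atomic per key, so a
     * concurrent remove() cannot be undone by this refresh: if the entry
     * is already gone, nothing is inserted.
     */
    public static void refreshIfPresent(Integer taskId, String freshState) {
        cache.computeIfPresent(taskId, (id, oldState) -> freshState);
    }

    public static void main(String[] args) {
        // Entry removed before the refresh lands: it must stay removed.
        cache.put(1, "RUNNING");
        cache.remove(1);
        refreshIfPresent(1, "NEED_FAULT_TOLERANCE");
        System.out.println(cache.containsKey(1)); // prints "false"

        // Entry still cached: the refresh updates it.
        cache.put(2, "RUNNING");
        refreshIfPresent(2, "NEED_FAULT_TOLERANCE");
        System.out.println(cache.get(2)); // prints "NEED_FAULT_TOLERANCE"
    }
}
```

   With a plain `put`, the first case would leave the removed task back in the cache. Whether this fits the PR depends on how the cache is declared and on how MasterTaskExecThread removes entries.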
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]