blackberrier commented on a change in pull request #5572:
URL: https://github.com/apache/dolphinscheduler/pull/5572#discussion_r643788658



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
##########
@@ -47,6 +58,33 @@
     @Autowired
     private ProcessService processService;
 
+    @PostConstruct
+    public void init() {
+        super.setName("TaskInstanceCacheRefreshThread");
+        super.start();
+    }
+
+    /**
+     * issue#5539 add thread to fetch task state from database in a fixed rate
+     */
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {
+            try {
+                for (Entry<Integer, TaskInstance> taskInstanceEntry : taskInstanceCache.entrySet()) {
+                    TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey());
+                    if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+                        logger.debug("task {} need fault tolerance, update instance cache", taskInstance.getId());
+                        taskInstanceCache.put(taskInstanceEntry.getKey(), taskInstance);

Review comment:
   Hi @ruanwenjun, I don't quite follow the scenario you describe.
   Assume that in one loop iteration we are checking a task instance and, at that moment, `MasterTaskExecThread` removes it. "Remove" can happen at two levels. The first is removal from the database, in which case `findTaskInstanceById` returns null. The second is removal from the cache, which means the task instance has already reached a finished state. In both cases the `if` condition is not satisfied, so nothing is written back to the cache.
   The other possibility is that a task is removed outside the `entrySet()` loop; in that case the task never enters the loop at all.
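   The two removal levels above can also be modeled deterministically, without sleeps. Below is a minimal single-threaded sketch. It assumes hypothetical `database` and `cache` maps in place of `processService` and `taskInstanceCache`, and a hypothetical `State` enum in place of `ExecutionStatus`; it is not DolphinScheduler code. The injected `Runnable` covers only one interleaving: the removal lands after the iterator hands out the entry and before the database lookup.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   public class RefreshLoopSketch {

       // Hypothetical stand-in for ExecutionStatus.
       enum State { RUNNING_EXECUTION, NEED_FAULT_TOLERANCE, SUCCESS }

       // Hypothetical stand-ins: `database` plays processService, `cache` plays taskInstanceCache.
       final Map<Integer, State> database = new ConcurrentHashMap<>();
       final Map<Integer, State> cache = new ConcurrentHashMap<>();

       // One pass of the refresh loop from the diff. `interleaved` runs right after the
       // iterator hands out an entry, i.e. where MasterTaskExecThread could remove the task.
       void refreshOnce(Runnable interleaved) {
           for (Map.Entry<Integer, State> entry : cache.entrySet()) {
               interleaved.run();
               State fromDb = database.get(entry.getKey()); // null once the row is deleted
               if (null != fromDb && fromDb == State.NEED_FAULT_TOLERANCE) {
                   cache.put(entry.getKey(), fromDb);
               }
           }
       }

       public static void main(String[] args) {
           // Level 1: the task is deleted from the database, so the lookup returns null.
           RefreshLoopSketch dbRemoved = new RefreshLoopSketch();
           dbRemoved.database.put(1, State.RUNNING_EXECUTION);
           dbRemoved.cache.put(1, State.RUNNING_EXECUTION);
           dbRemoved.refreshOnce(() -> dbRemoved.database.remove(1));
           System.out.println("db removed, cache now: " + dbRemoved.cache);

           // Level 2: the task finishes (SUCCESS) and is evicted from the cache.
           RefreshLoopSketch cacheRemoved = new RefreshLoopSketch();
           cacheRemoved.database.put(2, State.RUNNING_EXECUTION);
           cacheRemoved.cache.put(2, State.RUNNING_EXECUTION);
           cacheRemoved.refreshOnce(() -> {
               cacheRemoved.database.put(2, State.SUCCESS);
               cacheRemoved.cache.remove(2);
           });
           System.out.println("cache removed, cache now: " + cacheRemoved.cache);
       }
   }
   ```

   In the first case the cached value is left untouched. In the second, the state read back is `SUCCESS`, so the removed entry is not re-inserted.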
   I wrote a demo to test this, listed below. In the timer thread, `Thread.sleep` simulates a time-consuming operation, and another thread removes the entry while the timer thread is in the middle of that operation.
   ```
   import java.util.Map;
   import java.util.Timer;
   import java.util.TimerTask;
   import java.util.concurrent.ConcurrentHashMap;
   
   public class ConcurrentHashMapRemoveIteratorTest {
   
       private static ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
   
       public static void main(String[] args) {
           /*for (int i = 0; i < 10; i++) {
               map.put(i + ",", i + "");
           }*/
           map.put(10 + ",", 10 + "");
   
           // Timer thread: first pass after 1 s, then every 10 s. Each pass iterates the
           // map and sleeps 5 s per entry to simulate a slow operation (e.g. a DB lookup).
           // The Timer thread is non-daemon, so the program keeps running until killed.
           new Timer().scheduleAtFixedRate(
                   new TimerTask() {
                       @Override
                       public void run() {
                           System.out.println("new loop");
                           for (Map.Entry<String, String> entry : map.entrySet()) {
                               System.out.println("begin" + entry.getKey() + entry.getValue());
                               try {
                                   System.out.println("sleeping");
                                   Thread.sleep(5000);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                               // The entry was handed out before any removal, so it still holds its old value.
                               System.out.println("end" + entry.getKey() + entry.getValue());
                           }
                           System.out.println("new loop end");
                           System.out.println();
   
                       }
                   }, 1000, 10000
           );
   
           // Remover thread: fires at ~12 s, i.e. while the second pass (started at ~11 s) is sleeping.
           new Thread(() -> {
               try {
                   Thread.sleep(12000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               map.remove(10 + ",");
               //map.put(10 + ",","?");
               System.out.println("removed!");
   
           }).start();
       }
   }
   
   ```
   
   The output is 
   ```
   new loop
   begin10,10
   sleeping
   end10,10
   new loop end
   
   new loop
   begin10,10
   sleeping
   removed!
   end10,10
   new loop end
   
   new loop
   new loop end
   ```
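   This output matches the documented behavior of `ConcurrentHashMap`: its iterators are weakly consistent and never throw `ConcurrentModificationException`. In OpenJDK, the entry handed out by the iterator carries its own copy of the key and value, which is why `end10,10` is still printed after `removed!`. Below is a minimal deterministic sketch of the same sequence, with no timers or sleeps. The class and method names are hypothetical, and reading the held entry after removal relies on that OpenJDK implementation detail, not on a guarantee of the `Map.Entry` contract.

   ```java
   import java.util.Iterator;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   public class WeaklyConsistentIterationSketch {

       // Takes an entry from the iterator, then removes its key, mimicking the demo's
       // remover thread firing while the timer thread sleeps. Returns the held entry's value.
       static String valueHeldAcrossRemoval(ConcurrentHashMap<String, String> map) {
           Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
           Map.Entry<String, String> entry = it.next();
           map.remove(entry.getKey());   // no ConcurrentModificationException
           while (it.hasNext()) {        // finishing the old iteration is also safe
               it.next();
           }
           return entry.getValue();
       }

       // Counts the entries a fresh iteration (the demo's next timer pass) visits.
       static int entriesInNextLoop(ConcurrentHashMap<String, String> map) {
           int seen = 0;
           for (Map.Entry<String, String> ignored : map.entrySet()) {
               seen++;
           }
           return seen;
       }

       public static void main(String[] args) {
           ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
           map.put("10,", "10");
           System.out.println("held value after removal: " + valueHeldAcrossRemoval(map));
           System.out.println("entries in new loop: " + entriesInNextLoop(map));
       }
   }
   ```

   An iterator created after the removal has completed starts from the current table, so the next pass sees an empty map. That corresponds to the final `new loop` / `new loop end` pair in the output.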
   
   Maybe I am missing something in this scenario. Could you point it out? Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
