potiuk edited a comment on issue #17897:
URL: https://github.com/apache/airflow/issues/17897#issuecomment-910789621


   @ashb  that might be an interesting one - I need your second pair of eyes 
here (and your expert knowledge of that part of the code).
   
   I looked at the code and the only explanation I have (regardless of the 
isolation level question) is that when you have two parsers and you change a 
DAG file "quickly", you can technically end up with two parsers loading the 
same file in parallel (and subsequently failing to update its tags). Tag 
updating is done in several steps with DELETEs and INSERTs, which makes for a 
pretty interesting (and likely) race condition. 
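   To make the failure mode concrete, here is a minimal, runnable sketch - 
not Airflow's actual code; the table and column names only approximate the 
metadata DB schema, and SQLite stands in for the real database - of two 
parser processes that both parsed the same version of the file and both try 
to write the same tag row:
   ```
   import sqlite3

   # Two connections stand in for two parallel parser processes sharing
   # one metadata DB.
   parser_a = sqlite3.connect("tags_race_demo.db")
   parser_b = sqlite3.connect("tags_race_demo.db")
   parser_a.execute(
       "CREATE TABLE IF NOT EXISTS dag_tag "
       "(dag_id TEXT, name TEXT, PRIMARY KEY (dag_id, name))"
   )
   parser_a.commit()

   # Parser A finishes first: DELETE the old tags, INSERT the renamed one.
   parser_a.execute("DELETE FROM dag_tag WHERE dag_id = 'example_dag'")
   parser_a.execute("INSERT INTO dag_tag VALUES ('example_dag', 'renamed_tag')")
   parser_a.commit()

   # Parser B parsed the very same version of the file, so it attempts an
   # identical INSERT -> "duplicate entry", as reported in this issue.
   try:
       parser_b.execute("INSERT INTO dag_tag VALUES ('example_dag', 'renamed_tag')")
       parser_b.commit()
   except sqlite3.IntegrityError as err:
       print("duplicate entry:", err)
   ```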
   
   My hypothesis:
   
   1) the first pass of the DagFileProcessorManager loop finds a DAG file to 
parse and adds it to the _file_path_queue (but other files are being parsed, 
so it is not picked up yet)
   
   2) the DAG file gets changed (a tag is renamed)
   
   3) the next pass of the parser loop finds the same file again (the files 
are sorted by modification time in reverse order, so it is found first). When 
sorting by modified_time, the "min_file_process_interval" is not honored, so 
the file is picked up immediately and added to the list.
   The exclusion check in DagFileProcessorManager does not include 
self._file_path_queue - so I believe that if the same file is already in the 
queue, it will be added a second time (this queue is a plain list):
   ```
           file_paths_to_exclude = set(file_paths_in_progress).union(
               file_paths_recently_processed, files_paths_at_run_limit
           )
   ```
   
   4) The same file then appears twice in self._file_path_queue, and if the 
two entries sit next to each other in the queue, they get picked up by two 
parallel parser processes. By that time both parsers see the same version of 
the file and both try to insert the same tag for the same DAG -> hence the 
duplicate entry (see the toy sketch after this list). 
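   
   Here is a toy model of the queue behaviour - hypothetical file paths, and 
the variable names only loosely follow the snippet quoted in step 3; this is 
not the actual manager code:
   ```
   # Pass 1 queued dags/a.py; it has not been picked up yet.
   file_path_queue = ["dags/a.py"]
   # Pass 2 re-discovers dags/a.py (newer mtime) plus another file.
   file_paths = ["dags/a.py", "dags/b.py"]
   file_paths_in_progress = []
   file_paths_recently_processed = []
   files_paths_at_run_limit = []

   # Same shape as the exclusion check quoted in step 3: the queue itself
   # is never consulted, so an already-queued path passes the filter again.
   file_paths_to_exclude = set(file_paths_in_progress).union(
       file_paths_recently_processed, files_paths_at_run_limit
   )
   files_to_queue = [p for p in file_paths if p not in file_paths_to_exclude]
   file_path_queue.extend(files_to_queue)
   print(file_path_queue)  # ['dags/a.py', 'dags/a.py', 'dags/b.py']
   ```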
   
   Does it sound plausible? Did I miss something? I think the fix would be to 
include the queue itself in the exclusion set:
   
   ```
           file_paths_to_exclude = set(file_paths_in_progress).union(
               file_paths_recently_processed, files_paths_at_run_limit
           ).union(set(self._file_path_queue))
   ```
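   
   Continuing the toy model from above, the extra union keeps the 
re-discovered path out of the queue:
   ```
   file_path_queue = ["dags/a.py"]  # dags/a.py already queued from pass 1
   file_paths = ["dags/a.py", "dags/b.py"]
   file_paths_in_progress = []
   file_paths_recently_processed = []
   files_paths_at_run_limit = []

   file_paths_to_exclude = set(file_paths_in_progress).union(
       file_paths_recently_processed, files_paths_at_run_limit
   ).union(set(file_path_queue))
   files_to_queue = [p for p in file_paths if p not in file_paths_to_exclude]
   file_path_queue.extend(files_to_queue)
   print(file_path_queue)  # ['dags/a.py', 'dags/b.py'] - no duplicate
   ```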
   
   Does it look legit? 
   
   

