Lyalpha commented on issue #8494:
URL: https://github.com/apache/airflow/issues/8494#issuecomment-646848561


   I handled it with the following wrapper. Rather than changing the
`execution_date`, it just tries again a microsecond later. (The micro-sleep is
probably superfluous given the overhead of the call itself, but it ensures that
more than 1 microsecond passes between tries.)
   ```
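   import logging
   import time

   # Import paths are assumptions (Airflow 1.10.x on a Postgres backend);
   # the original comment did not show them.
   from airflow.api.common.experimental.trigger_dag import trigger_dag
   from psycopg2.errors import UniqueViolation
   from sqlalchemy.exc import IntegrityError


   def safe_trigger_dag(
       dag_id, run_id=None, conf=None, execution_date=None,
       replace_microseconds=False,
   ):
       """
       A wrapper for airflow's trigger_dag, which will micro-sleep and retry if
       the dag fails to start due to a unique key violation on `dag_id` and
       `execution_date`.

       Also, by default, will not replace microseconds - this allows for dag
       scheduling on microsecond intervals.
       """
       while True:
           try:
               trigger_dag(
                   dag_id=dag_id,
                   run_id=run_id,
                   conf=conf,
                   execution_date=execution_date,
                   replace_microseconds=replace_microseconds,
               )
           except IntegrityError as e:
               # Only retry on a duplicate-key error; re-raise anything else.
               if not isinstance(e.orig, UniqueViolation):
                   raise
               logging.warning(
                   "UniqueViolation detected when triggering dag, starting micro-sleep"
               )
               time.sleep(1e-6)
           else:
               break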
   ```
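
One caveat with the loop above: it retries forever. That is fine when `execution_date` is left as `None`, since each call then presumably picks up a fresh timestamp, but with an explicit `execution_date` every retry would hit the same key. Below is a minimal sketch of a bounded variant of the same retry idea. It is deliberately independent of Airflow: `DuplicateRunError`, `retry_on_duplicate` and `make_flaky_trigger` are hypothetical names made up for illustration, with the exception standing in for the `IntegrityError`/`UniqueViolation` pair.

```python
import time


class DuplicateRunError(Exception):
    """Hypothetical stand-in for the IntegrityError/UniqueViolation pair."""


def retry_on_duplicate(trigger, max_attempts=5, sleep_seconds=1e-6):
    """Call trigger() until it succeeds or max_attempts is used up.

    Returns the number of attempts that were needed. If every attempt
    collides, the last DuplicateRunError is re-raised to the caller.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            trigger()
        except DuplicateRunError:
            if attempt == max_attempts:
                raise  # give up instead of looping forever
            time.sleep(sleep_seconds)
        else:
            return attempt


def make_flaky_trigger(failures):
    """Build a fake trigger that collides `failures` times, then succeeds."""
    state = {"calls": 0}

    def trigger():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise DuplicateRunError("duplicate (dag_id, execution_date)")

    return trigger


if __name__ == "__main__":
    # Two simulated collisions, so the third attempt succeeds.
    print(retry_on_duplicate(make_flaky_trigger(2)))
```

In the real wrapper, `trigger` would be a closure around `trigger_dag(...)` and the `except` clause would keep the `IntegrityError` check from above; the cap of 5 attempts is an arbitrary choice.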

