Lyalpha commented on issue #8494:
URL: https://github.com/apache/airflow/issues/8494#issuecomment-646848561
I handled it with the following wrapper. Rather than changing the
`execution_date`, it simply tries again a microsecond later. (The micro-sleep is
probably superfluous given the overhead of the call itself, but it ensures that
at least 1 microsecond passes between tries.)
```
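import logging
import time

# Assumes a PostgreSQL backend via psycopg2, which is where UniqueViolation lives.
from psycopg2.errors import UniqueViolation
from sqlalchemy.exc import IntegrityError

# Import path as of Airflow 1.10.x; adjust for your version.
from airflow.api.common.experimental.trigger_dag import trigger_dag


def safe_trigger_dag(
    dag_id, run_id=None, conf=None, execution_date=None,
    replace_microseconds=False,
):
    """
    A wrapper for airflow's trigger_dag, which will micro-sleep if the dag
    fails to start due to a unique key violation on `dag_id` and
    `execution_date`.

    Also, by default, will not replace microseconds - this allows for dag
    scheduling on microsecond intervals.
    """
    while True:
        try:
            trigger_dag(
                dag_id=dag_id,
                run_id=run_id,
                conf=conf,
                execution_date=execution_date,
                replace_microseconds=replace_microseconds,
            )
        except IntegrityError as e:
            # Only retry on a duplicate-key error; re-raise anything else
            # (an assert would be stripped when running with -O).
            if not isinstance(e.orig, UniqueViolation):
                raise
            logging.warning(
                "UniqueViolation detected when triggering dag, "
                "starting micro-sleep"
            )
            time.sleep(1e-6)  # 1 microsecond
        else:
            break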
```
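
One caveat with the wrapper above: as far as I can tell, if an explicit
`execution_date` is passed, every retry reuses the same value, so the loop may
never terminate. Below is a minimal sketch of a bounded retry that gives up
after a fixed number of tries. It is not Airflow code: `retry_on_collision`,
`DuplicateRunError` and `fake_trigger` are hypothetical names, and the fake
trigger stands in for the real `trigger_dag` call so the snippet runs on its
own.

```python
import logging
import time


class DuplicateRunError(Exception):
    """Hypothetical stand-in for the IntegrityError/UniqueViolation pair."""


def retry_on_collision(trigger, max_tries=5, delay=1e-6,
                       collision_exc=DuplicateRunError):
    """Call trigger() until it succeeds or max_tries attempts have failed."""
    for attempt in range(1, max_tries + 1):
        try:
            return trigger()
        except collision_exc:
            if attempt == max_tries:
                # Out of attempts: surface the original error to the caller.
                raise
            logging.warning("collision on attempt %d, retrying", attempt)
            time.sleep(delay)


# Fake trigger (assumption for illustration): collides twice, then succeeds.
calls = {"n": 0}


def fake_trigger():
    calls["n"] += 1
    if calls["n"] < 3:
        raise DuplicateRunError("duplicate (dag_id, execution_date)")
    return "triggered"


result = retry_on_collision(fake_trigger)
```

With the real wrapper, `trigger` could be a `lambda` around `trigger_dag(...)`
and `collision_exc` could be `IntegrityError`, keeping the `UniqueViolation`
check inside the lambda or the handler.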