houqp commented on code in PR #22701:
URL: https://github.com/apache/airflow/pull/22701#discussion_r844268111
##########
airflow/jobs/backfill_job.py:
##########
@@ -782,6 +784,28 @@ def _execute(self, session=None):
return
dagrun_infos = [DagRunInfo.interval(dagrun_start_date,
dagrun_end_date)]
+ dag_with_subdags_ids = [d.dag_id for d in self._get_dag_with_subdags()]
+ running_dagruns = DagRun.find(
+ dag_id=dag_with_subdags_ids,
+ execution_start_date=self.bf_start_date,
+ execution_end_date=self.bf_end_date,
+ no_backfills=True,
+ state=State.RUNNING,
+ )
+
+ if running_dagruns:
+ for run in running_dagruns:
+ self.log.error(
+ "Backfill cannot be created for DagRun %s in %s, as
there's already %s in a RUNNING "
+ "state. Changing DagRun into BACKFILL would cause
scheduler to lose track of executing "
+ "tasks. Not changing DagRun type into BACKFILL, and trying
insert another DagRun into "
+ "database would cause database constraint violation for
dag_id + execution_date "
+ "combination. Please adjust backfill dates or wait for
this DagRun to finish.",
+ run.dag_id,
+ run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
+ run.run_type,
+ )
Review Comment:
I pushed a change to move majority repeated of the error log into a single
log entry. Left the dynamic part to remain per entry so it still maintains the
dag_id, execution_date and run_type mapping.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]