uranusjr commented on code in PR #54913:
URL: https://github.com/apache/airflow/pull/54913#discussion_r2309440760
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1610,22 +1626,31 @@ def _create_dag_runs_asset_triggered(
.order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
).all()
- dag_run = dag.create_dagrun(
- run_id=DagRun.generate_run_id(
- run_type=DagRunType.ASSET_TRIGGERED, logical_date=None,
run_after=triggered_date
- ),
- logical_date=None,
- data_interval=None,
- run_after=triggered_date,
- run_type=DagRunType.ASSET_TRIGGERED,
- triggered_by=DagRunTriggeredByType.ASSET,
- state=DagRunState.QUEUED,
- creating_job_id=self.job.id,
- session=session,
- )
- Stats.incr("asset.triggered_dagruns")
- dag_run.consumed_asset_events.extend(asset_events)
-
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_run.dag_id))
+ try:
+ dag_run = dag.create_dagrun(
+ run_id=deterministic_run_id,
+ logical_date=None,
+ data_interval=None,
+ run_after=triggered_date,
+ run_type=DagRunType.ASSET_TRIGGERED,
+ triggered_by=DagRunTriggeredByType.ASSET,
+ state=DagRunState.QUEUED,
+ creating_job_id=self.job.id,
+ session=session,
+ )
+
+ dag_run.consumed_asset_events.extend(asset_events)
+ session.execute(
+
delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id)
+ )
+ session.flush()
+ Stats.incr("asset.triggered_dagruns")
+
+ except Exception as e:
+ if is_constraint_violation(e):
+ session.rollback()
+ continue
+ raise e
Review Comment:
```suggestion
raise
```
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1610,22 +1626,31 @@ def _create_dag_runs_asset_triggered(
.order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
).all()
- dag_run = dag.create_dagrun(
- run_id=DagRun.generate_run_id(
- run_type=DagRunType.ASSET_TRIGGERED, logical_date=None,
run_after=triggered_date
- ),
- logical_date=None,
- data_interval=None,
- run_after=triggered_date,
- run_type=DagRunType.ASSET_TRIGGERED,
- triggered_by=DagRunTriggeredByType.ASSET,
- state=DagRunState.QUEUED,
- creating_job_id=self.job.id,
- session=session,
- )
- Stats.incr("asset.triggered_dagruns")
- dag_run.consumed_asset_events.extend(asset_events)
-
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_run.dag_id))
+ try:
+ dag_run = dag.create_dagrun(
+ run_id=deterministic_run_id,
+ logical_date=None,
+ data_interval=None,
+ run_after=triggered_date,
+ run_type=DagRunType.ASSET_TRIGGERED,
+ triggered_by=DagRunTriggeredByType.ASSET,
+ state=DagRunState.QUEUED,
+ creating_job_id=self.job.id,
+ session=session,
+ )
+
+ dag_run.consumed_asset_events.extend(asset_events)
+ session.execute(
+
delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id)
+ )
+ session.flush()
+ Stats.incr("asset.triggered_dagruns")
+
+ except Exception as e:
+ if is_constraint_violation(e):
+ session.rollback()
+ continue
Review Comment:
This should probably be stricter; we should check if the conflict is indeed
caused by the issue we are expecting.
I think a better way to do this, without going too deep into the database
internals, would be:
1. Try to create the dag run.
2. If it fails with a constraint violation, _do not delete the queue entry_.
3. The next time the scheduler calls `_create_dagruns_for_dags`, it will run the
checks you added on lines 1584-1596 and correctly avoid creating those runs again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]