uranusjr commented on code in PR #54913:
URL: https://github.com/apache/airflow/pull/54913#discussion_r2309440760


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1610,22 +1626,31 @@ def _create_dag_runs_asset_triggered(
                 .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
             ).all()
 
-            dag_run = dag.create_dagrun(
-                run_id=DagRun.generate_run_id(
-                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, 
run_after=triggered_date
-                ),
-                logical_date=None,
-                data_interval=None,
-                run_after=triggered_date,
-                run_type=DagRunType.ASSET_TRIGGERED,
-                triggered_by=DagRunTriggeredByType.ASSET,
-                state=DagRunState.QUEUED,
-                creating_job_id=self.job.id,
-                session=session,
-            )
-            Stats.incr("asset.triggered_dagruns")
-            dag_run.consumed_asset_events.extend(asset_events)
-            
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_run.dag_id))
+            try:
+                dag_run = dag.create_dagrun(
+                    run_id=deterministic_run_id,
+                    logical_date=None,
+                    data_interval=None,
+                    run_after=triggered_date,
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    triggered_by=DagRunTriggeredByType.ASSET,
+                    state=DagRunState.QUEUED,
+                    creating_job_id=self.job.id,
+                    session=session,
+                )
+
+                dag_run.consumed_asset_events.extend(asset_events)
+                session.execute(
+                    
delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id)
+                )
+                session.flush()
+                Stats.incr("asset.triggered_dagruns")
+
+            except Exception as e:
+                if is_constraint_violation(e):
+                    session.rollback()
+                    continue
+                raise e

Review Comment:
   ```suggestion
                   raise
   ```



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1610,22 +1626,31 @@ def _create_dag_runs_asset_triggered(
                 .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
             ).all()
 
-            dag_run = dag.create_dagrun(
-                run_id=DagRun.generate_run_id(
-                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, 
run_after=triggered_date
-                ),
-                logical_date=None,
-                data_interval=None,
-                run_after=triggered_date,
-                run_type=DagRunType.ASSET_TRIGGERED,
-                triggered_by=DagRunTriggeredByType.ASSET,
-                state=DagRunState.QUEUED,
-                creating_job_id=self.job.id,
-                session=session,
-            )
-            Stats.incr("asset.triggered_dagruns")
-            dag_run.consumed_asset_events.extend(asset_events)
-            
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id 
== dag_run.dag_id))
+            try:
+                dag_run = dag.create_dagrun(
+                    run_id=deterministic_run_id,
+                    logical_date=None,
+                    data_interval=None,
+                    run_after=triggered_date,
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    triggered_by=DagRunTriggeredByType.ASSET,
+                    state=DagRunState.QUEUED,
+                    creating_job_id=self.job.id,
+                    session=session,
+                )
+
+                dag_run.consumed_asset_events.extend(asset_events)
+                session.execute(
+                    
delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id)
+                )
+                session.flush()
+                Stats.incr("asset.triggered_dagruns")
+
+            except Exception as e:
+                if is_constraint_violation(e):
+                    session.rollback()
+                    continue

Review Comment:
   This should probably be stricter; we should check that the conflict is indeed caused by the issue we are expecting.
   
   I think a better way to do this, without going too deep into the database internals, would be:
   
   1. Try to create the dag run
   2. If it failed with a constraint violation, _do not delete the queue entry_
   3. The next time the scheduler calls `_create_dagruns_for_dags`, it would run the checks you added on lines 1584-1596 and correctly skip creating those runs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to