dingo4dev commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3307919724


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2202,42 +2193,52 @@ def _create_dag_runs_asset_triggered(
                             ),
                         ),
                         AssetEvent.timestamp <= triggered_date,
-                        AssetEvent.timestamp > func.coalesce(cte.c.previous_dag_run_run_after, date.min),
+                        AssetEvent.id.not_in(
+                            select(association_table.c.event_id)
+                            .join(DagRun, DagRun.id == association_table.c.dag_run_id)
+                            .where(DagRun.dag_id == dag.dag_id)

Review Comment:
   Already reverted this change back to the main branch version.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2118,45 +2118,52 @@ def _create_dag_runs_asset_triggered(
                     .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
                 )
             )
-
-            dag_run = dag.create_dagrun(
-                run_id=DagRun.generate_run_id(
-                    run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
-                ),
-                logical_date=None,
-                data_interval=None,
-                run_after=triggered_date,
-                run_type=DagRunType.ASSET_TRIGGERED,
-                triggered_by=DagRunTriggeredByType.ASSET,
-                state=DagRunState.QUEUED,
-                creating_job_id=self.job.id,
-                session=session,
-            )
-            Stats.incr("asset.triggered_dagruns")
-            dag_run.consumed_asset_events.extend(asset_events)
-            self.log.info(
-                "Created asset-triggered DagRun for '%s': run_id=%s, consumed %d asset events",
-                dag.dag_id,
-                dag_run.run_id,
-                len(asset_events),
-            )
-
-            # Delete only consumed ADRQ rows to avoid dropping newly queued events
-            # (e.g. DagRun triggered by asset A while a new event for asset B arrives).
-            adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]
-            result = cast(
-                "CursorResult",
-                session.execute(
-                    delete(AssetDagRunQueue).where(
-                    tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks)
-                    )
-                ),
-            )
-            self.log.info(
-                "Deleted %d ADRQ rows for '%s'",
-                result.rowcount,
-                dag.dag_id,
-            )
+            if asset_events:
+                dag_run = dag.create_dagrun(
+                    run_id=DagRun.generate_run_id(
+                        run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
+                    ),
+                    logical_date=None,
+                    data_interval=None,
+                    run_after=triggered_date,
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    triggered_by=DagRunTriggeredByType.ASSET,
+                    state=DagRunState.QUEUED,
+                    creating_job_id=self.job.id,
+                    session=session,
+                )
+                Stats.incr("asset.triggered_dagruns")
+                dag_run.consumed_asset_events.extend(asset_events)
+                self.log.info(
+                    "Created asset-triggered DagRun for '%s': run_id=%s, consumed %d asset events",
+                    dag.dag_id,
+                    dag_run.run_id,
+                    len(asset_events),
+                )
+                # Delete only consumed ADRQ rows to avoid dropping newly queued events
+                # (e.g. 1. DagRun triggered by asset A while a new event for asset B arrives.
+                # 2. DagRun triggered by asset A while new event for asset A upsert to ADRQ)
+                adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]
+                result = cast(
+                    "CursorResult",
+                    session.execute(
+                        delete(AssetDagRunQueue).where(
+                            tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks),
+                            AssetDagRunQueue.created_at <= triggered_date,
+                        )
+                    ),
+                )
+                self.log.info(
+                    "Deleted %d ADRQ rows for '%s'",
+                    result.rowcount,
+                    dag.dag_id,
+                )
+            else:
+                self.log.warning(
+                    "No DagRun created for '%s' at '%s' - asset events already consumed",
+                    dag.dag_id,
+                    triggered_date,
+                )

Review Comment:
   Changed this to the info log level.



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -136,6 +137,40 @@ def _add_one(asset: SerializedAsset) -> AssetModel:
 
         return [_add_one(a) for a in assets]
 
+    @classmethod
+    def create_asset_event(cls, *, event_kwargs: dict, session: Session) -> AssetEvent:
+        """
+        Persist an :class:`AssetEvent` row and return it, bound to *session*.
+
+        For non-SQLite backends a short-lived independent session is used so
+        that the row is committed (and therefore visible to the scheduler's
+        session) before the caller continues.  SQLite does not support
+        concurrent connections, so the event is added directly to the caller's
+        *session* and flushed instead.
+        """
+        if get_dialect_name(session) == "sqlite":
+            # SQLite cannot have two concurrent connections to the same file, so
+            # opening a second session would deadlock.  Add directly and flush so
+            # the object gets an id without committing the outer transaction.
+            asset_event = AssetEvent(**event_kwargs)
+            session.add(asset_event)
+            session.flush()
+            return asset_event
+
+        # Create a short-lived session to populate asset event in db.
+        # This is to ensure the asset event is committed and visible to other sessions.
+        # e.g. Scheduler's session when it looks for new asset events to trigger dags via ADRQ.
+        # Use ``scoped=False`` to get a truly independent session with its own connection/transaction.
+        with create_session(scoped=False) as ae_session:
+            _asset_event = AssetEvent(**event_kwargs)
+            ae_session.add(_asset_event)
+            ae_session.flush()
+            asset_event_id = _asset_event.id
+
+        # Re-load the now-committed AssetEvent into the caller's session so that
+        # subsequent relationship operations work correctly.
+        return session.get_one(AssetEvent, asset_event_id)

Review Comment:
   IMO, the scheduler is gated by ADRQ, and the ADRQ row is committed in the caller's
transaction after this call. If we kept the AssetEvent insert in the caller's
session, another scheduler could lock the ADRQ row and run the
`AssetEvent.timestamp <= triggered_date` query before the caller commits. The event
would then be invisible to that scheduler, the ADRQ row would be consumed/deleted,
and the trigger would be silently dropped. Committing the event in an independent
transaction that finishes earlier guarantees that event visibility precedes ADRQ
visibility, so the scheduler can never see an ADRQ row without its triggering event.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to