Lee-W commented on code in PR #63958:
URL: https://github.com/apache/airflow/pull/63958#discussion_r2964233859
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2090,15 +2096,26 @@ def _create_dag_runs_asset_triggered(
)
Stats.incr("asset.triggered_dagruns")
dag_run.consumed_asset_events.extend(asset_events)
+ self.log.info(
+ "Created asset-triggered DagRun for '%s': run_id=%s, consumed %d asset events",
+ dag.dag_id,
+ dag_run.run_id,
+ len(asset_events),
+ )
# Delete only consumed ADRQ rows to avoid dropping newly queued events
# (e.g. DagRun triggered by asset A while a new event for asset B arrives).
adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]
- session.execute(
+ result = session.execute(
delete(AssetDagRunQueue).where(
tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks)
)
)
+ self.log.info(
+ "Deleted %d ADRQ rows for '%s'",
+ result.rowcount,
Review Comment:
```suggestion
result.rowcount if result else 0,
```
##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -698,6 +706,10 @@ def dag_ready(dag_id: str, cond: SerializedAssetBase, statuses: dict[UKey, bool]
)
)
if exclusion_list:
+ log.info(
+ "Asset-triggered DAGs at max_active_runs, deferring: %s",
Review Comment:
```suggestion
"Asset-triggered Dags at max_active_runs, deferring: %s",
```
##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -665,6 +665,12 @@ def dag_ready(dag_id: str, cond: SerializedAssetBase, statuses: dict[UKey, bool]
else:
adrq_by_dag[adrq.target_dag_id].append(adrq)
+ if adrq_by_dag:
+ log.info(
+ "Asset-triggered DAGs with queued events: %s",
Review Comment:
```suggestion
"Asset-triggered Dags with queued events: %s",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]