dingo4dev commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r2969574055
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2074,29 +2074,31 @@ def _create_dag_runs_asset_triggered(
.order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
)
)
-
- dag_run = dag.create_dagrun(
- run_id=DagRun.generate_run_id(
- run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
- ),
- logical_date=None,
- data_interval=None,
- run_after=triggered_date,
- run_type=DagRunType.ASSET_TRIGGERED,
- triggered_by=DagRunTriggeredByType.ASSET,
- state=DagRunState.QUEUED,
- creating_job_id=self.job.id,
- session=session,
- )
- Stats.incr("asset.triggered_dagruns")
- dag_run.consumed_asset_events.extend(asset_events)
+ if asset_events:
+ dag_run = dag.create_dagrun(
+ run_id=DagRun.generate_run_id(
+ run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
+ ),
+ logical_date=None,
+ data_interval=None,
+ run_after=triggered_date,
+ run_type=DagRunType.ASSET_TRIGGERED,
+ triggered_by=DagRunTriggeredByType.ASSET,
+ state=DagRunState.QUEUED,
+ creating_job_id=self.job.id,
+ session=session,
+ )
+ Stats.incr("asset.triggered_dagruns")
+ dag_run.consumed_asset_events.extend(asset_events)
# Delete only consumed ADRQ rows to avoid dropping newly queued events
- # (e.g. DagRun triggered by asset A while a new event for asset B arrives).
+ # (e.g. 1. DagRun triggered by asset A while a new event for asset B arrives.
+ # 2. DagRun triggered by asset A while new event for asset A upsert to ADRQ)
adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs]
Review Comment:
I noticed that the latest code already has logging here. I think it would be
better to also add `self.log.warning("No DagRun created for '%s' at '%s' - asset
events already consumed", dag.dag_id, triggered_date)` for the case where
`asset_events` is empty:
https://github.com/apache/airflow/blob/6e6ab0bd111c683936b35c128b981d61b4130262/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L2118-L2122
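
To make the suggestion concrete, here is a minimal, self-contained sketch of the guard with the proposed warning. It is not the scheduler code: `create_run_if_events`, the `create_dagrun` callable, and the dict-based run object are hypothetical stand-ins for `dag.create_dagrun(...)` and `DagRun`, and the standard `logging` module stands in for `self.log`.

```python
import logging

log = logging.getLogger("scheduler_sketch")


def create_run_if_events(dag_id, triggered_date, asset_events, create_dagrun):
    """Hypothetical stand-in for the guarded block in the diff above.

    `create_dagrun` is an assumed callable that replaces dag.create_dagrun(...).
    """
    if not asset_events:
        # Proposed warning: nothing is left to attach to a new run.
        log.warning(
            "No DagRun created for '%s' at '%s' - asset events already consumed",
            dag_id,
            triggered_date,
        )
        return None

    dag_run = create_dagrun(dag_id, triggered_date)
    dag_run["consumed_asset_events"].extend(asset_events)
    return dag_run
```

In this sketch the empty case returns early only to keep the example short; in the actual method the ADRQ cleanup after the `if` block would presumably still need to run.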