dingo4dev commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3418788515
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -4868,6 +4870,176 @@ def dict_from_obj(obj):
assert created_run.creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_skips_stale_triggered_date(self,
session, dag_maker):
+ asset = Asset(uri="test://asset-for-stale-trigger-date",
name="asset-for-stale-trigger-date")
+ with dag_maker(dag_id="asset-consumer-stale-trigger-date",
schedule=[asset], session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+
+ queued_at = timezone.utcnow()
+ session.add(AssetDagRunQueue(target_dag_id=dag_model.dag_id,
asset_id=asset_id, created_at=queued_at))
+ session.flush()
+
+ # Simulate another scheduler consuming ADRQ rows after we computed
triggered_date_by_dag.
+
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id
== dag_model.dag_id))
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ # We do not create a new DagRun since the ADRQ has already been
consumed
+ assert session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none() is None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_asset_triggered_deletes_only_selected_adrq_rows(
+ self, session: Session, dag_maker
+ ):
+ asset_1 = Asset("ready-to-trigger-a-Dag-run")
+ asset_2 = Asset("should-still-exist-after-a-Dag-run-created")
+ with dag_maker(dag_id="asset-consumer-delete-selected",
schedule=asset_1 | asset_2, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ asset_1_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_1.name))
+ asset_2_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset_2.name))
+ session.add_all(
+ [
+ AssetEvent(
+ asset_id=asset_1_id,
+ timestamp=timezone.utcnow(),
+ ),
+ # The ADRQ that should trigger the Dag run creation
+ AssetDagRunQueue(
+ asset_id=asset_1_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ AssetEvent(asset_id=asset_2_id, timestamp=timezone.utcnow()),
+ # The ADRQ that arrives after the Dag run creation but before
ADRQ cleanup.
+ # This situation is simulated by _lock_only_selected_asset
below
+ AssetDagRunQueue(
+ asset_id=asset_2_id, target_dag_id=dag_model.dag_id,
created_at=timezone.utcnow()
+ ),
+ ]
+ )
+ session.flush()
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[self.null_exec])
+
+ def _lock_only_selected_asset(query, **_):
+ # Simulate SKIP LOCKED behavior where this scheduler can only
consume one ADRQ row.
+ return query.where(AssetDagRunQueue.asset_id == asset_1_id)
+
+ with patch("airflow.jobs.scheduler_job_runner.with_row_locks",
side_effect=_lock_only_selected_asset):
+ self.job_runner._create_dag_runs_asset_triggered(
+ dag_models=[dag_model],
+ session=session,
+ )
+
+ dr = session.scalars(select(DagRun).where(DagRun.dag_id ==
dag_model.dag_id)).one_or_none()
+ assert dr is not None
+
+ adrq_1 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_1_id,
+ )
+ ).one_or_none()
+ assert adrq_1 is None
+ adrq_2 = session.scalars(
+ select(AssetDagRunQueue).where(
+ AssetDagRunQueue.target_dag_id == dag_model.dag_id,
+ AssetDagRunQueue.asset_id == asset_2_id,
+ )
+ ).one_or_none()
+ assert adrq_2 is not None
+
+ @pytest.mark.need_serialized_dag
+ def test_create_dag_runs_when_concurrent_asset_events_created(self,
session: Session, dag_maker, caplog):
+ import random
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+ ASSET_EVENT_COUNT = 100
+ asset = Asset(name="test_asset")
+ with dag_maker(dag_id="consumer", schedule=asset, session=session):
+ pass
+ dag_model = dag_maker.dag_model
+ with dag_maker(dag_id="asset-producer", start_date=timezone.utcnow(),
session=session):
+ BashOperator(task_id="simulate-asset-outlet", bash_command="echo
1")
+ dr = dag_maker.create_dagrun(run_id="asset-producer-run")
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri
== asset.uri))
+ futures = []
+ consumed_asset_events = []
+
+ def create_asset_events(sleep):
+ import time
+
+ from sqlalchemy import inspect
+
+ with create_session() as session:
+ now = timezone.utcnow()
+ asset_event = AssetEvent(asset_id=asset_id, timestamp=now)
+ session.add(asset_event)
+ session.commit()
+ time.sleep(sleep) # sleep to simulate slow performance
+ asset_manager = AssetManager()
+ dialect_name = inspect(session.get_bind()).dialect.name
+ if dialect_name in ("postgresql", "sqlite"):
+
asset_manager._queue_dagruns_nonpartitioned_conflict_update(
+ asset_id=asset_id,
+ dags_to_queue=[dag_model],
+ event=asset_event,
+ session=session,
+ dialect_name=dialect_name,
+ )
+ elif dialect_name == "mysql":
+ asset_manager._queue_dagruns_nonpartitioned_mysql(
+ asset_id=asset_id, dags_to_queue=[dag_model],
event=asset_event, session=session
+ )
+
+ return asset_event
+
+ with (
+ ThreadPoolExecutor() as exec,
+ caplog.at_level(
+ "WARNING",
+ logger="airflow.jobs.scheduler_job_runner",
+ ),
+ ):
+ for _ in range(ASSET_EVENT_COUNT):
+ future = exec.submit(create_asset_events, random.randint(0, 1))
+ futures.append(future)
Review Comment:
resolved as outdated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]