ashb commented on code in PR #26348:
URL: https://github.com/apache/airflow/pull/26348#discussion_r968868777
##########
tests/models/test_dag.py:
##########
@@ -2125,6 +2132,60 @@ def test_dags_needing_dagruns_not_too_early(self):
session.rollback()
session.close()
+ def test_dags_needing_dagruns_datasets(self, session):
+ dataset = Dataset(uri='hello')
+ dag = DAG(
+ dag_id='my_dag', max_active_runs=1, schedule=[dataset],
start_date=pendulum.now().add(days=-2)
+ )
+ EmptyOperator(task_id='dummy', dag=dag)
+ DAG.bulk_write_to_db([dag], session=session)
+ session.commit()
Review Comment:
(This will need reformatting.) If you use the `dag_maker` fixture, it will
clean up automatically after the test:
```suggestion
def test_dags_needing_dagruns_datasets(self, dag_maker, session):
dataset = Dataset(uri='hello')
with dag_maker(session=session,dag_id='my_dag', max_active_runs=1,
schedule=[dataset], start_date=pendulum.now().add(days=-2)
):
EmptyOperator(task_id='dummy')
```
##########
tests/models/test_dag.py:
##########
@@ -2125,6 +2132,60 @@ def test_dags_needing_dagruns_not_too_early(self):
session.rollback()
session.close()
+ def test_dags_needing_dagruns_datasets(self, session):
+ dataset = Dataset(uri='hello')
+ dag = DAG(
+ dag_id='my_dag', max_active_runs=1, schedule=[dataset],
start_date=pendulum.now().add(days=-2)
+ )
+ EmptyOperator(task_id='dummy', dag=dag)
+ DAG.bulk_write_to_db([dag], session=session)
+ session.commit()
+
+ # there's no queue record yet, so no runs needed at this time.
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ assert dag_models == []
+ session.commit()
+
+ # add queue records so we'll need a run
+ dag_model = session.query(DagModel).filter(DagModel.dag_id ==
dag.dag_id).one()
+ dataset_model: DatasetModel = dag_model.schedule_datasets[0]
+ session.add(DatasetDagRunQueue(dataset_id=dataset_model.id,
target_dag_id=dag_model.dag_id))
+ session.commit()
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ session.commit()
Review Comment:
```suggestion
```
##########
tests/models/test_dag.py:
##########
@@ -2125,6 +2132,60 @@ def test_dags_needing_dagruns_not_too_early(self):
session.rollback()
session.close()
+ def test_dags_needing_dagruns_datasets(self, session):
+ dataset = Dataset(uri='hello')
+ dag = DAG(
+ dag_id='my_dag', max_active_runs=1, schedule=[dataset],
start_date=pendulum.now().add(days=-2)
+ )
+ EmptyOperator(task_id='dummy', dag=dag)
+ DAG.bulk_write_to_db([dag], session=session)
+ session.commit()
+
+ # there's no queue record yet, so no runs needed at this time.
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ assert dag_models == []
+ session.commit()
Review Comment:
```suggestion
```
##########
tests/models/test_dag.py:
##########
@@ -2125,6 +2132,60 @@ def test_dags_needing_dagruns_not_too_early(self):
session.rollback()
session.close()
+ def test_dags_needing_dagruns_datasets(self, session):
+ dataset = Dataset(uri='hello')
+ dag = DAG(
+ dag_id='my_dag', max_active_runs=1, schedule=[dataset],
start_date=pendulum.now().add(days=-2)
+ )
+ EmptyOperator(task_id='dummy', dag=dag)
+ DAG.bulk_write_to_db([dag], session=session)
+ session.commit()
+
+ # there's no queue record yet, so no runs needed at this time.
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ assert dag_models == []
+ session.commit()
+
+ # add queue records so we'll need a run
+ dag_model = session.query(DagModel).filter(DagModel.dag_id ==
dag.dag_id).one()
+ dataset_model: DatasetModel = dag_model.schedule_datasets[0]
+ session.add(DatasetDagRunQueue(dataset_id=dataset_model.id,
target_dag_id=dag_model.dag_id))
+ session.commit()
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ session.commit()
+ assert dag_models == [dag_model]
+
+ # create run so we don't need a run anymore (due to max active runs)
+ session.add(
+ DagRun(
+ dag_id=dag_model.dag_id,
+ run_id='abc',
+ run_type=DagRunType.DATASET_TRIGGERED,
+ state=DagRunState.QUEUED,
+ )
+ )
+ session.commit()
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ assert dag_models == []
+
+ # increase max active runs and we should now need another run
+ dag = DAG(
+ dag_id='my_dag',
+ max_active_runs=2,
+ schedule=[dataset],
+ start_date=pendulum.now().add(days=-2),
+ )
+ EmptyOperator(task_id='dummy', dag=dag)
+ DAG.bulk_write_to_db([dag], session=session)
+ session.commit()
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ session.commit()
Review Comment:
```suggestion
```
##########
tests/models/test_dag.py:
##########
@@ -2125,6 +2132,60 @@ def test_dags_needing_dagruns_not_too_early(self):
session.rollback()
session.close()
+ def test_dags_needing_dagruns_datasets(self, session):
+ dataset = Dataset(uri='hello')
+ dag = DAG(
+ dag_id='my_dag', max_active_runs=1, schedule=[dataset],
start_date=pendulum.now().add(days=-2)
+ )
+ EmptyOperator(task_id='dummy', dag=dag)
+ DAG.bulk_write_to_db([dag], session=session)
+ session.commit()
+
+ # there's no queue record yet, so no runs needed at this time.
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ assert dag_models == []
+ session.commit()
+
+ # add queue records so we'll need a run
+ dag_model = session.query(DagModel).filter(DagModel.dag_id ==
dag.dag_id).one()
+ dataset_model: DatasetModel = dag_model.schedule_datasets[0]
+ session.add(DatasetDagRunQueue(dataset_id=dataset_model.id,
target_dag_id=dag_model.dag_id))
+ session.commit()
Review Comment:
```suggestion
session.flush()
```
##########
tests/models/test_dag.py:
##########
@@ -2125,6 +2132,60 @@ def test_dags_needing_dagruns_not_too_early(self):
session.rollback()
session.close()
+ def test_dags_needing_dagruns_datasets(self, session):
+ dataset = Dataset(uri='hello')
+ dag = DAG(
+ dag_id='my_dag', max_active_runs=1, schedule=[dataset],
start_date=pendulum.now().add(days=-2)
+ )
+ EmptyOperator(task_id='dummy', dag=dag)
+ DAG.bulk_write_to_db([dag], session=session)
+ session.commit()
+
+ # there's no queue record yet, so no runs needed at this time.
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ assert dag_models == []
+ session.commit()
+
+ # add queue records so we'll need a run
+ dag_model = session.query(DagModel).filter(DagModel.dag_id ==
dag.dag_id).one()
+ dataset_model: DatasetModel = dag_model.schedule_datasets[0]
+ session.add(DatasetDagRunQueue(dataset_id=dataset_model.id,
target_dag_id=dag_model.dag_id))
+ session.commit()
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ session.commit()
+ assert dag_models == [dag_model]
+
+ # create run so we don't need a run anymore (due to max active runs)
+ session.add(
+ DagRun(
+ dag_id=dag_model.dag_id,
+ run_id='abc',
+ run_type=DagRunType.DATASET_TRIGGERED,
+ state=DagRunState.QUEUED,
+ )
+ )
+ session.commit()
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ assert dag_models == []
+
+ # increase max active runs and we should now need another run
+ dag = DAG(
+ dag_id='my_dag',
+ max_active_runs=2,
+ schedule=[dataset],
+ start_date=pendulum.now().add(days=-2),
+ )
+ EmptyOperator(task_id='dummy', dag=dag)
+ DAG.bulk_write_to_db([dag], session=session)
+ session.commit()
Review Comment:
```suggestion
dag_maker.dag_model.max_active_runs = 2
session.flush()
```
##########
tests/models/test_dag.py:
##########
@@ -2125,6 +2132,60 @@ def test_dags_needing_dagruns_not_too_early(self):
session.rollback()
session.close()
+ def test_dags_needing_dagruns_datasets(self, session):
+ dataset = Dataset(uri='hello')
+ dag = DAG(
+ dag_id='my_dag', max_active_runs=1, schedule=[dataset],
start_date=pendulum.now().add(days=-2)
+ )
+ EmptyOperator(task_id='dummy', dag=dag)
+ DAG.bulk_write_to_db([dag], session=session)
+ session.commit()
+
+ # there's no queue record yet, so no runs needed at this time.
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ assert dag_models == []
+ session.commit()
+
+ # add queue records so we'll need a run
+ dag_model = session.query(DagModel).filter(DagModel.dag_id ==
dag.dag_id).one()
+ dataset_model: DatasetModel = dag_model.schedule_datasets[0]
+ session.add(DatasetDagRunQueue(dataset_id=dataset_model.id,
target_dag_id=dag_model.dag_id))
+ session.commit()
+ query, _ = DagModel.dags_needing_dagruns(session)
+ dag_models = query.all()
+ session.commit()
+ assert dag_models == [dag_model]
+
+ # create run so we don't need a run anymore (due to max active runs)
+ session.add(
+ DagRun(
+ dag_id=dag_model.dag_id,
+ run_id='abc',
+ run_type=DagRunType.DATASET_TRIGGERED,
+ state=DagRunState.QUEUED,
+ )
+ )
+ session.commit()
Review Comment:
```suggestion
dag_maker.create_dagrun(
run_type=DagRunType.DATASET_TRIGGERED,
state=DagRunState.QUEUED,
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]