ashb commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r747354277



##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``7b2661a43ba3`` (head)        | ``142555e44c17`` | ``2.2.0``       | Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id.  |
+| ``be2bfac3da23`` (head)        | ``7b2661a43ba3`` | ``2.2.2``       | Add has_import_errors column to DagModel                                              |

Review comment:
       ```suggestion
   | ``be2bfac3da23`` (head)        | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |
   ```

##########
File path: tests/models/test_dag.py
##########
@@ -1898,8 +1951,65 @@ def test_dags_needing_dagruns_only_unpaused(self):
         dag_models = DagModel.dags_needing_dagruns(session).all()
         assert dag_models == []
 
-        session.rollback()
-        session.close()
+    def test_dags_needing_dagruns_doesnot_send_dagmodel_with_null_max_active_runs(self):
+        """
+        We check that max_active_runs must not be null for dags
+        being set to scheduler to create dagruns
+        """
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=DEFAULT_DATE,
+            next_dagrun_create_after=DEFAULT_DATE + timedelta(days=1),
+            is_active=True,
+        )
+        assert orm_dag.max_active_runs is not None
+        session.add(orm_dag)
+        session.flush()
+
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == [orm_dag]
+        # set to None and ensure it's not sent to scheduler
+        orm_dag.max_active_runs = None
+        session.merge(orm_dag)
+        session.flush()
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == []
+
+    def test_dags_needing_dagruns_doesnot_send_dagmodel_with_import_errors(self):
+        """
+        We check that has_import_error is false for dags
+        being set to scheduler to create dagruns
+        """
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=DEFAULT_DATE,
+            next_dagrun_create_after=DEFAULT_DATE + timedelta(days=1),
+            is_active=True,
+        )
+        assert not orm_dag.has_import_errors
+        session.add(orm_dag)
+        session.flush()
+
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == [orm_dag]
+        # set to None and ensure it's not sent to scheduler

Review comment:
       ```suggestion
   ```

##########
File path: tests/models/test_dag.py
##########
@@ -1898,8 +1951,65 @@ def test_dags_needing_dagruns_only_unpaused(self):
         dag_models = DagModel.dags_needing_dagruns(session).all()
         assert dag_models == []
 
-        session.rollback()
-        session.close()
+    def test_dags_needing_dagruns_doesnot_send_dagmodel_with_null_max_active_runs(self):
+        """
+        We check that max_active_runs must not be null for dags
+        being set to scheduler to create dagruns
+        """
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=DEFAULT_DATE,
+            next_dagrun_create_after=DEFAULT_DATE + timedelta(days=1),
+            is_active=True,
+        )
+        assert orm_dag.max_active_runs is not None
+        session.add(orm_dag)
+        session.flush()
+
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == [orm_dag]
+        # set to None and ensure it's not sent to scheduler
+        orm_dag.max_active_runs = None
+        session.merge(orm_dag)
+        session.flush()
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == []
+

Review comment:
       ```suggestion
   ```
   
   I don't think we need both this behaviour _and_ the explicit flag. Since we
   have added the flag, we should just use that.
   
   In that case, please also re-title this PR.

##########
File path: tests/models/test_dag.py
##########
@@ -1848,9 +1888,6 @@ def test_dags_needing_dagruns_not_too_early(self):
         dag_models = DagModel.dags_needing_dagruns(session).all()
         assert dag_models == []
 
-        session.rollback()
-        session.close()

Review comment:
       But with these rollbacks it _shouldn't_ need a clean method.
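
A minimal sketch of why a rollback at the end of each test leaves nothing to clean up. It uses the standard-library `sqlite3` module rather than the SQLAlchemy session from the tests, and the `dag` table and `run_test_body` helper are hypothetical names made up for this illustration:

```python
import sqlite3


def run_test_body(conn):
    """Stand-in for a test that adds a row, reads it back, then rolls back."""
    conn.execute("INSERT INTO dag (dag_id) VALUES ('test_dags')")
    # Inside the open transaction the row is visible, much like after a flush.
    seen_inside = conn.execute("SELECT COUNT(*) FROM dag").fetchone()[0]
    # Discard the uncommitted insert so the next test starts from a clean table.
    conn.rollback()
    return seen_inside


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
conn.commit()  # persist the schema before any test body runs

# Running the body twice would violate the primary key if a row had leaked.
first_run = run_test_body(conn)
second_run = run_test_body(conn)
left_behind = conn.execute("SELECT COUNT(*) FROM dag").fetchone()[0]
```

If the `conn.rollback()` call is removed, the second run fails on the duplicate `dag_id`, which is the kind of leakage a separate clean method would otherwise have to handle.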

##########
File path: tests/models/test_dag.py
##########
@@ -1863,13 +1900,29 @@ def test_max_active_runs_not_none(self):
             next_dagrun_create_after=None,
             is_active=True,
         )
+        # assert max_active_runs updated
+        assert orm_dag.max_active_runs == 16
         session.add(orm_dag)
         session.flush()
-
         assert orm_dag.max_active_runs is not None
 
-        session.rollback()
-        session.close()
+    def test_dagmodel_has_import_error_is_false(self):
+        dag = DAG(dag_id='test_dag', start_date=timezone.datetime(2020, 1, 1))
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=None,
+            next_dagrun_create_after=None,
+            is_active=True,
+        )
+        # assert has_import_error is false
+        assert not orm_dag.has_import_errors
+        session.add(orm_dag)
+        session.flush()
+
+        assert not orm_dag.has_import_errors

Review comment:
       This test isn't really adding much -- it's only testing
   `self.has_import_errors = False` in the constructor.
   
   It certainly doesn't need a whole separate test case; it can be added as an
   assert somewhere else if we want it. You already test basically everything
   here in `test_dags_needing_dagruns_doesnot_send_dagmodel_with_import_errors`,
   so this whole test function isn't needed.
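
As a rough sketch of that suggestion, the default check can ride along as one assert inside the behavioural test. `FakeDagModel` and the module-level `dags_needing_dagruns` below are hypothetical stand-ins written for this example, not Airflow's real model or query:

```python
class FakeDagModel:
    """Hypothetical stand-in for DagModel; not the real Airflow class."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.has_import_errors = False  # the constructor default under discussion


def dags_needing_dagruns(models):
    """Hypothetical filter: skip anything flagged with import errors."""
    return [m for m in models if not m.has_import_errors]


def test_dags_needing_dagruns_skips_import_errors():
    orm_dag = FakeDagModel(dag_id="test_dags")
    # The default is checked here instead of in a dedicated test function.
    assert not orm_dag.has_import_errors
    assert dags_needing_dagruns([orm_dag]) == [orm_dag]

    # Once the flag is set, the DAG is no longer handed to the scheduler.
    orm_dag.has_import_errors = True
    assert dags_needing_dagruns([orm_dag]) == []


test_dags_needing_dagruns_skips_import_errors()
```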



