uranusjr commented on a change in pull request #18141:
URL: https://github.com/apache/airflow/pull/18141#discussion_r706131668



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -954,7 +954,7 @@ def _schedule_dag_run(
             unfinished_task_instances = (
                 session.query(TI)
                 .filter(TI.dag_id == dag_run.dag_id)
-                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.run_id == dag_run.run_id)

Review comment:
       Changed because of AIP-39 (the task-instance query now filters on `run_id` instead of `execution_date`). This code path does not seem to be covered by any existing test case.

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -3176,40 +3177,40 @@ def 
test_execute_queries_count_with_harvested_dags(self, expected_query_count, d
 
                     self.scheduler_job._run_scheduler_loop()
 
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "expected_query_counts, dag_count, task_count, start_ago, 
schedule_interval, shape",
         [
-            # expected, dag_count, task_count, start_ago, schedule_interval, 
shape
-            # One DAG with one task per DAG file
-            ([9, 9, 9, 9], 1, 1, "1d", "None", "no_structure"),
-            ([9, 9, 9, 9], 1, 1, "1d", "None", "linear"),
-            ([21, 12, 12, 12], 1, 1, "1d", "@once", "no_structure"),
-            ([21, 12, 12, 12], 1, 1, "1d", "@once", "linear"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "no_structure"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "linear"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "binary_tree"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "star"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "grid"),
-            # One DAG with five tasks per DAG  file
-            ([9, 9, 9, 9], 1, 5, "1d", "None", "no_structure"),
-            ([9, 9, 9, 9], 1, 5, "1d", "None", "linear"),
-            ([21, 12, 12, 12], 1, 5, "1d", "@once", "no_structure"),
-            ([22, 13, 13, 13], 1, 5, "1d", "@once", "linear"),
-            ([21, 22, 24, 26], 1, 5, "1d", "30m", "no_structure"),
-            ([22, 24, 27, 30], 1, 5, "1d", "30m", "linear"),
-            ([22, 24, 27, 30], 1, 5, "1d", "30m", "binary_tree"),
-            ([22, 24, 27, 30], 1, 5, "1d", "30m", "star"),
-            ([22, 24, 27, 30], 1, 5, "1d", "30m", "grid"),
-            # 10 DAGs with 10 tasks per DAG file
-            ([9, 9, 9, 9], 10, 10, "1d", "None", "no_structure"),
-            ([9, 9, 9, 9], 10, 10, "1d", "None", "linear"),
-            ([84, 27, 27, 27], 10, 10, "1d", "@once", "no_structure"),
-            ([94, 40, 40, 40], 10, 10, "1d", "@once", "linear"),
-            ([84, 88, 88, 88], 10, 10, "1d", "30m", "no_structure"),
-            ([94, 114, 114, 114], 10, 10, "1d", "30m", "linear"),
-            ([94, 108, 108, 108], 10, 10, "1d", "30m", "binary_tree"),
-            ([94, 108, 108, 108], 10, 10, "1d", "30m", "star"),
-            ([94, 108, 108, 108], 10, 10, "1d", "30m", "grid"),
-        ]
+            # One DAG with one task per DAG file.
+            ([10, 10, 10, 10], 1, 1, "1d", "None", "no_structure"),
+            ([10, 10, 10, 10], 1, 1, "1d", "None", "linear"),
+            ([23, 13, 13, 13], 1, 1, "1d", "@once", "no_structure"),
+            ([23, 13, 13, 13], 1, 1, "1d", "@once", "linear"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "no_structure"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "linear"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "binary_tree"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "star"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "grid"),
+            # One DAG with five tasks per DAG file.
+            ([10, 10, 10, 10], 1, 5, "1d", "None", "no_structure"),
+            ([10, 10, 10, 10], 1, 5, "1d", "None", "linear"),
+            ([23, 13, 13, 13], 1, 5, "1d", "@once", "no_structure"),
+            ([24, 14, 14, 14], 1, 5, "1d", "@once", "linear"),
+            ([23, 24, 26, 28], 1, 5, "1d", "30m", "no_structure"),
+            ([24, 26, 29, 32], 1, 5, "1d", "30m", "linear"),
+            ([24, 26, 29, 32], 1, 5, "1d", "30m", "binary_tree"),
+            ([24, 26, 29, 32], 1, 5, "1d", "30m", "star"),
+            ([24, 26, 29, 32], 1, 5, "1d", "30m", "grid"),
+            # 10 DAGs with 10 tasks per DAG file.
+            ([10, 10, 10, 10], 10, 10, "1d", "None", "no_structure"),
+            ([10, 10, 10, 10], 10, 10, "1d", "None", "linear"),
+            ([95, 28, 28, 28], 10, 10, "1d", "@once", "no_structure"),
+            ([105, 41, 41, 41], 10, 10, "1d", "@once", "linear"),
+            ([95, 99, 99, 99], 10, 10, "1d", "30m", "no_structure"),
+            ([105, 125, 125, 125], 10, 10, "1d", "30m", "linear"),
+            ([105, 119, 119, 119], 10, 10, "1d", "30m", "binary_tree"),
+            ([105, 119, 119, 119], 10, 10, "1d", "30m", "star"),
+            ([105, 119, 119, 119], 10, 10, "1d", "30m", "grid"),
+        ],

Review comment:
       The query count changes in this function are not related to AIP-39 (I traced this breakage back to a change from a few months ago).

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -795,7 +795,7 @@ def _create_dagruns_for_dags(self, guard, session):
         guard.commit()
         # END: create dagruns
 
-    def _create_dag_runs(self, dag_models: Iterable[DagModel], session: 
Session) -> None:
+    def _create_dag_runs(self, dag_models: Collection[DagModel], session: 
Session) -> None:

Review comment:
       This annotation was wrong. `dag_models` is iterated over multiple times in this function, and an arbitrary iterable does not support that usage (an iterator, for example, can only be iterated through once).

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -3258,25 +3269,25 @@ def test_should_mark_dummy_task_as_success(self):
         dagbag = DagBag(dag_folder=dag_file, include_examples=False, 
read_dags_from_db=False)
         dagbag.sync_to_db()
 
-        self.scheduler_job_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job_job.processor_agent = mock.MagicMock()
-        dag = self.scheduler_job_job.dagbag.get_dag("test_only_dummy_tasks")
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag("test_only_dummy_tasks")

Review comment:
       Totally unrelated to AIP-39.

##########
File path: tests/executors/test_celery_executor.py
##########
@@ -529,7 +529,7 @@ def test_send_tasks_to_celery_hang(register_signals):
     executor = celery_executor.CeleryExecutor()
 
     task = MockTask()
-    task_tuples_to_send = [(None, None, None, None, task) for _ in range(26)]
+    task_tuples_to_send = [(None, None, None, task) for _ in range(26)]

Review comment:
       `_send_tasks_to_celery` was changed to accept 4-tuples (instead of 5-tuples) a while ago, but this test was never updated. Not related to AIP-39.

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1915,37 +1916,39 @@ def test_verify_integrity_if_dag_changed(self, 
dag_maker):
         session.close()
 
     @pytest.mark.quarantined
+    @pytest.mark.need_serialized_dag
     def test_retry_still_in_executor(self, dag_maker):
         """
         Checks if the scheduler does not put a task in limbo, when a task is 
retried
         but is still present in the executor.
         """
         executor = MockExecutor(do_update=False)
-        dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, 
"no_dags.py"), include_examples=False)
-        dagbag.dags.clear()
-
-        with dag_maker(dag_id='test_retry_still_in_executor', 
schedule_interval="@once") as dag:
-            dag_task1 = BashOperator(
-                task_id='test_retry_handling_op',
-                bash_command='exit 1',
-                retries=1,
-            )
 
         with create_session() as session:
-            orm_dag = DagModel(dag_id=dag.dag_id)
-            orm_dag.is_paused = False
-            session.merge(orm_dag)
+            with dag_maker(
+                dag_id='test_retry_still_in_executor',
+                schedule_interval="@once",
+                session=session,
+            ):
+                dag_task1 = BashOperator(
+                    task_id='test_retry_handling_op',
+                    bash_command='exit 1',
+                    retries=1,
+                )
+            dag_maker.dag_model.calculate_dagrun_date_fields(dag_maker.dag, 
None)
 
-        @mock.patch('airflow.dag_processing.processor.DagBag', 
return_value=dagbag)
-        def do_schedule(mock_dagbag):
+        @provide_session
+        def do_schedule(session):
             # Use a empty file since the above mock will return the
             # expected DAGs. Also specify only a single file so that it doesn't
             # try to schedule the above DAG repeatedly.
-            self.scheduler_job = SchedulerJob(
-                num_runs=1, executor=executor, 
subdir=os.path.join(settings.DAGS_FOLDER, "no_dags.py")
-            )
+            self.scheduler_job = SchedulerJob(num_runs=1, executor=executor, 
subdir=os.devnull)
+            self.scheduler_job.dagbag = dag_maker.dagbag
             self.scheduler_job.heartrate = 0
-            self.scheduler_job.run()
+            # Since the DAG is not in the directory watched by scheduler job,
+            # it would've been marked as deleted and not being scheduled.
+            with mock.patch.object(DagModel, "deactivate_deleted_dags"):
+                self.scheduler_job.run()

Review comment:
       This test broke when we implemented automatic DAG deactivation. Not related to AIP-39.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to