ashb commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1891811381
##########
tests/dag_processing/test_processor.py:
##########
@@ -75,243 +70,158 @@ def disable_load_example():
@pytest.mark.usefixtures("disable_load_example")
class TestDagFileProcessor:
- @staticmethod
- def clean_db():
- clear_db_runs()
- clear_db_dags()
- clear_db_jobs()
- clear_db_serialized_dags()
-
- def setup_class(self):
- self.clean_db()
-
- def setup_method(self):
- # Speed up some tests by not running the tasks, just look at what we
- # enqueue!
- self.null_exec = MockExecutor()
- self.scheduler_job = None
-
- def teardown_method(self) -> None:
-        if self.scheduler_job and self.scheduler_job.job_runner.processor_agent:
- self.scheduler_job.job_runner.processor_agent.end()
- self.scheduler_job = None
- self.clean_db()
-
- def _process_file(self, file_path, dag_directory, session):
-        dag_file_processor = DagFileProcessor(dag_directory=str(dag_directory), log=mock.MagicMock())
-
- dag_file_processor.process_file(file_path, [])
-
- @patch.object(TaskInstance, "handle_failure")
- def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
-        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
-        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
- with create_session() as session:
- session.query(TaskInstance).delete()
- dag = dagbag.get_dag("example_branch_operator")
-            triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
- dagrun = dag.create_dagrun(
- state=State.RUNNING,
- logical_date=DEFAULT_DATE,
- run_type=DagRunType.SCHEDULED,
- data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
- session=session,
- **triggered_by_kwargs,
- )
- task = dag.get_task(task_id="run_this_first")
- ti = TaskInstance(task, run_id=dagrun.run_id, state=State.RUNNING)
- session.add(ti)
-
- requests = [
- TaskCallbackRequest(
-                    full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
- )
- ]
-            dag_file_processor.execute_callbacks(dagbag, requests, dag_file_processor.UNIT_TEST_MODE, session)
- mock_ti_handle_failure.assert_called_once_with(
-                error="Message", test_mode=conf.getboolean("core", "unit_test_mode"), session=session
+ def _process_file(
+ self, file_path, callback_requests: list[CallbackRequest] | None = None
+ ) -> DagFileParsingResult:
+ return _parse_file(
+            DagFileParseRequest(file=file_path, requests_fd=1, callback_requests=callback_requests or []),
+ log=structlog.get_logger(),
)
+ @pytest.mark.xfail(reason="TODO: AIP-72")
@pytest.mark.parametrize(
["has_serialized_dag"],
        [pytest.param(True, id="dag_in_db"), pytest.param(False, id="no_dag_found")],
)
@patch.object(TaskInstance, "handle_failure")
    def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure, has_serialized_dag):
        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
-        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
with create_session() as session:
session.query(TaskInstance).delete()
dag = dagbag.get_dag("example_branch_operator")
+ assert dag is not None
dag.sync_to_db()
-            triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dagrun = dag.create_dagrun(
- state=State.RUNNING,
+ state=DagRunState.RUNNING,
logical_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
+ triggered_by=DagRunTriggeredByType.TEST,
session=session,
- **triggered_by_kwargs,
)
task = dag.get_task(task_id="run_this_first")
- ti = TaskInstance(task, run_id=dagrun.run_id, state=State.QUEUED)
+            ti = TaskInstance(task, run_id=dagrun.run_id, state=TaskInstanceState.QUEUED)
session.add(ti)
if has_serialized_dag:
                assert SerializedDagModel.write_dag(dag, session=session) is True
session.flush()
- requests = [
- TaskCallbackRequest(
-                    full_filepath="A", simple_task_instance=SimpleTaskInstance.from_ti(ti), msg="Message"
- )
- ]
-            dag_file_processor.execute_callbacks_without_dag(requests, True, session)
+            requests = [TaskCallbackRequest(full_filepath="A", ti=ti, msg="Message")]
+ self._process_file(dag.fileloc, requests)
mock_ti_handle_failure.assert_called_once_with(
                error="Message", test_mode=conf.getboolean("core", "unit_test_mode"), session=session
)
- def test_failure_callbacks_should_not_drop_hostname(self):
-        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
-        dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock())
- dag_file_processor.UNIT_TEST_MODE = False
+ def test_dagbag_import_errors_captured(self, spy_agency: SpyAgency):
+ @spy_agency.spy_for(DagBag.collect_dags, owner=DagBag)
+ def fake_collect_dags(dagbag: DagBag, *args, **kwargs):
+ dagbag.import_errors["a.py"] = "Import error"
+
+ resp = self._process_file("a.py")
+
+ assert not resp.serialized_dags
+ assert resp.import_errors is not None
+ assert "a.py" in resp.import_errors
+
+
+# @conf_vars({("logging", "dag_processor_log_target"): "stdout"})
Review Comment:
Leaving these comments out for now as I want to overhaul the logging in
https://github.com/apache/airflow/issues/45072
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]