ashb commented on code in PR #44898:
URL: https://github.com/apache/airflow/pull/44898#discussion_r1884008896
##########
tests/dag_processing/test_processor.py:
##########
@@ -246,183 +243,6 @@ def test_process_file_should_failure_callback(self,
monkeypatch, tmp_path, get_t
msg = " ".join([str(k) for k in ti.key.primary]) + " fired callback"
assert msg in callback_file.read_text()
- @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
- def
test_add_unparseable_file_before_sched_start_creates_import_error(self,
tmp_path):
- unparseable_filename = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
- with open(unparseable_filename, "w") as unparseable_file:
- unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
-
- with create_session() as session:
- self._process_file(unparseable_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(ParseImportError).all()
-
- assert len(import_errors) == 1
- import_error = import_errors[0]
- assert import_error.filename == unparseable_filename
- assert import_error.stacktrace == f"invalid syntax
({TEMP_DAG_FILENAME}, line 1)"
- session.rollback()
-
- @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
- def test_add_unparseable_zip_file_creates_import_error(self, tmp_path):
- zip_filename = (tmp_path / "test_zip.zip").as_posix()
- invalid_dag_filename = os.path.join(zip_filename, TEMP_DAG_FILENAME)
- with ZipFile(zip_filename, "w") as zip_file:
- zip_file.writestr(TEMP_DAG_FILENAME, UNPARSEABLE_DAG_FILE_CONTENTS)
-
- with create_session() as session:
- self._process_file(zip_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(ParseImportError).all()
-
- assert len(import_errors) == 1
- import_error = import_errors[0]
- assert import_error.filename == invalid_dag_filename
- assert import_error.stacktrace == f"invalid syntax
({TEMP_DAG_FILENAME}, line 1)"
- session.rollback()
-
- @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
- def test_dag_model_has_import_error_is_true_when_import_error_exists(self,
tmp_path, session):
- dag_file = os.path.join(TEST_DAGS_FOLDER,
"test_example_bash_operator.py")
- temp_dagfile = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
- with open(dag_file) as main_dag, open(temp_dagfile, "w") as next_dag:
- for line in main_dag:
- next_dag.write(line)
- # first we parse the dag
- self._process_file(temp_dagfile, dag_directory=tmp_path,
session=session)
- # assert DagModel.has_import_errors is false
- dm = session.query(DagModel).filter(DagModel.fileloc ==
temp_dagfile).first()
- assert not dm.has_import_errors
- # corrupt the file
- with open(temp_dagfile, "a") as file:
- file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
-
- self._process_file(temp_dagfile, dag_directory=tmp_path,
session=session)
- import_errors = session.query(ParseImportError).all()
-
- assert len(import_errors) == 1
- import_error = import_errors[0]
- assert import_error.filename == temp_dagfile
- assert import_error.stacktrace
- dm = session.query(DagModel).filter(DagModel.fileloc ==
temp_dagfile).first()
- assert dm.has_import_errors
-
- def test_no_import_errors_with_parseable_dag(self, tmp_path):
- parseable_filename = tmp_path / TEMP_DAG_FILENAME
- parseable_filename.write_text(PARSEABLE_DAG_FILE_CONTENTS)
-
- with create_session() as session:
- self._process_file(parseable_filename.as_posix(),
dag_directory=tmp_path, session=session)
- import_errors = session.query(ParseImportError).all()
-
- assert len(import_errors) == 0
-
- session.rollback()
-
- def test_no_import_errors_with_parseable_dag_in_zip(self, tmp_path):
Review Comment:
There are plenty of tests of zipfile parsing in dagbag; I'll add
`assert not dagbag.import_errors` to one of them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]