This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d9e25c40f1 Fix failing tests in dag_processor/test_job_runner for db
isolation mode (#41013)
d9e25c40f1 is described below
commit d9e25c40f158e6a5c516f0fd6066b945ecc52054
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Jul 24 22:04:03 2024 +0200
Fix failing tests in dag_processor/test_job_runner for db isolation mode
(#41013)
This PR fixes all the tests for dag_processing for db isolation mode.
Some of them required slight changes (for example, the session object
is not used by internal-api components, so asserts on it won't work).
Some of them required fixes in the codebase as they found real errors.
Logging at the internal-api server for requests/request parameters sent
and responses has been changed from debug to info - this is
useful information to show that the internal API server works, and we can
lower the level when we are close to releasing 2.10.0
---
airflow/api_internal/endpoints/rpc_api_endpoint.py | 3 ++-
airflow/serialization/enums.py | 3 +++
airflow/serialization/serialized_objects.py | 13 +++++++++++++
tests/dag_processing/test_job_runner.py | 7 ++++---
4 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index c3d322f01f..4d6fc150b0 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -220,13 +220,14 @@ def internal_airflow_api(body: dict[str, Any]) ->
APIResponse:
except Exception:
return log_and_build_error_response(message="Error deserializing
parameters.", status=400)
- log.debug("Calling method %s\nparams: %s", method_name, params)
+ log.info("Calling method %s\nparams: %s", method_name, params)
try:
# Session must be created there as it may be needed by serializer for
lazy-loaded fields.
with create_session() as session:
output = handler(**params, session=session)
output_json = BaseSerialization.serialize(output,
use_pydantic_models=True)
response = json.dumps(output_json) if output_json is not None else
None
+ log.info("Sending response: %s", response)
return Response(response=response, headers={"Content-Type":
"application/json"})
except Exception:
return log_and_build_error_response(message=f"Error executing method
'{method_name}'.", status=500)
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index 7f8bead6ca..7d84fea373 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -68,3 +68,6 @@ class DagAttributeTypes(str, Enum):
CONNECTION = "connection"
TASK_CONTEXT = "task_context"
ARG_NOT_SET = "arg_not_set"
+ TASK_CALLBACK_REQUEST = "task_callback_request"
+ DAG_CALLBACK_REQUEST = "dag_callback_request"
+ SLA_CALLBACK_REQUEST = "sla_callback_request"
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index bb85983d95..90797d9535 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -35,6 +35,7 @@ import lazy_object_proxy
from dateutil import relativedelta
from pendulum.tz.timezone import FixedTimezone, Timezone
+from airflow.callbacks.callback_requests import DagCallbackRequest,
SlaCallbackRequest, TaskCallbackRequest
from airflow.compat.functools import cache
from airflow.configuration import conf
from airflow.datasets import BaseDataset, Dataset, DatasetAlias, DatasetAll,
DatasetAny
@@ -676,6 +677,12 @@ class BaseSerialization:
)
elif isinstance(var, Connection):
return cls._encode(var.to_dict(validate=True),
type_=DAT.CONNECTION)
+ elif isinstance(var, TaskCallbackRequest):
+ return cls._encode(var.to_json(), type_=DAT.TASK_CALLBACK_REQUEST)
+ elif isinstance(var, DagCallbackRequest):
+ return cls._encode(var.to_json(), type_=DAT.DAG_CALLBACK_REQUEST)
+ elif isinstance(var, SlaCallbackRequest):
+ return cls._encode(var.to_json(), type_=DAT.SLA_CALLBACK_REQUEST)
elif var.__class__ == Context:
d = {}
for k, v in var._context.items():
@@ -793,6 +800,12 @@ class BaseSerialization:
return SimpleTaskInstance(**cls.deserialize(var))
elif type_ == DAT.CONNECTION:
return Connection(**var)
+ elif type_ == DAT.TASK_CALLBACK_REQUEST:
+ return TaskCallbackRequest.from_json(var)
+ elif type_ == DAT.DAG_CALLBACK_REQUEST:
+ return DagCallbackRequest.from_json(var)
+ elif type_ == DAT.SLA_CALLBACK_REQUEST:
+ return SlaCallbackRequest.from_json(var)
elif use_pydantic_models and _ENABLE_AIP_44:
return _type_to_class[type_][0].model_validate(var)
elif type_ == DAT.ARG_NOT_SET:
diff --git a/tests/dag_processing/test_job_runner.py
b/tests/dag_processing/test_job_runner.py
index b937a29ce2..83adc23d25 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -576,13 +576,12 @@ class TestDagProcessorJobRunner:
> (freezed_base_time -
manager.processor.get_last_finish_time("file_1.py")).total_seconds()
)
- @mock.patch("sqlalchemy.orm.session.Session.delete")
@mock.patch("zipfile.is_zipfile", return_value=True)
@mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
@mock.patch("airflow.utils.file.find_path_from_directory",
return_value=True)
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
def test_file_paths_in_queue_sorted_by_priority(
- self, mock_isfile, mock_find_path, mock_might_contain_dag,
mock_zipfile, session_delete
+ self, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
):
from airflow.models.dagbag import DagPriorityParsingRequest
@@ -614,7 +613,9 @@ class TestDagProcessorJobRunner:
assert manager.processor._file_path_queue == deque(
["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
)
- assert session_delete.call_args[0][0].fileloc ==
parsing_request.fileloc
+ with create_session() as session2:
+ parsing_request_after =
session2.query(DagPriorityParsingRequest).get(parsing_request.id)
+ assert parsing_request_after is None
def test_scan_stale_dags(self):
"""