This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b4f3efd36a Add tests to PythonOperator (#30362)
b4f3efd36a is described below
commit b4f3efd36a0566ef9d34baf071d935c0655a02ef
Author: Vincent <[email protected]>
AuthorDate: Sat Apr 8 00:45:35 2023 -0600
Add tests to PythonOperator (#30362)
* Add tests to airflow/operators/python.py
* Convert log error of _BasePythonVirtualenvOperator._read_result() into a
custom exception class
* Improve deserialization error handling
---------
Co-authored-by: Shahar Epstein <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/exceptions.py | 10 ++++++++++
airflow/operators/python.py | 13 +++++++------
docs/spelling_wordlist.txt | 1 +
tests/operators/test_python.py | 30 +++++++++++++++++++++++++++++-
4 files changed, 47 insertions(+), 7 deletions(-)
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 4bf946fd8e..eccd53aa70 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -372,3 +372,13 @@ class AirflowProviderDeprecationWarning(DeprecationWarning):
deprecated_provider_since: str | None = None
"Indicates the provider version that started raising this deprecation warning"
+
+
+class DeserializingResultError(ValueError):
+ """Raised when an error is encountered while a pickling library deserializes a pickle file."""
+
+ def __str__(self):
+ return (
+ "Error deserializing result. Note that result deserialization "
+ "is not supported across major Python versions."
+ )
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 8a3fa58123..db92d855d9 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -33,7 +33,12 @@ from typing import Any, Callable, Collection, Iterable, Mapping, Sequence
import dill
-from airflow.exceptions import AirflowConfigException, AirflowException, RemovedInAirflow3Warning
+from airflow.exceptions import (
+ AirflowConfigException,
+ AirflowException,
+ DeserializingResultError,
+ RemovedInAirflow3Warning,
+)
from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
@@ -371,11 +376,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
try:
return self.pickling_library.loads(path.read_bytes())
except ValueError:
- self.log.error(
- "Error deserializing result. Note that result deserialization "
- "is not supported across major Python versions."
- )
- raise
+ raise DeserializingResultError()
def __deepcopy__(self, memo):
# module objects can't be copied _at all__
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index d603114eea..f6ef595e8d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -425,6 +425,7 @@ deserialize
Deserialized
deserialized
deserializer
+deserializes
deserializing
dest
dev
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 64d824f8f8..d25f712dca 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -31,7 +31,7 @@ from unittest import mock
import pytest
from slugify import slugify
-from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
+from airflow.exceptions import AirflowException, DeserializingResultError, RemovedInAirflow3Warning
from airflow.models import DAG, DagRun, TaskInstance as TI
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import clear_task_instances, set_current_context
@@ -277,6 +277,20 @@ class TestPythonOperator(BasePythonTest):
assert "Done. Returned value was: test_return_value" not in caplog.messages
assert "Done. Returned value not shown" in caplog.messages
+ def test_python_operator_templates_exts(self):
+ def func():
+ return "test_return_value"
+
+ python_operator = PythonOperator(
+ task_id="python_operator",
+ python_callable=func,
+ dag=self.dag,
+ show_return_value_in_logs=False,
+ templates_exts=["test_ext"],
+ )
+
+ assert python_operator.template_ext == ["test_ext"]
+
class TestBranchOperator(BasePythonTest):
opcls = BranchPythonOperator
@@ -932,6 +946,20 @@ class TestPythonVirtualenvOperator(BasePythonTest):
}
assert set(context) == declared_keys
+ def test_except_value_error(self):
+ def f():
+ return 1
+
+ task = PythonVirtualenvOperator(
+ python_callable=f,
+ task_id="task",
+ dag=self.dag,
+ )
+
+ task.pickling_library.loads = mock.Mock(side_effect=DeserializingResultError)
+ with pytest.raises(DeserializingResultError):
+ task._read_result(path=mock.Mock())
+
class TestCurrentContext:
def test_current_context_no_context_raise(self):