This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b4f3efd36a Add tests to PythonOperator (#30362)
b4f3efd36a is described below

commit b4f3efd36a0566ef9d34baf071d935c0655a02ef
Author: Vincent <[email protected]>
AuthorDate: Sat Apr 8 00:45:35 2023 -0600

    Add tests to PythonOperator (#30362)
    
    * Add tests to airflow/operators/python.py
    
    * Convert log error of _BasePythonVirtualenvOperator._read_result() into a 
custom exception class
    
    * Improve deserialization error handling
    
    ---------
    
    Co-authored-by: Shahar Epstein <[email protected]>
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/exceptions.py          | 10 ++++++++++
 airflow/operators/python.py    | 13 +++++++------
 docs/spelling_wordlist.txt     |  1 +
 tests/operators/test_python.py | 30 +++++++++++++++++++++++++++++-
 4 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 4bf946fd8e..eccd53aa70 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -372,3 +372,13 @@ class 
AirflowProviderDeprecationWarning(DeprecationWarning):
 
     deprecated_provider_since: str | None = None
     "Indicates the provider version that started raising this deprecation 
warning"
+
+
+class DeserializingResultError(ValueError):
+    """Raised when an error is encountered while a pickling library 
deserializes a pickle file."""
+
+    def __str__(self):
+        return (
+            "Error deserializing result. Note that result deserialization "
+            "is not supported across major Python versions."
+        )
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 8a3fa58123..db92d855d9 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -33,7 +33,12 @@ from typing import Any, Callable, Collection, Iterable, 
Mapping, Sequence
 
 import dill
 
-from airflow.exceptions import AirflowConfigException, AirflowException, 
RemovedInAirflow3Warning
+from airflow.exceptions import (
+    AirflowConfigException,
+    AirflowException,
+    DeserializingResultError,
+    RemovedInAirflow3Warning,
+)
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.skipmixin import SkipMixin
 from airflow.models.taskinstance import _CURRENT_CONTEXT
@@ -371,11 +376,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
         try:
             return self.pickling_library.loads(path.read_bytes())
         except ValueError:
-            self.log.error(
-                "Error deserializing result. Note that result deserialization "
-                "is not supported across major Python versions."
-            )
-            raise
+            raise DeserializingResultError()
 
     def __deepcopy__(self, memo):
         # module objects can't be copied _at all__
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index d603114eea..f6ef595e8d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -425,6 +425,7 @@ deserialize
 Deserialized
 deserialized
 deserializer
+deserializes
 deserializing
 dest
 dev
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 64d824f8f8..d25f712dca 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -31,7 +31,7 @@ from unittest import mock
 import pytest
 from slugify import slugify
 
-from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
+from airflow.exceptions import AirflowException, DeserializingResultError, 
RemovedInAirflow3Warning
 from airflow.models import DAG, DagRun, TaskInstance as TI
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.taskinstance import clear_task_instances, 
set_current_context
@@ -277,6 +277,20 @@ class TestPythonOperator(BasePythonTest):
             assert "Done. Returned value was: test_return_value" not in 
caplog.messages
             assert "Done. Returned value not shown" in caplog.messages
 
+    def test_python_operator_templates_exts(self):
+        def func():
+            return "test_return_value"
+
+        python_operator = PythonOperator(
+            task_id="python_operator",
+            python_callable=func,
+            dag=self.dag,
+            show_return_value_in_logs=False,
+            templates_exts=["test_ext"],
+        )
+
+        assert python_operator.template_ext == ["test_ext"]
+
 
 class TestBranchOperator(BasePythonTest):
     opcls = BranchPythonOperator
@@ -932,6 +946,20 @@ class TestPythonVirtualenvOperator(BasePythonTest):
         }
         assert set(context) == declared_keys
 
+    def test_except_value_error(self):
+        def f():
+            return 1
+
+        task = PythonVirtualenvOperator(
+            python_callable=f,
+            task_id="task",
+            dag=self.dag,
+        )
+
+        task.pickling_library.loads = 
mock.Mock(side_effect=DeserializingResultError)
+        with pytest.raises(DeserializingResultError):
+            task._read_result(path=mock.Mock())
+
 
 class TestCurrentContext:
     def test_current_context_no_context_raise(self):

Reply via email to