This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dcf3d39a15 Fix kubernetes task decorator pickle error (#31110)
dcf3d39a15 is described below

commit dcf3d39a1536b667ac0caaf61d5ecac3c67b42b0
Author: JCoder01 <[email protected]>
AuthorDate: Mon May 8 13:56:02 2023 -0400

    Fix kubernetes task decorator pickle error (#31110)
---
 .../cncf/kubernetes/decorators/kubernetes.py       |  7 ++++---
 .../cncf/kubernetes/decorators/test_kubernetes.py  | 24 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
index af3416b30f..337a54797d 100644
--- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
@@ -69,7 +69,7 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
     shallow_copy_attrs: Sequence[str] = ("python_callable",)
 
     def __init__(self, namespace: str = "default", use_dill: bool = False, **kwargs) -> None:
-        self.pickling_library = dill if use_dill else pickle
+        self.use_dill = use_dill
         super().__init__(
             namespace=namespace,
             name=kwargs.pop("name", f"k8s_airflow_pod_{uuid.uuid4().hex}"),
@@ -112,17 +112,18 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
 
     def execute(self, context: Context):
         with TemporaryDirectory(prefix="venv") as tmp_dir:
+            pickling_library = dill if self.use_dill else pickle
             script_filename = os.path.join(tmp_dir, "script.py")
             input_filename = os.path.join(tmp_dir, "script.in")
 
             with open(input_filename, "wb") as file:
-                self.pickling_library.dump({"args": self.op_args, "kwargs": self.op_kwargs}, file)
+                pickling_library.dump({"args": self.op_args, "kwargs": self.op_kwargs}, file)
 
             py_source = self.get_python_source()
             jinja_context = {
                 "op_args": self.op_args,
                 "op_kwargs": self.op_kwargs,
-                "pickling_library": self.pickling_library.__name__,
+                "pickling_library": pickling_library.__name__,
                 "python_callable": self.python_callable.__name__,
                 "python_callable_source": py_source,
                 "string_args_global": False,
diff --git a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
index 8493ba067f..0cc9a72c12 100644
--- a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
@@ -208,3 +208,27 @@ def test_kubernetes_with_marked_as_teardown(
     assert len(dag.task_group.children) == 1
     teardown_task = dag.task_group.children["f"]
     assert teardown_task._is_teardown
+
+
+def test_kubernetes_with_mini_scheduler(
+    dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock
+) -> None:
+    with dag_maker(session=session):
+
+        @task.kubernetes(
+            image="python:3.10-slim-buster",
+            in_cluster=False,
+            cluster_context="default",
+            config_file="/tmp/fake_file",
+        )
+        def f(arg1, arg2, kwarg1=None, kwarg2=None):
+            return {"key1": "value1", "key2": "value2"}
+
+        f1 = f.override(task_id="my_task_id", do_xcom_push=True)("arg1", "arg2", kwarg1="kwarg1")
+        f.override(task_id="my_task_id2", do_xcom_push=False)("arg1", "arg2", kwarg1=f1)
+
+    dr = dag_maker.create_dagrun()
+    (ti, _) = dr.task_instances
+
+    # check that mini-scheduler works
+    ti.schedule_downstream_tasks()

Reply via email to