This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c9d1c2f8a fix `_DockerDecoratedOperator` module type attribute pickle 
error (#35293)
1c9d1c2f8a is described below

commit 1c9d1c2f8abc33c9235fe7700362793dcc27a137
Author: phi-friday <[email protected]>
AuthorDate: Wed Nov 1 00:24:59 2023 +0900

    fix `_DockerDecoratedOperator` module type attribute pickle error (#35293)
---
 airflow/providers/docker/decorators/docker.py    |  8 ++++-
 tests/providers/docker/decorators/test_docker.py | 43 ++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/docker/decorators/docker.py 
b/airflow/providers/docker/decorators/docker.py
index 30827bb6fe..153b55d4ab 100644
--- a/airflow/providers/docker/decorators/docker.py
+++ b/airflow/providers/docker/decorators/docker.py
@@ -89,7 +89,7 @@ class _DockerDecoratedOperator(DecoratedOperator, 
DockerOperator):
         command = "placeholder command"
         self.python_command = python_command
         self.expect_airflow = expect_airflow
-        self.pickling_library = dill if use_dill else pickle
+        self.use_dill = use_dill
         super().__init__(
             command=command, retrieve_output=True, 
retrieve_output_path="/tmp/script.out", **kwargs
         )
@@ -143,6 +143,12 @@ class _DockerDecoratedOperator(DecoratedOperator, 
DockerOperator):
         res = remove_task_decorator(res, self.custom_operator_name)
         return res
 
+    @property
+    def pickling_library(self):
+        if self.use_dill:
+            return dill
+        return pickle
+
 
 def docker_task(
     python_callable: Callable | None = None,
diff --git a/tests/providers/docker/decorators/test_docker.py 
b/tests/providers/docker/decorators/test_docker.py
index 26d4cb5b6c..c70fd5a379 100644
--- a/tests/providers/docker/decorators/test_docker.py
+++ b/tests/providers/docker/decorators/test_docker.py
@@ -190,3 +190,46 @@ class TestDockerDecorator:
         teardown_task = dag.task_group.children["f"]
         assert teardown_task.is_teardown
         assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
+
+    @pytest.mark.parametrize("use_dill", [True, False])
+    def test_deepcopy_with_python_operator(self, dag_maker, use_dill):
+        import copy
+
+        from airflow.providers.docker.decorators.docker import 
_DockerDecoratedOperator
+
+        @task.docker(image="python:3.9-slim", auto_remove="force", 
use_dill=use_dill)
+        def f():
+            import logging
+
+            logger = logging.getLogger("airflow.task")
+            logger.info("info log in docker")
+
+        @task.python()
+        def g():
+            import logging
+
+            logger = logging.getLogger("airflow.task")
+            logger.info("info log in python")
+
+        with dag_maker() as dag:
+            docker_task = f()
+            python_task = g()
+            _ = python_task >> docker_task
+
+        docker_operator = getattr(docker_task, "operator", None)
+        assert isinstance(docker_operator, _DockerDecoratedOperator)
+        task_id = docker_operator.task_id
+
+        assert isinstance(dag, DAG)
+        assert hasattr(dag, "task_dict")
+        assert isinstance(dag.task_dict, dict)
+        assert task_id in dag.task_dict
+
+        some_task = dag.task_dict[task_id]
+        clone_of_docker_operator = copy.deepcopy(docker_operator)
+        assert isinstance(some_task, _DockerDecoratedOperator)
+        assert isinstance(clone_of_docker_operator, _DockerDecoratedOperator)
+        assert some_task.command == clone_of_docker_operator.command
+        assert some_task.expect_airflow == 
clone_of_docker_operator.expect_airflow
+        assert some_task.use_dill == clone_of_docker_operator.use_dill
+        assert some_task.pickling_library is 
clone_of_docker_operator.pickling_library

Reply via email to