This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-stable by this push:
     new 760bcb8  Fix PythonVirtualenvOperator when using provide_context=True (#8256)
760bcb8 is described below

commit 760bcb8a0cf2cfbe8d5ff0309721b49d771b1412
Author: Luis Magana <[email protected]>
AuthorDate: Mon Aug 17 14:04:42 2020 -0500

    Fix PythonVirtualenvOperator when using provide_context=True (#8256)
    
    This change ensures that provide_context=True works for
    PythonVirtualenvOperator on v1.10.12.
    
    Cleaned up the code and simplified the solution so that only the
    unserializable context items are skipped.
    Added a pytest test for PythonVirtualenvOperator.
---
 airflow/operators/python_operator.py    | 16 +++++---
 tests/operators/test_python_operator.py | 71 +++++++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 5 deletions(-)

diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 392e0fc..a6f5ffd 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -330,13 +330,19 @@ class PythonVirtualenvOperator(PythonOperator):
 
     def _write_args(self, input_filename):
         # serialize args to file
+        if self.use_dill:
+            serializer = dill
+        else:
+            serializer = pickle
+        # some items from context can't be loaded in virtual env
+        # see pr https://github.com/apache/airflow/pull/8256
+        not_serializable = {'dag', 'task', 'ti', 'macros', 'task_instance', 'var'}
         if self._pass_op_args():
+            kwargs = {key: value for key, value in self.op_kwargs.items()
+                      if key not in not_serializable}
             with open(input_filename, 'wb') as f:
-                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
-                if self.use_dill:
-                    dill.dump(arg_dict, f)
-                else:
-                    pickle.dump(arg_dict, f)
+                arg_dict = ({'args': self.op_args, 'kwargs': kwargs})
+                serializer.dump(arg_dict, f)
 
     def _read_result(self, output_filename):
         if os.stat(output_filename).st_size == 0:
diff --git a/tests/operators/test_python_operator.py b/tests/operators/test_python_operator.py
index a92213a..13a33b2 100644
--- a/tests/operators/test_python_operator.py
+++ b/tests/operators/test_python_operator.py
@@ -35,9 +35,11 @@ from airflow.models import TaskInstance as TI, DAG, DagRun
 from airflow.models.taskinstance import clear_task_instances
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
+from airflow.operators.python_operator import PythonVirtualenvOperator
 from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.settings import Session
 from airflow.utils import timezone
+from tests.test_utils.db import clear_db_runs, clear_db_dags
 from airflow.utils.db import create_session
 from airflow.utils.state import State
 
@@ -341,6 +343,75 @@ class TestPythonOperator(TestPythonBase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
 
+class TestPythonVirtualenvOperator(TestPythonBase):  # provide_context=True coverage (#8256)
+    @classmethod
+    def setUpClass(cls):
+        super(TestPythonVirtualenvOperator, cls).setUpClass()
+        clear_db_runs()
+
+    def setUp(self):
+        super(TestPythonVirtualenvOperator, self).setUp()
+
+        def del_env(key):  # unset an env var, ignoring it if already absent
+            try:
+                del os.environ[key]
+            except KeyError:
+                pass
+
+        del_env('AIRFLOW_CTX_DAG_ID')
+        del_env('AIRFLOW_CTX_TASK_ID')
+        del_env('AIRFLOW_CTX_EXECUTION_DATE')
+        del_env('AIRFLOW_CTX_DAG_RUN_ID')
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE})
+        self.addCleanup(self.dag.clear)
+        self.clear_run()
+        self.addCleanup(self.clear_run)
+
+    def tearDown(self):
+        super(TestPythonVirtualenvOperator, self).tearDown()
+        clear_db_runs()
+        clear_db_dags()
+
+        for var in TI_CONTEXT_ENV_VARS:
+            if var in os.environ:
+                del os.environ[var]
+
+    def clear_run(self):
+        self.run = False  # NOTE(review): shadows unittest.TestCase.run on the instance
+
+    def do_run(self):  # NOTE(review): do_run/is_run are not called in this class
+        self.run = True
+
+    def is_run(self):
+        return self.run
+
+    def test_config_context(self):
+        """
+        Ensure provide_context=True works with PythonVirtualenvOperator:
+        dag_run (and its conf) must survive serialization into the
+        virtualenv even though other context entries are filtered out.
+        """
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+            external_trigger=False,
+        )
+
+        def pass_function(**kwargs):
+            kwargs['dag_run'].conf  # KeyError here if dag_run was not passed through
+
+        t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
+                                     provide_context=True,
+                                     python_callable=pass_function)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+
 class BranchOperatorTest(unittest.TestCase):
     @classmethod
     def setUpClass(cls):

Reply via email to