This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-stable by this push:
new 760bcb8 Fix PythonVirtualenvOperator when using provide_context=True (#8256)
760bcb8 is described below
commit 760bcb8a0cf2cfbe8d5ff0309721b49d771b1412
Author: Luis Magana <[email protected]>
AuthorDate: Mon Aug 17 14:04:42 2020 -0500
Fix PythonVirtualenvOperator when using provide_context=True (#8256)
This change ensures that provide_context=True works for
PythonVirtualenvOperator on v1.10.12.
Cleaned up the code and simplified the solution so that only the
unserializable context items are skipped.
Added a pytest test for PythonVirtualenvOperator.
---
airflow/operators/python_operator.py | 16 +++++---
tests/operators/test_python_operator.py | 71 +++++++++++++++++++++++++++++++++
2 files changed, 82 insertions(+), 5 deletions(-)
diff --git a/airflow/operators/python_operator.py
b/airflow/operators/python_operator.py
index 392e0fc..a6f5ffd 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -330,13 +330,19 @@ class PythonVirtualenvOperator(PythonOperator):
def _write_args(self, input_filename):
# serialize args to file
+ if self.use_dill:
+ serializer = dill
+ else:
+ serializer = pickle
+ # some items from context can't be loaded in virtual env
+ # see pr https://github.com/apache/airflow/pull/8256
+ not_serializable = {'dag', 'task', 'ti', 'macros', 'task_instance',
'var'}
if self._pass_op_args():
+ kwargs = {key: value for key, value in self.op_kwargs.items()
+ if key not in not_serializable}
with open(input_filename, 'wb') as f:
- arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
- if self.use_dill:
- dill.dump(arg_dict, f)
- else:
- pickle.dump(arg_dict, f)
+ arg_dict = ({'args': self.op_args, 'kwargs': kwargs})
+ serializer.dump(arg_dict, f)
def _read_result(self, output_filename):
if os.stat(output_filename).st_size == 0:
diff --git a/tests/operators/test_python_operator.py
b/tests/operators/test_python_operator.py
index a92213a..13a33b2 100644
--- a/tests/operators/test_python_operator.py
+++ b/tests/operators/test_python_operator.py
@@ -35,9 +35,11 @@ from airflow.models import TaskInstance as TI, DAG, DagRun
from airflow.models.taskinstance import clear_task_instances
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator,
BranchPythonOperator
+from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.settings import Session
from airflow.utils import timezone
+from tests.test_utils.db import clear_db_runs, clear_db_dags
from airflow.utils.db import create_session
from airflow.utils.state import State
@@ -341,6 +343,75 @@ class TestPythonOperator(TestPythonBase):
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+class TestPythonVirtualenvOperator(TestPythonBase):
+ @classmethod
+ def setUpClass(cls):
+ super(TestPythonVirtualenvOperator, cls).setUpClass()
+ clear_db_runs()
+
+ def setUp(self):
+ super(TestPythonVirtualenvOperator, self).setUp()
+
+ def del_env(key):
+ try:
+ del os.environ[key]
+ except KeyError:
+ pass
+
+ del_env('AIRFLOW_CTX_DAG_ID')
+ del_env('AIRFLOW_CTX_TASK_ID')
+ del_env('AIRFLOW_CTX_EXECUTION_DATE')
+ del_env('AIRFLOW_CTX_DAG_RUN_ID')
+ self.dag = DAG(
+ 'test_dag',
+ default_args={
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE})
+ self.addCleanup(self.dag.clear)
+ self.clear_run()
+ self.addCleanup(self.clear_run)
+
+ def tearDown(self):
+ super(TestPythonVirtualenvOperator, self).tearDown()
+ clear_db_runs()
+ clear_db_dags()
+
+ for var in TI_CONTEXT_ENV_VARS:
+ if var in os.environ:
+ del os.environ[var]
+
+ def clear_run(self):
+ self.run = False
+
+ def do_run(self):
+ self.run = True
+
+ def is_run(self):
+ return self.run
+
+ def test_config_context(self):
+ """
+ This test ensures we can use dag_run from the context
+ to access the configuration at run time that's being
+ passed from the UI, CLI, and REST API.
+ """
+ self.dag.create_dagrun(
+ run_id='manual__' + DEFAULT_DATE.isoformat(),
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ external_trigger=False,
+ )
+
+ def pass_function(**kwargs):
+ kwargs['dag_run'].conf
+
+ t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
+ provide_context=True,
+ python_callable=pass_function)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+
class BranchOperatorTest(unittest.TestCase):
@classmethod
def setUpClass(cls):