maganaluis commented on a change in pull request #8256:
URL: https://github.com/apache/airflow/pull/8256#discussion_r436408970



##########
File path: tests/operators/test_python_operator.py
##########
@@ -340,6 +341,80 @@ def test_echo_env_variables(self):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
 
+class TestPythonVirtualenvOperator(TestPythonBase):
+    @classmethod
+    def setUpClass(cls):
+        super(TestPythonVirtualenvOperator, cls).setUpClass()
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+            session.commit()
+
+    def setUp(self):
+        super(TestPythonVirtualenvOperator, self).setUp()
+
+        def del_env(key):
+            try:
+                del os.environ[key]
+            except KeyError:
+                pass
+
+        del_env('AIRFLOW_CTX_DAG_ID')
+        del_env('AIRFLOW_CTX_TASK_ID')
+        del_env('AIRFLOW_CTX_EXECUTION_DATE')
+        del_env('AIRFLOW_CTX_DAG_RUN_ID')
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE})
+        self.addCleanup(self.dag.clear)
+        self.clear_run()
+        self.addCleanup(self.clear_run)
+
+    def tearDown(self):
+        super(TestPythonVirtualenvOperator, self).tearDown()
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+            session.commit()
+
+        for var in TI_CONTEXT_ENV_VARS:
+            if var in os.environ:
+                del os.environ[var]
+
+    def clear_run(self):
+        self.run = False
+
+    def do_run(self):
+        self.run = True
+
+    def is_run(self):
+        return self.run
+
+    def test_config_context(self):
+        """
+        This test ensures we can use dag_run from the context
+        to access the configuration at run time that's being
+        passed from the UI, CLI, and REST API.
+        """
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+            external_trigger=False,
+        )
+
+        def pass_function(**kwargs):
+            kwargs['dag_run'].conf
+
+        t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
+                                     provide_context=True,
+                                     python_callable=pass_function)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

Review comment:
       This test runs in about 2 seconds, which is not too slow.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to