This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a54418  Move XCom tests to tests/models/test_xcom.py (#9601)
7a54418 is described below

commit 7a5441836bbc8bbcb0b02bf68dbd93d63fe4ec6a
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Jul 1 07:38:49 2020 +0100

    Move XCom tests to tests/models/test_xcom.py (#9601)
    
    Move XCom tests from `tests/models/test_cleartasks.py` to 
`tests/models/test_xcom.py`
---
 tests/models/test_cleartasks.py | 157 +--------------------------------------
 tests/models/test_xcom.py       | 161 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 160 insertions(+), 158 deletions(-)

diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 34a745a..e59e862 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -17,17 +17,14 @@
 # under the License.
 
 import datetime
-import os
 import unittest
 
 from airflow import settings
-from airflow.models import DAG, TaskInstance as TI, XCom, clear_task_instances
+from airflow.models import DAG, TaskInstance as TI, clear_task_instances
 from airflow.operators.dummy_operator import DummyOperator
-from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from tests.models import DEFAULT_DATE
-from tests.test_utils.config import conf_vars
 
 
 class TestClearTasks(unittest.TestCase):
@@ -236,155 +233,3 @@ class TestClearTasks(unittest.TestCase):
         self.assertEqual(ti2.try_number, 2)
         # try_number (0) + retries(1)
         self.assertEqual(ti2.max_tries, 1)
-
-    @conf_vars({("core", "enable_xcom_pickling"): "False"})
-    def test_xcom_disable_pickle_type(self):
-        json_obj = {"key": "value"}
-        execution_date = timezone.utcnow()
-        key = "xcom_test1"
-        dag_id = "test_dag1"
-        task_id = "test_task1"
-        XCom.set(key=key,
-                 value=json_obj,
-                 dag_id=dag_id,
-                 task_id=task_id,
-                 execution_date=execution_date)
-
-        ret_value = XCom.get_many(key=key,
-                                  dag_ids=dag_id,
-                                  task_ids=task_id,
-                                  execution_date=execution_date).first().value
-
-        self.assertEqual(ret_value, json_obj)
-
-        session = settings.Session()
-        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == 
dag_id,
-                                               XCom.task_id == task_id,
-                                               XCom.execution_date == 
execution_date
-                                               ).first().value
-
-        self.assertEqual(ret_value, json_obj)
-
-    @conf_vars({("core", "enable_xcom_pickling"): "False"})
-    def test_xcom_get_one_disable_pickle_type(self):
-        json_obj = {"key": "value"}
-        execution_date = timezone.utcnow()
-        key = "xcom_test1"
-        dag_id = "test_dag1"
-        task_id = "test_task1"
-        XCom.set(key=key,
-                 value=json_obj,
-                 dag_id=dag_id,
-                 task_id=task_id,
-                 execution_date=execution_date)
-
-        ret_value = XCom.get_one(key=key,
-                                 dag_id=dag_id,
-                                 task_id=task_id,
-                                 execution_date=execution_date)
-
-        self.assertEqual(ret_value, json_obj)
-
-        session = settings.Session()
-        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == 
dag_id,
-                                               XCom.task_id == task_id,
-                                               XCom.execution_date == 
execution_date
-                                               ).first().value
-
-        self.assertEqual(ret_value, json_obj)
-
-    @conf_vars({("core", "enable_xcom_pickling"): "True"})
-    def test_xcom_enable_pickle_type(self):
-        json_obj = {"key": "value"}
-        execution_date = timezone.utcnow()
-        key = "xcom_test2"
-        dag_id = "test_dag2"
-        task_id = "test_task2"
-        XCom.set(key=key,
-                 value=json_obj,
-                 dag_id=dag_id,
-                 task_id=task_id,
-                 execution_date=execution_date)
-
-        ret_value = XCom.get_many(key=key,
-                                  dag_ids=dag_id,
-                                  task_ids=task_id,
-                                  execution_date=execution_date).first().value
-
-        self.assertEqual(ret_value, json_obj)
-
-        session = settings.Session()
-        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == 
dag_id,
-                                               XCom.task_id == task_id,
-                                               XCom.execution_date == 
execution_date
-                                               ).first().value
-
-        self.assertEqual(ret_value, json_obj)
-
-    @conf_vars({("core", "enable_xcom_pickling"): "True"})
-    def test_xcom_get_one_enable_pickle_type(self):
-        json_obj = {"key": "value"}
-        execution_date = timezone.utcnow()
-        key = "xcom_test3"
-        dag_id = "test_dag"
-        task_id = "test_task3"
-        XCom.set(key=key,
-                 value=json_obj,
-                 dag_id=dag_id,
-                 task_id=task_id,
-                 execution_date=execution_date)
-
-        ret_value = XCom.get_one(key=key,
-                                 dag_id=dag_id,
-                                 task_id=task_id,
-                                 execution_date=execution_date)
-
-        self.assertEqual(ret_value, json_obj)
-
-        session = settings.Session()
-        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == 
dag_id,
-                                               XCom.task_id == task_id,
-                                               XCom.execution_date == 
execution_date
-                                               ).first().value
-
-        self.assertEqual(ret_value, json_obj)
-
-    @conf_vars({("core", "xcom_enable_pickling"): "False"})
-    def test_xcom_disable_pickle_type_fail_on_non_json(self):
-        class PickleRce:
-            def __reduce__(self):
-                return os.system, ("ls -alt",)
-
-        self.assertRaises(TypeError, XCom.set,
-                          key="xcom_test3",
-                          value=PickleRce(),
-                          dag_id="test_dag3",
-                          task_id="test_task3",
-                          execution_date=timezone.utcnow())
-
-    @conf_vars({("core", "xcom_enable_pickling"): "True"})
-    def test_xcom_get_many(self):
-        json_obj = {"key": "value"}
-        execution_date = timezone.utcnow()
-        key = "xcom_test4"
-        dag_id1 = "test_dag4"
-        task_id1 = "test_task4"
-        dag_id2 = "test_dag5"
-        task_id2 = "test_task5"
-
-        XCom.set(key=key,
-                 value=json_obj,
-                 dag_id=dag_id1,
-                 task_id=task_id1,
-                 execution_date=execution_date)
-
-        XCom.set(key=key,
-                 value=json_obj,
-                 dag_id=dag_id2,
-                 task_id=task_id2,
-                 execution_date=execution_date)
-
-        results = XCom.get_many(key=key, execution_date=execution_date)
-
-        for result in results:
-            self.assertEqual(result.value, json_obj)
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index 7640750..cb9cf0a 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -14,8 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os
+import unittest
+
+from airflow import settings
 from airflow.configuration import conf
-from airflow.models.xcom import BaseXCom, resolve_xcom_backend
+from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend
+from airflow.utils import timezone
 from tests.test_utils.config import conf_vars
 
 
@@ -25,7 +30,7 @@ class CustomXCom(BaseXCom):
         return "custom_value"
 
 
-class TestXCom:
+class TestXCom(unittest.TestCase):
     @conf_vars({("core", "xcom_backend"): "tests.models.test_xcom.CustomXCom"})
     def test_resolve_xcom_class(self):
         cls = resolve_xcom_backend()
@@ -47,3 +52,155 @@ class TestXCom:
         cls = resolve_xcom_backend()
         assert issubclass(cls, BaseXCom)
         assert cls().serialize_value([1]) == b"[1]"
+
+    @conf_vars({("core", "enable_xcom_pickling"): "False"})
+    def test_xcom_disable_pickle_type(self):
+        json_obj = {"key": "value"}
+        execution_date = timezone.utcnow()
+        key = "xcom_test1"
+        dag_id = "test_dag1"
+        task_id = "test_task1"
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id,
+                 task_id=task_id,
+                 execution_date=execution_date)
+
+        ret_value = XCom.get_many(key=key,
+                                  dag_ids=dag_id,
+                                  task_ids=task_id,
+                                  execution_date=execution_date).first().value
+
+        self.assertEqual(ret_value, json_obj)
+
+        session = settings.Session()
+        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == 
dag_id,
+                                               XCom.task_id == task_id,
+                                               XCom.execution_date == 
execution_date
+                                               ).first().value
+
+        self.assertEqual(ret_value, json_obj)
+
+    @conf_vars({("core", "enable_xcom_pickling"): "False"})
+    def test_xcom_get_one_disable_pickle_type(self):
+        json_obj = {"key": "value"}
+        execution_date = timezone.utcnow()
+        key = "xcom_test1"
+        dag_id = "test_dag1"
+        task_id = "test_task1"
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id,
+                 task_id=task_id,
+                 execution_date=execution_date)
+
+        ret_value = XCom.get_one(key=key,
+                                 dag_id=dag_id,
+                                 task_id=task_id,
+                                 execution_date=execution_date)
+
+        self.assertEqual(ret_value, json_obj)
+
+        session = settings.Session()
+        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == 
dag_id,
+                                               XCom.task_id == task_id,
+                                               XCom.execution_date == 
execution_date
+                                               ).first().value
+
+        self.assertEqual(ret_value, json_obj)
+
+    @conf_vars({("core", "enable_xcom_pickling"): "True"})
+    def test_xcom_enable_pickle_type(self):
+        json_obj = {"key": "value"}
+        execution_date = timezone.utcnow()
+        key = "xcom_test2"
+        dag_id = "test_dag2"
+        task_id = "test_task2"
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id,
+                 task_id=task_id,
+                 execution_date=execution_date)
+
+        ret_value = XCom.get_many(key=key,
+                                  dag_ids=dag_id,
+                                  task_ids=task_id,
+                                  execution_date=execution_date).first().value
+
+        self.assertEqual(ret_value, json_obj)
+
+        session = settings.Session()
+        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == 
dag_id,
+                                               XCom.task_id == task_id,
+                                               XCom.execution_date == 
execution_date
+                                               ).first().value
+
+        self.assertEqual(ret_value, json_obj)
+
+    @conf_vars({("core", "enable_xcom_pickling"): "True"})
+    def test_xcom_get_one_enable_pickle_type(self):
+        json_obj = {"key": "value"}
+        execution_date = timezone.utcnow()
+        key = "xcom_test3"
+        dag_id = "test_dag"
+        task_id = "test_task3"
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id,
+                 task_id=task_id,
+                 execution_date=execution_date)
+
+        ret_value = XCom.get_one(key=key,
+                                 dag_id=dag_id,
+                                 task_id=task_id,
+                                 execution_date=execution_date)
+
+        self.assertEqual(ret_value, json_obj)
+
+        session = settings.Session()
+        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == 
dag_id,
+                                               XCom.task_id == task_id,
+                                               XCom.execution_date == 
execution_date
+                                               ).first().value
+
+        self.assertEqual(ret_value, json_obj)
+
+    @conf_vars({("core", "xcom_enable_pickling"): "False"})
+    def test_xcom_disable_pickle_type_fail_on_non_json(self):
+        class PickleRce:
+            def __reduce__(self):
+                return os.system, ("ls -alt",)
+
+        self.assertRaises(TypeError, XCom.set,
+                          key="xcom_test3",
+                          value=PickleRce(),
+                          dag_id="test_dag3",
+                          task_id="test_task3",
+                          execution_date=timezone.utcnow())
+
+    @conf_vars({("core", "xcom_enable_pickling"): "True"})
+    def test_xcom_get_many(self):
+        json_obj = {"key": "value"}
+        execution_date = timezone.utcnow()
+        key = "xcom_test4"
+        dag_id1 = "test_dag4"
+        task_id1 = "test_task4"
+        dag_id2 = "test_dag5"
+        task_id2 = "test_task5"
+
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id1,
+                 task_id=task_id1,
+                 execution_date=execution_date)
+
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id2,
+                 task_id=task_id2,
+                 execution_date=execution_date)
+
+        results = XCom.get_many(key=key, execution_date=execution_date)
+
+        for result in results:
+            self.assertEqual(result.value, json_obj)

Reply via email to