This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 7a54418 Move XCom tests to tests/models/test_xcom.py (#9601)
7a54418 is described below
commit 7a5441836bbc8bbcb0b02bf68dbd93d63fe4ec6a
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Jul 1 07:38:49 2020 +0100
Move XCom tests to tests/models/test_xcom.py (#9601)
Move XCom tests from `tests/models/test_cleartasks.py` to
`tests/models/test_xcom.py`
---
tests/models/test_cleartasks.py | 157 +--------------------------------------
tests/models/test_xcom.py | 161 +++++++++++++++++++++++++++++++++++++++-
2 files changed, 160 insertions(+), 158 deletions(-)
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 34a745a..e59e862 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -17,17 +17,14 @@
# under the License.
import datetime
-import os
import unittest
from airflow import settings
-from airflow.models import DAG, TaskInstance as TI, XCom, clear_task_instances
+from airflow.models import DAG, TaskInstance as TI, clear_task_instances
from airflow.operators.dummy_operator import DummyOperator
-from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
from tests.models import DEFAULT_DATE
-from tests.test_utils.config import conf_vars
class TestClearTasks(unittest.TestCase):
@@ -236,155 +233,3 @@ class TestClearTasks(unittest.TestCase):
self.assertEqual(ti2.try_number, 2)
# try_number (0) + retries(1)
self.assertEqual(ti2.max_tries, 1)
-
- @conf_vars({("core", "enable_xcom_pickling"): "False"})
- def test_xcom_disable_pickle_type(self):
- json_obj = {"key": "value"}
- execution_date = timezone.utcnow()
- key = "xcom_test1"
- dag_id = "test_dag1"
- task_id = "test_task1"
- XCom.set(key=key,
- value=json_obj,
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date)
-
- ret_value = XCom.get_many(key=key,
- dag_ids=dag_id,
- task_ids=task_id,
- execution_date=execution_date).first().value
-
- self.assertEqual(ret_value, json_obj)
-
- session = settings.Session()
- ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
- XCom.task_id == task_id,
- XCom.execution_date == execution_date
- ).first().value
-
- self.assertEqual(ret_value, json_obj)
-
- @conf_vars({("core", "enable_xcom_pickling"): "False"})
- def test_xcom_get_one_disable_pickle_type(self):
- json_obj = {"key": "value"}
- execution_date = timezone.utcnow()
- key = "xcom_test1"
- dag_id = "test_dag1"
- task_id = "test_task1"
- XCom.set(key=key,
- value=json_obj,
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date)
-
- ret_value = XCom.get_one(key=key,
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date)
-
- self.assertEqual(ret_value, json_obj)
-
- session = settings.Session()
- ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
- XCom.task_id == task_id,
- XCom.execution_date == execution_date
- ).first().value
-
- self.assertEqual(ret_value, json_obj)
-
- @conf_vars({("core", "enable_xcom_pickling"): "True"})
- def test_xcom_enable_pickle_type(self):
- json_obj = {"key": "value"}
- execution_date = timezone.utcnow()
- key = "xcom_test2"
- dag_id = "test_dag2"
- task_id = "test_task2"
- XCom.set(key=key,
- value=json_obj,
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date)
-
- ret_value = XCom.get_many(key=key,
- dag_ids=dag_id,
- task_ids=task_id,
- execution_date=execution_date).first().value
-
- self.assertEqual(ret_value, json_obj)
-
- session = settings.Session()
- ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
- XCom.task_id == task_id,
- XCom.execution_date == execution_date
- ).first().value
-
- self.assertEqual(ret_value, json_obj)
-
- @conf_vars({("core", "enable_xcom_pickling"): "True"})
- def test_xcom_get_one_enable_pickle_type(self):
- json_obj = {"key": "value"}
- execution_date = timezone.utcnow()
- key = "xcom_test3"
- dag_id = "test_dag"
- task_id = "test_task3"
- XCom.set(key=key,
- value=json_obj,
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date)
-
- ret_value = XCom.get_one(key=key,
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date)
-
- self.assertEqual(ret_value, json_obj)
-
- session = settings.Session()
- ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
- XCom.task_id == task_id,
- XCom.execution_date == execution_date
- ).first().value
-
- self.assertEqual(ret_value, json_obj)
-
- @conf_vars({("core", "xcom_enable_pickling"): "False"})
- def test_xcom_disable_pickle_type_fail_on_non_json(self):
- class PickleRce:
- def __reduce__(self):
- return os.system, ("ls -alt",)
-
- self.assertRaises(TypeError, XCom.set,
- key="xcom_test3",
- value=PickleRce(),
- dag_id="test_dag3",
- task_id="test_task3",
- execution_date=timezone.utcnow())
-
- @conf_vars({("core", "xcom_enable_pickling"): "True"})
- def test_xcom_get_many(self):
- json_obj = {"key": "value"}
- execution_date = timezone.utcnow()
- key = "xcom_test4"
- dag_id1 = "test_dag4"
- task_id1 = "test_task4"
- dag_id2 = "test_dag5"
- task_id2 = "test_task5"
-
- XCom.set(key=key,
- value=json_obj,
- dag_id=dag_id1,
- task_id=task_id1,
- execution_date=execution_date)
-
- XCom.set(key=key,
- value=json_obj,
- dag_id=dag_id2,
- task_id=task_id2,
- execution_date=execution_date)
-
- results = XCom.get_many(key=key, execution_date=execution_date)
-
- for result in results:
- self.assertEqual(result.value, json_obj)
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index 7640750..cb9cf0a 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -14,8 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import os
+import unittest
+
+from airflow import settings
from airflow.configuration import conf
-from airflow.models.xcom import BaseXCom, resolve_xcom_backend
+from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend
+from airflow.utils import timezone
from tests.test_utils.config import conf_vars
@@ -25,7 +30,7 @@ class CustomXCom(BaseXCom):
return "custom_value"
-class TestXCom:
+class TestXCom(unittest.TestCase):
@conf_vars({("core", "xcom_backend"): "tests.models.test_xcom.CustomXCom"})
def test_resolve_xcom_class(self):
cls = resolve_xcom_backend()
@@ -47,3 +52,155 @@ class TestXCom:
cls = resolve_xcom_backend()
assert issubclass(cls, BaseXCom)
assert cls().serialize_value([1]) == b"[1]"
+
+ @conf_vars({("core", "enable_xcom_pickling"): "False"})
+ def test_xcom_disable_pickle_type(self):
+ json_obj = {"key": "value"}
+ execution_date = timezone.utcnow()
+ key = "xcom_test1"
+ dag_id = "test_dag1"
+ task_id = "test_task1"
+ XCom.set(key=key,
+ value=json_obj,
+ dag_id=dag_id,
+ task_id=task_id,
+ execution_date=execution_date)
+
+ ret_value = XCom.get_many(key=key,
+ dag_ids=dag_id,
+ task_ids=task_id,
+ execution_date=execution_date).first().value
+
+ self.assertEqual(ret_value, json_obj)
+
+ session = settings.Session()
+ ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
+ XCom.task_id == task_id,
+ XCom.execution_date == execution_date
+ ).first().value
+
+ self.assertEqual(ret_value, json_obj)
+
+ @conf_vars({("core", "enable_xcom_pickling"): "False"})
+ def test_xcom_get_one_disable_pickle_type(self):
+ json_obj = {"key": "value"}
+ execution_date = timezone.utcnow()
+ key = "xcom_test1"
+ dag_id = "test_dag1"
+ task_id = "test_task1"
+ XCom.set(key=key,
+ value=json_obj,
+ dag_id=dag_id,
+ task_id=task_id,
+ execution_date=execution_date)
+
+ ret_value = XCom.get_one(key=key,
+ dag_id=dag_id,
+ task_id=task_id,
+ execution_date=execution_date)
+
+ self.assertEqual(ret_value, json_obj)
+
+ session = settings.Session()
+ ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
+ XCom.task_id == task_id,
+ XCom.execution_date == execution_date
+ ).first().value
+
+ self.assertEqual(ret_value, json_obj)
+
+ @conf_vars({("core", "enable_xcom_pickling"): "True"})
+ def test_xcom_enable_pickle_type(self):
+ json_obj = {"key": "value"}
+ execution_date = timezone.utcnow()
+ key = "xcom_test2"
+ dag_id = "test_dag2"
+ task_id = "test_task2"
+ XCom.set(key=key,
+ value=json_obj,
+ dag_id=dag_id,
+ task_id=task_id,
+ execution_date=execution_date)
+
+ ret_value = XCom.get_many(key=key,
+ dag_ids=dag_id,
+ task_ids=task_id,
+ execution_date=execution_date).first().value
+
+ self.assertEqual(ret_value, json_obj)
+
+ session = settings.Session()
+ ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
+ XCom.task_id == task_id,
+ XCom.execution_date == execution_date
+ ).first().value
+
+ self.assertEqual(ret_value, json_obj)
+
+ @conf_vars({("core", "enable_xcom_pickling"): "True"})
+ def test_xcom_get_one_enable_pickle_type(self):
+ json_obj = {"key": "value"}
+ execution_date = timezone.utcnow()
+ key = "xcom_test3"
+ dag_id = "test_dag"
+ task_id = "test_task3"
+ XCom.set(key=key,
+ value=json_obj,
+ dag_id=dag_id,
+ task_id=task_id,
+ execution_date=execution_date)
+
+ ret_value = XCom.get_one(key=key,
+ dag_id=dag_id,
+ task_id=task_id,
+ execution_date=execution_date)
+
+ self.assertEqual(ret_value, json_obj)
+
+ session = settings.Session()
+ ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
+ XCom.task_id == task_id,
+ XCom.execution_date == execution_date
+ ).first().value
+
+ self.assertEqual(ret_value, json_obj)
+
+ @conf_vars({("core", "xcom_enable_pickling"): "False"})
+ def test_xcom_disable_pickle_type_fail_on_non_json(self):
+ class PickleRce:
+ def __reduce__(self):
+ return os.system, ("ls -alt",)
+
+ self.assertRaises(TypeError, XCom.set,
+ key="xcom_test3",
+ value=PickleRce(),
+ dag_id="test_dag3",
+ task_id="test_task3",
+ execution_date=timezone.utcnow())
+
+ @conf_vars({("core", "xcom_enable_pickling"): "True"})
+ def test_xcom_get_many(self):
+ json_obj = {"key": "value"}
+ execution_date = timezone.utcnow()
+ key = "xcom_test4"
+ dag_id1 = "test_dag4"
+ task_id1 = "test_task4"
+ dag_id2 = "test_dag5"
+ task_id2 = "test_task5"
+
+ XCom.set(key=key,
+ value=json_obj,
+ dag_id=dag_id1,
+ task_id=task_id1,
+ execution_date=execution_date)
+
+ XCom.set(key=key,
+ value=json_obj,
+ dag_id=dag_id2,
+ task_id=task_id2,
+ execution_date=execution_date)
+
+ results = XCom.get_many(key=key, execution_date=execution_date)
+
+ for result in results:
+ self.assertEqual(result.value, json_obj)