uranusjr commented on a change in pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#discussion_r761012500
##########
File path: tests/models/test_xcom.py
##########
@@ -212,29 +146,231 @@ def
test_xcom_init_on_load_uses_orm_deserialize_value(self, mock_orm_deserialize
task_id="task_id",
dag_id="dag_id",
)
-
instance.init_on_load()
mock_orm_deserialize.assert_called_once_with()
@conf_vars({("core", "xcom_backend"): "tests.models.test_xcom.CustomXCom"})
- def test_get_one_doesnt_use_orm_deserialize_value(self, session):
+ def test_get_one_custom_backend_no_use_orm_deserialize_value(self,
dag_run, session):
"""Test that XCom.get_one does not call orm_deserialize_value"""
- json_obj = {"key": "value"}
- execution_date = timezone.utcnow()
- key = XCOM_RETURN_KEY
- dag_id = "test_dag"
- task_id = "test_task"
-
XCom = resolve_xcom_backend()
XCom.set(
- key=key,
- value=json_obj,
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date,
+ key=XCOM_RETURN_KEY,
+ value={"key": "value"},
+ dag_id=dag_run.dag_id,
+ task_id="test_task",
+ run_id=dag_run.run_id,
+ session=session,
+ )
+
+ value = XCom.get_one(
+ dag_id=dag_run.dag_id,
+ task_id="test_task",
+ run_id=dag_run.run_id,
session=session,
)
+ assert value == {"key": "value"}
+ XCom.orm_deserialize_value.assert_not_called()
+
+
+class TestXComGet:
+ @pytest.fixture(
+ autouse=True,
+ params=[
+ pytest.param("true", id="enable_xcom_pickling=true"),
+ pytest.param("false", id="enable_xcom_pickling=false"),
+ ],
+ )
+ def setup_xcom(self, request):
+ with conf_vars({("core", "enable_xcom_pickling"): str(request.param)}):
+ yield
+
+ @pytest.fixture()
+ def push_simple_json_xcom(self, session):
+ def func(*, dag_run: DagRun, task_id: str, key: str, value):
+ return XCom.set(
+ key=key,
+ value=value,
+ dag_id=dag_run.dag_id,
+ task_id=task_id,
+ run_id=dag_run.run_id,
+ session=session,
+ )
+
+ return func
+
+ @pytest.fixture()
+ def setup_for_xcom_get_one(self, dag_run, push_simple_json_xcom):
+ push_simple_json_xcom(dag_run=dag_run, task_id="task_id_1",
key="xcom_1", value={"key": "value"})
+
+ @pytest.mark.usefixtures("setup_for_xcom_get_one")
+ def test_xcom_get_one(self, session, dag_run):
+ stored_value = XCom.get_one(
+ key="xcom_1",
+ dag_id=dag_run.dag_id,
+ task_id="task_id_1",
+ run_id=dag_run.run_id,
+ session=session,
+ )
+ assert stored_value == {"key": "value"}
+
+ @pytest.mark.usefixtures("setup_for_xcom_get_one")
+ def test_xcom_get_one_with_execution_date(self, session, dag_run):
+ with pytest.deprecated_call():
+ stored_value = XCom.get_one(
+ key="xcom_1",
+ dag_id=dag_run.dag_id,
+ task_id="task_id_1",
+ execution_date=dag_run.logical_date,
+ session=session,
+ )
+ assert stored_value == {"key": "value"}
+
+ @pytest.fixture()
+ def dag_runs_for_xcom_get_one_from_prior_date(self, dag_run_factory,
push_simple_json_xcom):
+ date1 = timezone.datetime(2021, 12, 3, 4, 56)
+ dr1 = dag_run_factory(dag_id="dag", execution_date=date1)
+ dr2 = dag_run_factory(dag_id="dag", execution_date=date1 +
datetime.timedelta(days=1))
+
+ # The earlier run pushes an XCom, but not the later run, but the later
+ # run can get this earlier XCom with ``include_prior_dates``.
+ push_simple_json_xcom(dag_run=dr1, task_id="task_1", key="xcom_1",
value={"key": "value"})
+
+ return dr1, dr2
+
+ def test_xcom_get_one_from_prior_date(self, session,
dag_runs_for_xcom_get_one_from_prior_date):
+ _, dr2 = dag_runs_for_xcom_get_one_from_prior_date
+ retrieved_value = XCom.get_one(
+ run_id=dr2.run_id,
+ key="xcom_1",
+ task_id="task_1",
+ dag_id="dag",
+ include_prior_dates=True,
+ session=session,
+ )
+ assert retrieved_value == {"key": "value"}
+
+ def test_xcom_get_one_from_prior_with_execution_date(
+ self,
+ session,
+ dag_runs_for_xcom_get_one_from_prior_date,
+ ):
+ _, dr2 = dag_runs_for_xcom_get_one_from_prior_date
+ with pytest.deprecated_call():
+ retrieved_value = XCom.get_one(
+ execution_date=dr2.execution_date,
+ key="xcom_1",
+ task_id="task_1",
+ dag_id="dag",
+ include_prior_dates=True,
+ session=session,
+ )
+ assert retrieved_value == {"key": "value"}
+
+ @pytest.fixture()
+ def setup_for_xcom_get_many_single_argument_value(self, dag_run,
push_simple_json_xcom):
+ push_simple_json_xcom(dag_run=dag_run, task_id="task_id_1",
key="xcom_1", value={"key": "value"})
+
+ @pytest.mark.usefixtures("setup_for_xcom_get_many_single_argument_value")
+ def test_xcom_get_many_single_argument_value(self, session, dag_run):
+ stored_xcoms = XCom.get_many(
+ key="xcom_1",
+ dag_ids=dag_run.dag_id,
+ task_ids="task_id_1",
+ run_id=dag_run.run_id,
+ session=session,
+ ).all()
+ assert len(stored_xcoms) == 1
+ assert stored_xcoms[0].key == "xcom_1"
+ assert stored_xcoms[0].value == {"key": "value"}
+
+ @pytest.mark.usefixtures("setup_for_xcom_get_many_single_argument_value")
+ def test_xcom_get_many_single_argument_value_with_execution_date(self,
session, dag_run):
+ with pytest.deprecated_call():
+ stored_xcoms = XCom.get_many(
+ execution_date=dag_run.logical_date,
+ key="xcom_1",
+ dag_ids=dag_run.dag_id,
+ task_ids="task_id_1",
+ session=session,
+ ).all()
+ assert len(stored_xcoms) == 1
+ assert stored_xcoms[0].key == "xcom_1"
+ assert stored_xcoms[0].value == {"key": "value"}
+
+ @pytest.fixture()
+ def setup_for_xcom_get_many_multiple_tasks(self, dag_run,
push_simple_json_xcom):
+ push_simple_json_xcom(dag_run=dag_run, key="xcom_1", value={"key1":
"value1"}, task_id="task_id_1")
+ push_simple_json_xcom(dag_run=dag_run, key="xcom_1", value={"key2":
"value2"}, task_id="task_id_2")
+
+ @pytest.mark.usefixtures("setup_for_xcom_get_many_multiple_tasks")
+ def test_xcom_get_many_multiple_tasks(self, session, dag_run):
+ stored_xcoms = XCom.get_many(
+ key="xcom_1",
+ dag_ids=dag_run.dag_id,
+ task_ids=["task_id_1", "task_id_2"],
+ run_id=dag_run.run_id,
+ session=session,
+ )
+ sorted_values = [x.value for x in sorted(stored_xcoms,
key=operator.attrgetter("task_id"))]
+ assert sorted_values == [{"key1": "value1"}, {"key2": "value2"}]
+
+ @pytest.mark.usefixtures("setup_for_xcom_get_many_multiple_tasks")
+ def test_xcom_get_many_multiple_tasks_with_execution_date(self, session,
dag_run):
+ with pytest.deprecated_call():
+ stored_xcoms = XCom.get_many(
+ execution_date=dag_run.logical_date,
+ key="xcom_1",
+ dag_ids=dag_run.dag_id,
+ task_ids=["task_id_1", "task_id_2"],
+ session=session,
+ )
+ sorted_values = [x.value for x in sorted(stored_xcoms,
key=operator.attrgetter("task_id"))]
+ assert sorted_values == [{"key1": "value1"}, {"key2": "value2"}]
+
+ @pytest.fixture()
+ def dag_runs_for_xcom_get_many_from_prior_dates(self, dag_run_factory,
push_simple_json_xcom):
+ date1 = timezone.datetime(2021, 12, 3, 4, 56)
+ date2 = date1 + datetime.timedelta(days=1)
+ dr1 = dag_run_factory(dag_id="dag", execution_date=date1)
+ dr2 = dag_run_factory(dag_id="dag", execution_date=date2)
+ push_simple_json_xcom(dag_run=dr1, task_id="task_1", key="xcom_1",
value={"key1": "value1"})
+ push_simple_json_xcom(dag_run=dr2, task_id="task_1", key="xcom_1",
value={"key2": "value2"})
+ return dr1, dr2
+
+ def test_xcom_get_many_from_prior_dates(self, session,
dag_runs_for_xcom_get_many_from_prior_dates):
+ dr1, dr2 = dag_runs_for_xcom_get_many_from_prior_dates
+ stored_xcoms = XCom.get_many(
+ run_id=dr2.run_id,
+ key="xcom_1",
+ dag_ids="dag",
+ task_ids="task_1",
+ include_prior_dates=True,
+ session=session,
+ )
+
+ # The retrieved XComs should be ordered by logical date, latest first.
+ assert [x.value for x in stored_xcoms] == [{"key2": "value2"},
{"key1": "value1"}]
+ assert [x.execution_date for x in stored_xcoms] == [dr2.logical_date,
dr1.logical_date]
+
+ def test_xcom_get_many_from_prior_dates_with_execution_date(
+ self,
+ session,
+ dag_runs_for_xcom_get_many_from_prior_dates,
+ ):
+ dr1, dr2 = dag_runs_for_xcom_get_many_from_prior_dates
+ with pytest.deprecated_call():
+ stored_xcoms = XCom.get_many(
+ execution_date=dr2.execution_date,
+ key="xcom_1",
+ dag_ids="dag",
+ task_ids="task_1",
+ include_prior_dates=True,
+ session=session,
+ )
+
+ # The retrieved XComs should be ordered by logical date, latest first.
+ assert [x.value for x in stored_xcoms] == [{"key2": "value2"},
{"key1": "value1"}]
+ assert [x.execution_date for x in stored_xcoms] == [dr2.logical_date,
dr1.logical_date]
- value = XCom.get_one(dag_id=dag_id, task_id=task_id,
execution_date=execution_date, session=session)
- assert value == json_obj
+# TODO: Tests for set and clear (both run_id and execution_date).
Review comment:
Still not finished
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]