amoghrajesh commented on code in PR #53514:
URL: https://github.com/apache/airflow/pull/53514#discussion_r2244987233
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -687,8 +712,8 @@ def handle_events(self):
@pytest.mark.asyncio
@pytest.mark.execution_timeout(20)
-async def test_trigger_can_access_variables_connections_and_xcoms(session,
dag_maker):
- """Checks that the trigger will successfully access Variables, Connections
and XComs."""
+async def
test_trigger_can_call_variables_connections_and_xcoms_methods(session,
dag_maker):
Review Comment:
Your tests are passing locally because you are running this particular test
in total isolation. If you instead run it in the suite, the tests below this
one will fail and that's because:
- Query objects for this test are bound to the main thread's session
- But they are executed in a different thread context
- Sessions get stuck in weird states
- Subsequent tests can't properly query the database → timeouts
Hence I would propose that, instead of querying the DB in this test,
you embed the relevant details in the trigger event and validate them in the
test, as is done currently.
For example, refer to this diff:
```
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 4457c11eb0..31b1cea2f7 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -651,12 +651,6 @@ class CustomTrigger(BaseTrigger):
variable = await sync_to_async(Variable.get)("test_get_variable")
self.log.info("Loaded variable %s", variable)
- await sync_to_async(Variable.set)(key="test_set_variable",
value="set_value")
- self.log.info("Set variable with key test_set_variable")
-
- await sync_to_async(Variable.delete)("test_delete_variable")
- self.log.info("Deleted variable with key test_delete_variable")
-
xcom = await sync_to_async(XCom.get_one)(
key="test_xcom",
dag_id=self.dag_id,
@@ -666,16 +660,24 @@ class CustomTrigger(BaseTrigger):
)
self.log.info("Loaded XCom %s", xcom)
+ set_variable_value = "set_value"
+ await sync_to_async(Variable.set)(key="test_set_variable",
value=set_variable_value)
+ self.log.info("Set variable with key test_set_variable")
+
+ set_xcom_value = "set_xcom"
await sync_to_async(XCom.set)(
key="test_set_xcom",
dag_id=self.dag_id,
run_id=self.run_id,
task_id=self.task_id,
map_index=self.map_index,
- value="set_xcom",
+ value=set_xcom_value,
)
self.log.info("Set xcom with key test_set_xcom")
+ await sync_to_async(Variable.delete)("test_delete_variable")
+ self.log.info("Deleted variable with key test_delete_variable")
+
await sync_to_async(XCom.delete)(
key="test_delete_xcom",
dag_id=self.dag_id,
@@ -685,7 +687,11 @@ class CustomTrigger(BaseTrigger):
)
self.log.info("Delete xcom with key test_delete_xcom")
- yield TriggerEvent({"connection": attrs.asdict(conn), "variable":
variable, "xcom": xcom})
+ yield TriggerEvent({
+ "connection": attrs.asdict(conn),
+ "variable": {"get_variable": variable, "set_variable":
set_variable_value},
+ "xcom": {"get_xcom": xcom, "set_xcom": set_xcom_value},
+ })
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
@@ -780,50 +786,21 @@ async def
test_trigger_can_call_variables_connections_and_xcoms_methods(session,
task_instance.refresh_from_db()
assert task_instance.state == TaskInstanceState.SCHEDULED
assert task_instance.next_method != "__fail__"
- assert task_instance.next_kwargs == {
- "event": {
- "connection": {
- "conn_id": "test_connection",
- "conn_type": "http",
- "description": None,
- "host": "example.com",
- "schema": "https",
- "login": "user",
- "password": "pass",
- "port": 443,
- "extra": '{"key": "value"}',
- },
- "variable": "some_variable_value",
- "xcom": '"some_xcom_value"',
- }
- }
- variable_set_val = await
sync_to_async(Variable.get)("test_set_variable")
- assert variable_set_val == "set_value"
-
- variable_delete_val = await
sync_to_async(Variable.get)(key="test_delete_variable", default_var=None)
- assert variable_delete_val is None
- xcom_set_query = await sync_to_async(XComModel.get_many)(
- key="test_set_xcom",
- dag_ids=dr.dag_id,
- run_id=dr.run_id,
- task_ids=task_instance.task_id,
- map_indexes=-1,
- session=session,
- )
- xcom_set_model = xcom_set_query.first()
- assert xcom_set_model.value == "set_xcom"
-
- xcom_delete_query = await sync_to_async(XComModel.get_many)(
- key="test_delete_xcom",
- dag_ids=dr.dag_id,
- run_id=dr.run_id,
- task_ids=task_instance.task_id,
- map_indexes=-1,
- session=session,
- )
- xcom_delete_model = xcom_delete_query.first()
- assert xcom_delete_model is None
+ expected_event = {'event': {'connection': {'conn_id': 'test_connection',
+ 'conn_type': 'http',
+ 'description': None,
+ 'extra': '{"key": "value"}',
+ 'host': 'example.com',
+ 'login': 'user',
+ 'password': 'pass',
+ 'port': 443,
+ 'schema': 'https'},
+ 'variable': {'get_variable': 'some_variable_value',
+ 'set_variable': 'set_value'},
+ 'xcom': {'get_xcom': '"some_xcom_value"', 'set_xcom':
'set_xcom'}}}
+
+ assert task_instance.next_kwargs == expected_event
class CustomTriggerDagRun(BaseTrigger):
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]