amoghrajesh commented on code in PR #53514:
URL: https://github.com/apache/airflow/pull/53514#discussion_r2244987233


##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -687,8 +712,8 @@ def handle_events(self):
 
 @pytest.mark.asyncio
 @pytest.mark.execution_timeout(20)
-async def test_trigger_can_access_variables_connections_and_xcoms(session, 
dag_maker):
-    """Checks that the trigger will successfully access Variables, Connections 
and XComs."""
+async def 
test_trigger_can_call_variables_connections_and_xcoms_methods(session, 
dag_maker):

Review Comment:
   Your tests are passing locally because you are running this particular test 
in total isolation. If you instead run it in the suite, the tests below this 
one will fail and that's because:
   
   - Query objects for this test are bound to main thread session
   - But executed in different thread context
   - Sessions got stuck in weird states
   - Subsequent tests couldn't properly query the database → timeouts
   
   Hence I would propose that you send instead of querying the DB in this test, 
you embed the relevant details in the trigger event and validate that in the 
test as done currently.
   
   For example, refer to this diff:
   ```
   diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py 
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
   index 4457c11eb0..31b1cea2f7 100644
   --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
   +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
   @@ -651,12 +651,6 @@ class CustomTrigger(BaseTrigger):
            variable = await sync_to_async(Variable.get)("test_get_variable")
            self.log.info("Loaded variable %s", variable)
    
   -        await sync_to_async(Variable.set)(key="test_set_variable", 
value="set_value")
   -        self.log.info("Set variable with key test_set_variable")
   -
   -        await sync_to_async(Variable.delete)("test_delete_variable")
   -        self.log.info("Deleted variable with key test_delete_variable")
   -
            xcom = await sync_to_async(XCom.get_one)(
                key="test_xcom",
                dag_id=self.dag_id,
   @@ -666,16 +660,24 @@ class CustomTrigger(BaseTrigger):
            )
            self.log.info("Loaded XCom %s", xcom)
    
   +        set_variable_value = "set_value"
   +        await sync_to_async(Variable.set)(key="test_set_variable", 
value=set_variable_value)
   +        self.log.info("Set variable with key test_set_variable")
   +
   +        set_xcom_value = "set_xcom"
            await sync_to_async(XCom.set)(
                key="test_set_xcom",
                dag_id=self.dag_id,
                run_id=self.run_id,
                task_id=self.task_id,
                map_index=self.map_index,
   -            value="set_xcom",
   +            value=set_xcom_value,
            )
            self.log.info("Set xcom with key test_set_xcom")
    
   +        await sync_to_async(Variable.delete)("test_delete_variable")
   +        self.log.info("Deleted variable with key test_delete_variable")
   +
            await sync_to_async(XCom.delete)(
                key="test_delete_xcom",
                dag_id=self.dag_id,
   @@ -685,7 +687,11 @@ class CustomTrigger(BaseTrigger):
            )
            self.log.info("Delete xcom with key test_delete_xcom")
    
   -        yield TriggerEvent({"connection": attrs.asdict(conn), "variable": 
variable, "xcom": xcom})
   +        yield TriggerEvent({
   +            "connection": attrs.asdict(conn),
   +            "variable": {"get_variable": variable, "set_variable": 
set_variable_value},
   +            "xcom": {"get_xcom": xcom, "set_xcom": set_xcom_value},
   +        })
    
        def serialize(self) -> tuple[str, dict[str, Any]]:
            return (
   @@ -780,50 +786,21 @@ async def 
test_trigger_can_call_variables_connections_and_xcoms_methods(session,
        task_instance.refresh_from_db()
        assert task_instance.state == TaskInstanceState.SCHEDULED
        assert task_instance.next_method != "__fail__"
   -    assert task_instance.next_kwargs == {
   -        "event": {
   -            "connection": {
   -                "conn_id": "test_connection",
   -                "conn_type": "http",
   -                "description": None,
   -                "host": "example.com",
   -                "schema": "https",
   -                "login": "user",
   -                "password": "pass",
   -                "port": 443,
   -                "extra": '{"key": "value"}',
   -            },
   -            "variable": "some_variable_value",
   -            "xcom": '"some_xcom_value"',
   -        }
   -    }
   -    variable_set_val = await 
sync_to_async(Variable.get)("test_set_variable")
   -    assert variable_set_val == "set_value"
   -
   -    variable_delete_val = await 
sync_to_async(Variable.get)(key="test_delete_variable", default_var=None)
   -    assert variable_delete_val is None
    
   -    xcom_set_query = await sync_to_async(XComModel.get_many)(
   -        key="test_set_xcom",
   -        dag_ids=dr.dag_id,
   -        run_id=dr.run_id,
   -        task_ids=task_instance.task_id,
   -        map_indexes=-1,
   -        session=session,
   -    )
   -    xcom_set_model = xcom_set_query.first()
   -    assert xcom_set_model.value == "set_xcom"
   -
   -    xcom_delete_query = await sync_to_async(XComModel.get_many)(
   -        key="test_delete_xcom",
   -        dag_ids=dr.dag_id,
   -        run_id=dr.run_id,
   -        task_ids=task_instance.task_id,
   -        map_indexes=-1,
   -        session=session,
   -    )
   -    xcom_delete_model = xcom_delete_query.first()
   -    assert xcom_delete_model is None
   +    expected_event = {'event': {'connection': {'conn_id': 'test_connection',
   +                          'conn_type': 'http',
   +                          'description': None,
   +                          'extra': '{"key": "value"}',
   +                          'host': 'example.com',
   +                          'login': 'user',
   +                          'password': 'pass',
   +                          'port': 443,
   +                          'schema': 'https'},
   +           'variable': {'get_variable': 'some_variable_value',
   +                        'set_variable': 'set_value'},
   +           'xcom': {'get_xcom': '"some_xcom_value"', 'set_xcom': 
'set_xcom'}}}
   +
   +    assert task_instance.next_kwargs == expected_event
    
    
    class CustomTriggerDagRun(BaseTrigger):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to