uranusjr commented on PR #23972:
URL: https://github.com/apache/airflow/pull/23972#issuecomment-1140369634
A pytest fixture can be implemented and used like this:
```python
from unittest import mock

import pytest

from airflow import settings
from airflow.exceptions import AirflowNotFoundException
from airflow.models import Connection


@pytest.fixture(scope="session")
def ensure_connection(request):
    def _ensure_session(conn_id: str, conn_type: str, ...):
        session = settings.Session()
        try:
            Connection.get_connection_from_secrets(conn_id)
        except AirflowNotFoundException:
            # Keep a reference to the object; session.add() returns None.
            conn = Connection(conn_id=conn_id, conn_type=conn_type, ...)
            session.add(conn)
            session.commit()

            def _delete_session():
                session.delete(conn)
                session.commit()

            # Clean up only the connection created here.
            request.addfinalizer(_delete_session)

    return _ensure_session


@mock.patch("airflow.operators.sql.BaseSQLOperator.get_db_hook")
def test_branch_single_value_with_dag_run(self, mock_get_db_hook, ensure_connection):
    ensure_connection("mysql_default")  # Only need to add this line at the beginning.
```
The fixture implementation should be relatively straightforward, except for
some magic like `request` and `scope`, but you don’t need to worry about them
if you are not interested. (If you are interested, however, [the official
documentation](https://docs.pytest.org/en/stable/explanation/fixtures.html)
should have answers to all your questions.)
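
To make the `request` part a bit less magical, here is a rough plain-Python sketch of what `request.addfinalizer` amounts to. Everything in it is an assumption made for illustration: `FakeRequest` is a hypothetical stand-in (not pytest's real request object), `CONNECTIONS` is a hypothetical in-memory replacement for the connection table, and `ensure_connection` is written without the `@pytest.fixture` decorator so it can run on its own.

```python
from typing import Callable, Dict, List


class FakeRequest:
    """Hypothetical stand-in for pytest's ``request`` object (illustration only)."""

    def __init__(self) -> None:
        self._finalizers: List[Callable[[], None]] = []

    def addfinalizer(self, func: Callable[[], None]) -> None:
        # Only remember the callback; nothing runs yet.
        self._finalizers.append(func)

    def finish(self) -> None:
        # Run callbacks last-registered-first, as pytest does at teardown.
        while self._finalizers:
            self._finalizers.pop()()


# Hypothetical in-memory replacement for the connection table.
CONNECTIONS: Dict[str, str] = {}


def ensure_connection(request: FakeRequest) -> Callable[[str, str], None]:
    """Same shape as the fixture body, minus the ``@pytest.fixture`` decorator."""

    def _ensure(conn_id: str, conn_type: str) -> None:
        if conn_id in CONNECTIONS:
            return  # Already present: leave it alone and register no cleanup.
        CONNECTIONS[conn_id] = conn_type

        def _delete() -> None:
            # Undo only what this call created.
            CONNECTIONS.pop(conn_id, None)

        request.addfinalizer(_delete)

    return _ensure


request = FakeRequest()
ensure = ensure_connection(request)
ensure("mysql_default", "mysql")
ensure("mysql_default", "mysql")  # Second call is a no-op.
created_during_test = dict(CONNECTIONS)
request.finish()  # pytest would do this once the fixture's scope ends.
```

In real pytest, `scope="session"` decides when the equivalent of `finish()` happens: once, after the whole test session, rather than after each test.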