GitHub user brki closed the discussion with a comment: Is it possible to use a 
Connection stored in the metadata DB from a python shell

Thanks @potiuk .

The DAGs do run on the workers, and they do deal with open connections to DBs. 
I hear what you are saying though, and think it's a good idea to have a feature 
request for an interactive shell.

For info, here is some workaround code that I'm currently using. I then call 
`cli_engine("my_conn_id")` to get a working sqlalchemy engine for that 
connection:

```
from logging import getLogger

from airflow.models.connection import Connection as AirflowConnection
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.settings import Session
from sqlalchemy.engine import Connection, Engine

log = getLogger(__name__)


def cli_get_connection(conn_id: str) -> Connection:
    """
    In Airflow 3, when using a quick one-off script or interactive terminal,
    it does not load user-defined connections from the DB if you try to do
    something like:
        hook = MySqlHook(mysql_conn_id='my_conn_id')
        # it will raise an error that the connection 'my_conn_id' is not found
    This function works around this limitation by manually querying the 
connection
    from the Airflow metadata DB.
    """
    with Session() as session:
        conn = (
            session
            .query(AirflowConnection)
            .filter(AirflowConnection.conn_id == conn_id)
            .first()
        )
    if not conn:
        log.warning(f"Connection with id '{conn_id}' not found in Airflow 
metadata DB")
        # Fall back to try getting one with Connection.get(), which will find
        # a connection if it defined in an environment variable.
        conn = AirflowConnection.get(conn_id)
    return conn


def cli_hook(conn_id: str, hook_class=MySqlHook) -> DbApiHook:
    conn = cli_get_connection(conn_id)
    return hook_class(connection=conn)


def cli_engine(conn_id: str, hook_class=MySqlHook) -> Engine:
    return cli_hook(conn_id, hook_class).get_sqlalchemy_engine()
```

GitHub link: 
https://github.com/apache/airflow/discussions/60334#discussioncomment-15507021

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to