dabla commented on issue #38195:
URL: https://github.com/apache/airflow/issues/38195#issuecomment-2058578313
This is how I did it in my custom SQLRowsInsertOperator:
```
@cached_property
def connection(self) -> Connection:
    """Return the connection looked up by ``self.conn_id``.

    The lookup is delegated to ``BaseHook.get_connection``; the decorator
    keeps the result on the instance after the first access.
    """
    conn_id = self.conn_id
    return BaseHook.get_connection(conn_id)  # type: ignore
@cached_property
def scheme(self) -> str:
    """Return the ``sqlalchemy_scheme`` entry of the connection extras.

    Raises:
        KeyError: if the connection extras have no ``sqlalchemy_scheme`` key.
    """
    extras = self.connection.extra_dejson
    return extras["sqlalchemy_scheme"]
@cached_property
def table_name(self) -> str:
    """Return the table name as reported by ``table_name_with_schema()``.

    The value is obtained from the object returned by ``self.get_columns()``.
    """
    columns = self.get_columns()
    return columns.table_name_with_schema()
@cached_property
def driver(self) -> str:
    """Return the ``driver`` entry of the connection extras.

    Raises:
        KeyError: if the connection extras have no ``driver`` key.
    """
    extras = self.connection.extra_dejson
    return extras["driver"]
# TODO: Maybe this should be moved to DbApiHook
def get_url(self) -> URL:
    """Build a SQLAlchemy ``URL`` from the connection's fields.

    The URL is created from ``self.scheme`` plus the login, password, host
    and port of ``self.connection``. The non-secret connection details are
    logged at INFO level to help diagnose connection problems.

    Returns:
        The ``URL`` produced by ``URL.create``.
    """
    conn = self.connection
    self.log.info("Connection schema: %s", self.schema)
    self.log.info("Connection scheme: %s", self.scheme)
    self.log.info("Connection driver: %s", self.driver)
    self.log.info("Connection type: %s", conn.conn_type)
    self.log.info("Connection login: %s", conn.login)
    # The password is intentionally NOT logged: writing credentials to the
    # log in plaintext exposes them to anyone with log access.
    self.log.info("Connection host: %s", conn.host)
    self.log.info("Connection port: %s", conn.port)
    # NOTE(review): self.schema and self.driver are logged above but are not
    # passed to URL.create (no database= / query=) -- confirm this is intended.
    return URL.create(
        self.scheme,
        username=conn.login,
        password=conn.password,
        host=conn.host,
        port=conn.port,
    )
# TODO: Maybe this method in DbApiHook should be changed to this instead
# of calling the get_uri() method
def get_sqlalchemy_engine(self, engine_kwargs=None):
    """Create a SQLAlchemy engine for the URL returned by ``get_url()``.

    Args:
        engine_kwargs: optional keyword arguments forwarded to
            ``create_engine``; ``None`` means no extra arguments.

    Returns:
        The engine returned by ``create_engine``.
    """
    extra_kwargs = {} if engine_kwargs is None else engine_kwargs
    return create_engine(self.get_url(), **extra_kwargs)
# TODO: Maybe this should be moved to DbApiHook
@cached_property
def inspector(self):
    """Return an ``Inspector`` created from this object's SQLAlchemy engine.

    The engine comes from ``get_sqlalchemy_engine()``; its non-secret URL
    parts and dialect name are logged at DEBUG level. The decorator keeps
    the resulting inspector on the instance after the first access.
    """
    engine = self.get_sqlalchemy_engine()
    url = engine.url
    self.log.debug("Engine drivername: %s", url.drivername)
    self.log.debug("Engine username: %s", url.username)
    # The password is intentionally NOT logged: even at DEBUG level,
    # plaintext credentials must not end up in log files.
    self.log.debug("Engine host: %s", url.host)
    self.log.debug("Engine port: %s", url.port)
    self.log.debug("Engine database: %s", url.database)
    self.log.debug("Engine dialect: %s", engine.dialect.name)
    # NOTE(review): Inspector.from_engine is reportedly deprecated in newer
    # SQLAlchemy releases in favour of sqlalchemy.inspect(engine) -- confirm
    # against the SQLAlchemy version in use before changing.
    return Inspector.from_engine(engine)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]