dabla commented on issue #38195:
URL: https://github.com/apache/airflow/issues/38195#issuecomment-2058578313

   This is how I did it in my custom SQLRowsInsertOperator:
   
   ```
       @cached_property
       def connection(self) -> Connection:
           """Look up the connection registered under ``self.conn_id``.

           The lookup is delegated to ``BaseHook.get_connection``; the result is
           stored by ``cached_property`` after the first access.
           """
           conn = BaseHook.get_connection(self.conn_id)  # type: ignore
           return conn
   
       @cached_property
       def scheme(self) -> str:
           """Return the ``sqlalchemy_scheme`` entry of the connection extras.

           The value is read from ``self.connection.extra_dejson``; a missing
           key raises ``KeyError``.
           """
           extras = self.connection.extra_dejson
           return extras["sqlalchemy_scheme"]
   
       @cached_property
       def table_name(self) -> str:
           """Return the table name as reported by the column metadata.

           Calls ``self.get_columns()`` and returns the result of its
           ``table_name_with_schema()`` method.
           """
           columns = self.get_columns()
           return columns.table_name_with_schema()
   
       @cached_property
       def driver(self) -> str:
           """Return the ``driver`` entry of the connection extras.

           The value is read from ``self.connection.extra_dejson``; a missing
           key raises ``KeyError``.
           """
           extras = self.connection.extra_dejson
           return extras["driver"]
   
       # TODO: Maybe this should be moved to DbApiHook
       def get_url(self) -> URL:
           """Build a SQLAlchemy ``URL`` from the connection's scheme and credentials.

           The URL is created with ``URL.create`` from ``self.scheme`` plus the
           connection's login, password, host and port. Connection details are
           logged at INFO level for troubleshooting. The password itself is
           never written to the log; only a mask showing whether one is set.

           NOTE(review): ``self.schema`` and ``self.driver`` are logged but not
           passed to ``URL.create`` -- confirm whether the database name and
           driver are meant to be part of the URL.

           :return: the URL produced by ``URL.create``.
           """
           self.log.info("Connection schema: %s", self.schema)
           self.log.info("Connection scheme: %s", self.scheme)
           self.log.info("Connection driver: %s", self.driver)
           self.log.info("Connection type: %s", self.connection.conn_type)
           self.log.info("Connection login: %s", self.connection.login)
           # Never log the secret itself; only report whether one is configured.
           self.log.info(
               "Connection password: %s",
               "***" if self.connection.password else None,
           )
           self.log.info("Connection host: %s", self.connection.host)
           self.log.info("Connection port: %s", self.connection.port)

           return URL.create(
               self.scheme,
               username=self.connection.login,
               password=self.connection.password,
               host=self.connection.host,
               port=self.connection.port,
           )
   
       # TODO: Maybe this method in DbApiHook should be changed to this instead of
       # calling get_uri() method
       def get_sqlalchemy_engine(self, engine_kwargs=None):
           """Create an engine for the URL returned by ``self.get_url()``.

           :param engine_kwargs: optional mapping of keyword arguments forwarded
               to ``create_engine``; ``None`` means no extra arguments.
           :return: whatever ``create_engine`` returns for that URL.
           """
           kwargs = {} if engine_kwargs is None else engine_kwargs
           url = self.get_url()
           return create_engine(url, **kwargs)
   
       # TODO: Maybe this should be moved to DbApiHook
       @cached_property
       def inspector(self):
           """Return an ``Inspector`` created from this object's engine.

           The engine comes from ``self.get_sqlalchemy_engine()``. Its URL
           components and dialect name are logged at DEBUG level. The password
           is never written to the log; only a mask showing whether one is set.

           :return: the result of ``Inspector.from_engine(engine)``.
           """
           engine = self.get_sqlalchemy_engine()
           url = engine.url
           self.log.debug("Engine drivername: %s", url.drivername)
           self.log.debug("Engine username: %s", url.username)
           # Never log the secret itself; only report whether one is configured.
           self.log.debug("Engine password: %s", "***" if url.password else None)
           self.log.debug("Engine host: %s", url.host)
           self.log.debug("Engine port: %s", url.port)
           self.log.debug("Engine database: %s", url.database)
           self.log.debug("Engine dialect: %s", engine.dialect.name)

           return Inspector.from_engine(engine)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to