JDarDagran commented on code in PR #31398:
URL: https://github.com/apache/airflow/pull/31398#discussion_r1205439292


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -290,6 +290,32 @@ def prepare_template(self) -> None:
         if isinstance(self.parameters, str):
             self.parameters = ast.literal_eval(self.parameters)
 
+    def get_openlineage_facets_on_start(self):
+        try:
+            from airflow.providers.openlineage.extractors import 
OperatorLineage
+            from airflow.providers.openlineage.utils.sqlparser import SQLParser
+        except ImportError:
+            return None
+
+        hook: DbApiHook = self.get_db_hook()
+
+        connection = hook.get_connection(getattr(hook, cast(str, 
hook.conn_name_attr)))
+        try:
+            database_info = hook.get_database_info(connection)
+        except AttributeError:
+            self.log.debug("%s has no database info provided", hook)
+            return None
+
+        sql_parser = SQLParser(
+            dialect=hook.get_database_dialect(connection), 
default_schema=hook.get_default_schema()
+        )
+
+        operator_lineage: OperatorLineage = 
sql_parser.generate_openlineage_metadata_from_sql(
+            sql=self.sql, hook=hook, database_info=database_info, 
database=self.database

Review Comment:
   Done.



##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -290,6 +290,32 @@ def prepare_template(self) -> None:
         if isinstance(self.parameters, str):
             self.parameters = ast.literal_eval(self.parameters)
 
+    def get_openlineage_facets_on_start(self):
+        try:
+            from airflow.providers.openlineage.extractors import 
OperatorLineage
+            from airflow.providers.openlineage.utils.sqlparser import SQLParser
+        except ImportError:
+            return None
+
+        hook: DbApiHook = self.get_db_hook()
+
+        connection = hook.get_connection(getattr(hook, cast(str, 
hook.conn_name_attr)))
+        try:
+            database_info = hook.get_database_info(connection)
+        except AttributeError:
+            self.log.debug("%s has no database info provided", hook)
+            return None
+
+        sql_parser = SQLParser(
+            dialect=hook.get_database_dialect(connection), 
default_schema=hook.get_default_schema()

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to