sunank200 commented on code in PR #31696:
URL: https://github.com/apache/airflow/pull/31696#discussion_r1228052921
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -290,6 +290,51 @@ def prepare_template(self) -> None:
if isinstance(self.parameters, str):
self.parameters = ast.literal_eval(self.parameters)
+ def get_openlineage_facets_on_start(self):
+ try:
+ from airflow.providers.openlineage.extractors import
OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+ except ImportError:
+ return None
+
+ hook: DbApiHook = self.get_db_hook()
Review Comment:
Why are we using a separate hook? Shouldn't we use the existing hook for
this operator?
##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -516,3 +517,40 @@ def test_connection(self):
message = str(e)
return status, message
+
+ def get_database_info(self, connection):
+ from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+ return DatabaseInfo(
+ scheme=self.get_database_dialect(connection),
authority=self._get_authority(connection)
+ )
+
+ def get_database_dialect(self, connection):
+ """Method used for SQL parsing. Naively tries to use Connection's
conn_type"""
+ return connection.conn_type
+
+ def _get_authority(self, connection):
+ """Returns authority part (without user info) of OpenLineage namespace
URI."""
+ if connection.port and connection.host:
+ authority = f"{connection.host}:{connection.port}"
+ else:
+ parsed = urlparse(connection.get_uri())
+ authority = f"{parsed.hostname}:{parsed.port}"
+ return authority
+
+ def get_default_schema(self):
Review Comment:
Have we tested this?
##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -516,3 +517,40 @@ def test_connection(self):
message = str(e)
return status, message
+
+ def get_database_info(self, connection):
+ from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+ return DatabaseInfo(
+ scheme=self.get_database_dialect(connection),
authority=self._get_authority(connection)
+ )
+
+ def get_database_dialect(self, connection):
+ """Method used for SQL parsing. Naively tries to use Connection's
conn_type"""
+ return connection.conn_type
+
+ def _get_authority(self, connection):
+ """Returns authority part (without user info) of OpenLineage namespace
URI."""
+ if connection.port and connection.host:
+ authority = f"{connection.host}:{connection.port}"
+ else:
+ parsed = urlparse(connection.get_uri())
+ authority = f"{parsed.hostname}:{parsed.port}"
+ return authority
+
+ def get_default_schema(self):
+ """
+ Returns default schema specific to database.
+ See: :class:`~providers.openlineage.utils.sqlparser.SQLParser`
+ """
+ return self.__schema or "public"
+
+ def get_database_specific_lineage(self, task_instance):
Review Comment:
Isn't there database-specific lineage in the case of Snowflake, e.g. query run time?
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -290,6 +290,51 @@ def prepare_template(self) -> None:
if isinstance(self.parameters, str):
self.parameters = ast.literal_eval(self.parameters)
+ def get_openlineage_facets_on_start(self):
+ try:
+ from airflow.providers.openlineage.extractors import
OperatorLineage
+ from airflow.providers.openlineage.sqlparser import SQLParser
+ except ImportError:
+ return None
+
+ hook: DbApiHook = self.get_db_hook()
+
+ connection = hook.get_connection(getattr(hook, cast(str,
hook.conn_name_attr)))
+ try:
+ database_info = hook.get_database_info(connection)
+ except AttributeError:
+ self.log.debug("%s has no database info provided", hook)
+ return None
+
+ try:
+ sql_parser = SQLParser(
+ dialect=hook.get_database_dialect(connection),
default_schema=hook.get_default_schema()
+ )
+ except AttributeError:
+ self.log.debug("%s failed to get database dialect", hook)
+ return None
+
+ operator_lineage: OperatorLineage =
sql_parser.generate_openlineage_metadata_from_sql(
+ sql=self.sql, hook=hook, database_info=database_info,
database=self.database
+ )
+
+ return operator_lineage
+
+ def get_openlineage_facets_on_complete(self, task_instance):
+ operator_lineage = self.get_openlineage_facets_on_start()
+ try:
+ from airflow.providers.openlineage.extractors import
OperatorLineage
+ except ImportError:
+ return operator_lineage
+
+ hook: DbApiHook = self.get_db_hook()
+ try:
+ database_specific_lineage: OperatorLineage =
hook.get_database_specific_lineage(task_instance)
+ except AttributeError:
+ return operator_lineage
+
+ return OperatorLineage.merge(operator_lineage,
database_specific_lineage)
Review Comment:
I don't like the approach of merging the OpenLineage data. Shouldn't this
be done here instead?
##########
airflow/providers/openlineage/extractors/base.py:
##########
@@ -35,6 +37,22 @@ class OperatorLineage:
run_facets: dict[str, BaseFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
+ @staticmethod
+ def merge(base: OperatorLineage, other: OperatorLineage) ->
OperatorLineage:
Review Comment:
How is this method useful? Shouldn't the merge logic be part of the method that
actually populates the lineage?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]