Artuz37 commented on code in PR #41422:
URL: https://github.com/apache/airflow/pull/41422#discussion_r1716929554
##########
airflow/providers/amazon/aws/transfers/s3_to_redshift.py:
##########
@@ -197,3 +199,77 @@ def execute(self, context: Context) -> None:
         else:
             redshift_hook.run(sql, autocommit=self.autocommit)
         self.log.info("COPY command complete...")
+
+    def get_openlineage_facets_on_complete(self, task_instance):
+        """Implement on_complete as we will query destination table."""
+        from pathlib import Path
+
+        from airflow.providers.amazon.aws.utils.open_lineage import (
+            get_facets_from_redshift_table,
+            get_identity_column_lineage_facet,
+        )
+        from airflow.providers.common.compat.openlineage.facet import (
+            Dataset,
+            Identifier,
+            LifecycleStateChange,
+            LifecycleStateChangeDatasetFacet,
+            SymlinksDatasetFacet,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        redshift_hook = self._create_hook()
+        if isinstance(redshift_hook, RedshiftDataHook):
Review Comment:
That's a good topic. In the operator's current state, the hook in use can be
either RedshiftDataHook or RedshiftSQLHook, which at their core are two
completely different objects. `isinstance` isn't the problem in itself; it is a
symptom of broken polymorphism. If we want mypy to pass, the type of
`redshift_hook` needs to be narrowed before any of its methods are called.
My proposal is to introduce the property you suggested and, based on it,
create either `redshift_data_hook` or `redshift_sql_hook`. This way we do not
risk calling a RedshiftSQLHook method on a RedshiftDataHook, or vice versa.
I pushed a commit with this change; let me know what you think.
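
To make the proposal concrete, here is a minimal sketch of the idea, not the code in the pushed commit. The two hook classes are simplified stand-ins for the real Airflow hooks, and the property name `use_redshift_data`, the class `S3ToRedshiftSketch`, and the stand-in method bodies are all assumptions for illustration. Each branch binds its own distinctly named, distinctly typed local variable, so a type checker never has to narrow a union.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


class RedshiftSQLHook:
    """Stand-in for the real SQL hook (illustration only)."""

    def run(self, sql: str, autocommit: bool = False) -> str:
        return f"sql-hook ran: {sql}"


class RedshiftDataHook:
    """Stand-in for the real Data API hook (illustration only)."""

    def execute_query(self, sql: str, **kwargs: Any) -> str:
        return f"data-hook ran: {sql}"


@dataclass
class S3ToRedshiftSketch:
    """Hypothetical, stripped-down operator showing only the hook selection."""

    redshift_data_api_kwargs: dict[str, Any] = field(default_factory=dict)
    autocommit: bool = False

    @property
    def use_redshift_data(self) -> bool:
        # Assumed rule: the Data API is used when kwargs for it were supplied.
        return bool(self.redshift_data_api_kwargs)

    def execute(self, sql: str) -> str:
        if self.use_redshift_data:
            # Only RedshiftDataHook methods are reachable in this branch.
            redshift_data_hook = RedshiftDataHook()
            return redshift_data_hook.execute_query(sql, **self.redshift_data_api_kwargs)
        # Only RedshiftSQLHook methods are reachable in this branch.
        redshift_sql_hook = RedshiftSQLHook()
        return redshift_sql_hook.run(sql, autocommit=self.autocommit)
```

With this shape, `get_openlineage_facets_on_complete` could branch on the same property instead of calling `isinstance` on a union-typed hook.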