sunank200 commented on code in PR #31350:
URL: https://github.com/apache/airflow/pull/31350#discussion_r1197863721


##########
airflow/providers/google/cloud/transfers/gcs_to_gcs.py:
##########
@@ -464,7 +466,37 @@ def _copy_single_object(self, hook, source_object, 
destination_object):
             destination_object,
         )
 
+        self.resolved_source_objects.add(source_object)
+        if not destination_object:
+            self.resolved_target_objects.add(source_object)
+        else:
+            self.resolved_target_objects.add(destination_object)
+
         hook.rewrite(self.source_bucket, source_object, 
self.destination_bucket, destination_object)
 
         if self.move_object:
             hook.delete(self.source_bucket, source_object)
+
+    def get_openlineage_events_on_complete(self, task_instance):
+        """
+        Implementing _on_complete because execute method does preprocessing on 
internals.
+        This means we won't have to normalize self.source_object and 
self.source_objects,
+        destination bucket and so on.
+        """
+        try:
+            from openlineage.client.run import Dataset
+
+            from airflow.providers.openlineage.extractors import 
OperatorLineage
+        except ImportError:
+            return None

Review Comment:
   I think we should raise this error if OpenLineage is enabled. What do you think?



##########
airflow/providers/google/cloud/transfers/gcs_to_gcs.py:
##########
@@ -464,7 +466,37 @@ def _copy_single_object(self, hook, source_object, 
destination_object):
             destination_object,
         )
 
+        self.resolved_source_objects.add(source_object)
+        if not destination_object:
+            self.resolved_target_objects.add(source_object)
+        else:
+            self.resolved_target_objects.add(destination_object)
+
         hook.rewrite(self.source_bucket, source_object, 
self.destination_bucket, destination_object)
 
         if self.move_object:
             hook.delete(self.source_bucket, source_object)
+
+    def get_openlineage_events_on_complete(self, task_instance):
+        """
+        Implementing _on_complete because execute method does preprocessing on 
internals.
+        This means we won't have to normalize self.source_object and 
self.source_objects,
+        destination bucket and so on.
+        """
+        try:
+            from openlineage.client.run import Dataset
+
+            from airflow.providers.openlineage.extractors import 
OperatorLineage
+        except ImportError:
+            return None
+
+        return OperatorLineage(
+            inputs=[

Review Comment:
   What about the DataSource dataset facet, as per 
https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.md#dataset-facets?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to