This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bbfa228868 Deprecate get_hook in DataSyncOperator and use hook instead (#34427)
bbfa228868 is described below

commit bbfa228868dd20a423d5a05f372c05a6b4e512e6
Author: Hussein Awala <[email protected]>
AuthorDate: Mon Sep 18 21:07:21 2023 +0200

    Deprecate get_hook in DataSyncOperator and use hook instead (#34427)
    
    * Deprecate get_hook in DataSyncOperator and use hook instead
    
    * use AirflowProviderDeprecationWarning
---
 airflow/providers/amazon/aws/operators/datasync.py | 54 +++++++++++-----------
 1 file changed, 26 insertions(+), 28 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/datasync.py b/airflow/providers/amazon/aws/operators/datasync.py
index 5688e56c27..455b098184 100644
--- a/airflow/providers/amazon/aws/operators/datasync.py
+++ b/airflow/providers/amazon/aws/operators/datasync.py
@@ -19,9 +19,12 @@ from __future__ import annotations
 
 import logging
 import random
+from functools import cached_property
 from typing import TYPE_CHECKING, Sequence
 
-from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from deprecated.classic import deprecated
+
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowTaskTimeout
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.datasync import DataSyncHook
 
@@ -176,8 +179,6 @@ class DataSyncOperator(BaseOperator):
                 f"destination_location_uri={destination_location_uri!r}"
             )
 
-        # Others
-        self.hook: DataSyncHook | None = None
         # Candidates - these are found in AWS as possible things
         # for us to use
         self.candidate_source_location_arns: list[str] | None = None
@@ -188,18 +189,20 @@ class DataSyncOperator(BaseOperator):
         self.destination_location_arn: str | None = None
         self.task_execution_arn: str | None = None
 
-    def get_hook(self) -> DataSyncHook:
+    @cached_property
+    def hook(self) -> DataSyncHook:
         """Create and return DataSyncHook.
 
         :return DataSyncHook: An DataSyncHook instance.
         """
-        if self.hook:
-            return self.hook
-
-        self.hook = DataSyncHook(
+        return DataSyncHook(
             aws_conn_id=self.aws_conn_id,
             wait_interval_seconds=self.wait_interval_seconds,
         )
+
+    @deprecated(reason="use `hook` property instead.", category=AirflowProviderDeprecationWarning)
+    def get_hook(self) -> DataSyncHook:
+        """Create and return DataSyncHook."""
         return self.hook
 
     def execute(self, context: Context):
@@ -239,8 +242,6 @@ class DataSyncOperator(BaseOperator):
 
     def _get_tasks_and_locations(self) -> None:
         """Find existing DataSync Task based on source and dest Locations."""
-        hook = self.get_hook()
-
         self.candidate_source_location_arns = self._get_location_arns(self.source_location_uri)
 
         self.candidate_destination_location_arns = self._get_location_arns(self.destination_location_uri)
@@ -254,7 +255,7 @@ class DataSyncOperator(BaseOperator):
             return
 
         self.log.info("Finding DataSync TaskArns that have these LocationArns")
-        self.candidate_task_arns = hook.get_task_arns_for_location_arns(
+        self.candidate_task_arns = self.hook.get_task_arns_for_location_arns(
             self.candidate_source_location_arns,
             self.candidate_destination_location_arns,
         )
@@ -290,12 +291,10 @@ class DataSyncOperator(BaseOperator):
 
     def _create_datasync_task(self) -> None:
         """Create a AWS DataSyncTask."""
-        hook = self.get_hook()
-
         self.source_location_arn = self.choose_location(self.candidate_source_location_arns)
         if not self.source_location_arn and self.source_location_uri and self.create_source_location_kwargs:
             self.log.info("Attempting to create source Location")
-            self.source_location_arn = hook.create_location(
+            self.source_location_arn = self.hook.create_location(
                 self.source_location_uri, **self.create_source_location_kwargs
             )
         if not self.source_location_arn:
@@ -310,7 +309,7 @@ class DataSyncOperator(BaseOperator):
             and self.create_destination_location_kwargs
         ):
             self.log.info("Attempting to create destination Location")
-            self.destination_location_arn = hook.create_location(
+            self.destination_location_arn = self.hook.create_location(
                 self.destination_location_uri, **self.create_destination_location_kwargs
             )
         if not self.destination_location_arn:
@@ -319,7 +318,7 @@ class DataSyncOperator(BaseOperator):
             )
 
         self.log.info("Creating a Task.")
-        self.task_arn = hook.create_task(
+        self.task_arn = self.hook.create_task(
             self.source_location_arn, self.destination_location_arn, **self.create_task_kwargs
         )
         if not self.task_arn:
@@ -331,9 +330,8 @@ class DataSyncOperator(BaseOperator):
         if not self.task_arn:
             return
 
-        hook = self.get_hook()
         self.log.info("Updating TaskArn %s", self.task_arn)
-        hook.update_task(self.task_arn, **self.update_task_kwargs)
+        self.hook.update_task(self.task_arn, **self.update_task_kwargs)
         self.log.info("Updated TaskArn %s", self.task_arn)
 
     def _execute_datasync_task(self) -> None:
@@ -341,11 +339,9 @@ class DataSyncOperator(BaseOperator):
         if not self.task_arn:
             raise AirflowException("Missing TaskArn")
 
-        hook = self.get_hook()
-
         # Create a task execution:
         self.log.info("Starting execution for TaskArn %s", self.task_arn)
-        self.task_execution_arn = hook.start_task_execution(self.task_arn, **self.task_execution_kwargs)
+        self.task_execution_arn = self.hook.start_task_execution(self.task_arn, **self.task_execution_kwargs)
         self.log.info("Started TaskExecutionArn %s", self.task_execution_arn)
 
         if not self.wait_for_completion:
@@ -354,14 +350,18 @@ class DataSyncOperator(BaseOperator):
         # Wait for task execution to complete
         self.log.info("Waiting for TaskExecutionArn %s", self.task_execution_arn)
         try:
-            result = hook.wait_for_task_execution(self.task_execution_arn, max_iterations=self.max_iterations)
+            result = self.hook.wait_for_task_execution(
+                self.task_execution_arn, max_iterations=self.max_iterations
+            )
         except (AirflowTaskTimeout, AirflowException) as e:
             self.log.error("Cancelling TaskExecution after Exception: %s", e)
             self._cancel_datasync_task_execution()
             raise
         self.log.info("Completed TaskExecutionArn %s", self.task_execution_arn)
 
-        task_execution_description = hook.describe_task_execution(task_execution_arn=self.task_execution_arn)
+        task_execution_description = self.hook.describe_task_execution(
+            task_execution_arn=self.task_execution_arn
+        )
         self.log.info("task_execution_description=%s", task_execution_description)
 
         # Log some meaningful statuses
@@ -377,10 +377,9 @@ class DataSyncOperator(BaseOperator):
 
     def _cancel_datasync_task_execution(self):
         """Cancel the submitted DataSync task."""
-        hook = self.get_hook()
         if self.task_execution_arn:
             self.log.info("Cancelling TaskExecutionArn %s", self.task_execution_arn)
-            hook.cancel_task_execution(task_execution_arn=self.task_execution_arn)
+            self.hook.cancel_task_execution(task_execution_arn=self.task_execution_arn)
             self.log.info("Cancelled TaskExecutionArn %s", self.task_execution_arn)
 
     def on_kill(self):
@@ -392,13 +391,12 @@ class DataSyncOperator(BaseOperator):
         if not self.task_arn:
             return
 
-        hook = self.get_hook()
         # Delete task:
         self.log.info("Deleting Task with TaskArn %s", self.task_arn)
-        hook.delete_task(self.task_arn)
+        self.hook.delete_task(self.task_arn)
         self.log.info("Task Deleted")
 
     def _get_location_arns(self, location_uri) -> list[str]:
-        location_arns = self.get_hook().get_location_arns(location_uri)
+        location_arns = self.hook.get_location_arns(location_uri)
         self.log.info("Found LocationArns %s for LocationUri %s", location_arns, location_uri)
         return location_arns

Reply via email to