This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bbfa228868 Deprecate get_hook in DataSyncOperator and use hook instead (#34427)
bbfa228868 is described below
commit bbfa228868dd20a423d5a05f372c05a6b4e512e6
Author: Hussein Awala <[email protected]>
AuthorDate: Mon Sep 18 21:07:21 2023 +0200
Deprecate get_hook in DataSyncOperator and use hook instead (#34427)
* Deprecate get_hook in DataSyncOperator and use hook instead
* use AirflowProviderDeprecationWarning
---
airflow/providers/amazon/aws/operators/datasync.py | 54 +++++++++++-----------
1 file changed, 26 insertions(+), 28 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/datasync.py
b/airflow/providers/amazon/aws/operators/datasync.py
index 5688e56c27..455b098184 100644
--- a/airflow/providers/amazon/aws/operators/datasync.py
+++ b/airflow/providers/amazon/aws/operators/datasync.py
@@ -19,9 +19,12 @@ from __future__ import annotations
import logging
import random
+from functools import cached_property
from typing import TYPE_CHECKING, Sequence
-from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from deprecated.classic import deprecated
+
+from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning, AirflowTaskTimeout
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.datasync import DataSyncHook
@@ -176,8 +179,6 @@ class DataSyncOperator(BaseOperator):
f"destination_location_uri={destination_location_uri!r}"
)
- # Others
- self.hook: DataSyncHook | None = None
# Candidates - these are found in AWS as possible things
# for us to use
self.candidate_source_location_arns: list[str] | None = None
@@ -188,18 +189,20 @@ class DataSyncOperator(BaseOperator):
self.destination_location_arn: str | None = None
self.task_execution_arn: str | None = None
- def get_hook(self) -> DataSyncHook:
+ @cached_property
+ def hook(self) -> DataSyncHook:
"""Create and return DataSyncHook.
:return DataSyncHook: An DataSyncHook instance.
"""
- if self.hook:
- return self.hook
-
- self.hook = DataSyncHook(
+ return DataSyncHook(
aws_conn_id=self.aws_conn_id,
wait_interval_seconds=self.wait_interval_seconds,
)
+
+ @deprecated(reason="use `hook` property instead.",
category=AirflowProviderDeprecationWarning)
+ def get_hook(self) -> DataSyncHook:
+ """Create and return DataSyncHook."""
return self.hook
def execute(self, context: Context):
@@ -239,8 +242,6 @@ class DataSyncOperator(BaseOperator):
def _get_tasks_and_locations(self) -> None:
"""Find existing DataSync Task based on source and dest Locations."""
- hook = self.get_hook()
-
self.candidate_source_location_arns =
self._get_location_arns(self.source_location_uri)
self.candidate_destination_location_arns =
self._get_location_arns(self.destination_location_uri)
@@ -254,7 +255,7 @@ class DataSyncOperator(BaseOperator):
return
self.log.info("Finding DataSync TaskArns that have these LocationArns")
- self.candidate_task_arns = hook.get_task_arns_for_location_arns(
+ self.candidate_task_arns = self.hook.get_task_arns_for_location_arns(
self.candidate_source_location_arns,
self.candidate_destination_location_arns,
)
@@ -290,12 +291,10 @@ class DataSyncOperator(BaseOperator):
def _create_datasync_task(self) -> None:
"""Create a AWS DataSyncTask."""
- hook = self.get_hook()
-
self.source_location_arn =
self.choose_location(self.candidate_source_location_arns)
if not self.source_location_arn and self.source_location_uri and
self.create_source_location_kwargs:
self.log.info("Attempting to create source Location")
- self.source_location_arn = hook.create_location(
+ self.source_location_arn = self.hook.create_location(
self.source_location_uri, **self.create_source_location_kwargs
)
if not self.source_location_arn:
@@ -310,7 +309,7 @@ class DataSyncOperator(BaseOperator):
and self.create_destination_location_kwargs
):
self.log.info("Attempting to create destination Location")
- self.destination_location_arn = hook.create_location(
+ self.destination_location_arn = self.hook.create_location(
self.destination_location_uri,
**self.create_destination_location_kwargs
)
if not self.destination_location_arn:
@@ -319,7 +318,7 @@ class DataSyncOperator(BaseOperator):
)
self.log.info("Creating a Task.")
- self.task_arn = hook.create_task(
+ self.task_arn = self.hook.create_task(
self.source_location_arn, self.destination_location_arn,
**self.create_task_kwargs
)
if not self.task_arn:
@@ -331,9 +330,8 @@ class DataSyncOperator(BaseOperator):
if not self.task_arn:
return
- hook = self.get_hook()
self.log.info("Updating TaskArn %s", self.task_arn)
- hook.update_task(self.task_arn, **self.update_task_kwargs)
+ self.hook.update_task(self.task_arn, **self.update_task_kwargs)
self.log.info("Updated TaskArn %s", self.task_arn)
def _execute_datasync_task(self) -> None:
@@ -341,11 +339,9 @@ class DataSyncOperator(BaseOperator):
if not self.task_arn:
raise AirflowException("Missing TaskArn")
- hook = self.get_hook()
-
# Create a task execution:
self.log.info("Starting execution for TaskArn %s", self.task_arn)
- self.task_execution_arn = hook.start_task_execution(self.task_arn,
**self.task_execution_kwargs)
+ self.task_execution_arn =
self.hook.start_task_execution(self.task_arn, **self.task_execution_kwargs)
self.log.info("Started TaskExecutionArn %s", self.task_execution_arn)
if not self.wait_for_completion:
@@ -354,14 +350,18 @@ class DataSyncOperator(BaseOperator):
# Wait for task execution to complete
self.log.info("Waiting for TaskExecutionArn %s",
self.task_execution_arn)
try:
- result = hook.wait_for_task_execution(self.task_execution_arn,
max_iterations=self.max_iterations)
+ result = self.hook.wait_for_task_execution(
+ self.task_execution_arn, max_iterations=self.max_iterations
+ )
except (AirflowTaskTimeout, AirflowException) as e:
self.log.error("Cancelling TaskExecution after Exception: %s", e)
self._cancel_datasync_task_execution()
raise
self.log.info("Completed TaskExecutionArn %s", self.task_execution_arn)
- task_execution_description =
hook.describe_task_execution(task_execution_arn=self.task_execution_arn)
+ task_execution_description = self.hook.describe_task_execution(
+ task_execution_arn=self.task_execution_arn
+ )
self.log.info("task_execution_description=%s",
task_execution_description)
# Log some meaningful statuses
@@ -377,10 +377,9 @@ class DataSyncOperator(BaseOperator):
def _cancel_datasync_task_execution(self):
"""Cancel the submitted DataSync task."""
- hook = self.get_hook()
if self.task_execution_arn:
self.log.info("Cancelling TaskExecutionArn %s",
self.task_execution_arn)
-
hook.cancel_task_execution(task_execution_arn=self.task_execution_arn)
+
self.hook.cancel_task_execution(task_execution_arn=self.task_execution_arn)
self.log.info("Cancelled TaskExecutionArn %s",
self.task_execution_arn)
def on_kill(self):
@@ -392,13 +391,12 @@ class DataSyncOperator(BaseOperator):
if not self.task_arn:
return
- hook = self.get_hook()
# Delete task:
self.log.info("Deleting Task with TaskArn %s", self.task_arn)
- hook.delete_task(self.task_arn)
+ self.hook.delete_task(self.task_arn)
self.log.info("Task Deleted")
def _get_location_arns(self, location_uri) -> list[str]:
- location_arns = self.get_hook().get_location_arns(location_uri)
+ location_arns = self.hook.get_location_arns(location_uri)
self.log.info("Found LocationArns %s for LocationUri %s",
location_arns, location_uri)
return location_arns