This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6d49170640e Adding DataSync links (#46292)
6d49170640e is described below
commit 6d49170640e417440d498829f68361d244a57f92
Author: ellisms <[email protected]>
AuthorDate: Thu Jan 30 23:46:48 2025 -0500
Adding DataSync links (#46292)
---
.../airflow/providers/amazon/aws/links/datasync.py | 37 +++++++++++++++
.../providers/amazon/aws/operators/datasync.py | 42 ++++++++++++++++-
.../src/airflow/providers/amazon/provider.yaml | 2 +
providers/tests/amazon/aws/links/test_datasync.py | 52 ++++++++++++++++++++++
.../tests/amazon/aws/operators/test_datasync.py | 22 +++++++++
5 files changed, 153 insertions(+), 2 deletions(-)
diff --git a/providers/src/airflow/providers/amazon/aws/links/datasync.py b/providers/src/airflow/providers/amazon/aws/links/datasync.py
new file mode 100644
index 00000000000..f9a643aa99e
--- /dev/null
+++ b/providers/src/airflow/providers/amazon/aws/links/datasync.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.links.base_aws import BASE_AWS_CONSOLE_LINK, BaseAwsLink
+
+
+class DataSyncTaskLink(BaseAwsLink):
+ """Helper class for constructing AWS DataSync Task console link."""
+
+ name = "DataSync Task"
+ key = "datasync_task"
+ format_str = BASE_AWS_CONSOLE_LINK + "/datasync/home?region={region_name}#" + "/tasks/{task_id}"
+
+
+class DataSyncTaskExecutionLink(BaseAwsLink):
+ """Helper class for constructing AWS DataSync TaskExecution console link."""
+
+ name = "DataSync Task Execution"
+ key = "datasync_task_execution"
+ format_str = (
+ BASE_AWS_CONSOLE_LINK + "/datasync/home?region={region_name}#/history/{task_id}/{task_execution_id}"
+ )
diff --git a/providers/src/airflow/providers/amazon/aws/operators/datasync.py b/providers/src/airflow/providers/amazon/aws/operators/datasync.py
index d5c97843f16..7b2b7282efc 100644
--- a/providers/src/airflow/providers/amazon/aws/operators/datasync.py
+++ b/providers/src/airflow/providers/amazon/aws/operators/datasync.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any
from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.providers.amazon.aws.hooks.datasync import DataSyncHook
+from airflow.providers.amazon.aws.links.datasync import DataSyncTaskExecutionLink, DataSyncTaskLink
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
@@ -130,6 +131,8 @@ class DataSyncOperator(AwsBaseOperator[DataSyncHook]):
}
ui_color = "#44b5e2"
+ operator_extra_links = (DataSyncTaskLink(), DataSyncTaskExecutionLink())
+
def __init__(
self,
*,
@@ -215,6 +218,23 @@ class DataSyncOperator(AwsBaseOperator[DataSyncHook]):
if not self.task_arn:
raise AirflowException("DataSync TaskArn could not be identified or created.")
+ task_id = self.task_arn.split("/")[-1]
+
+ task_url = DataSyncTaskLink.format_str.format(
+ aws_domain=DataSyncTaskLink.get_aws_domain(self.hook.conn_partition),
+ region_name=self.hook.conn_region_name,
+ task_id=task_id,
+ )
+
+ DataSyncTaskLink.persist(
+ context=context,
+ operator=self,
+ region_name=self.hook.conn_region_name,
+ aws_partition=self.hook.conn_partition,
+ task_id=task_id,
+ )
+ self.log.info("You can view this DataSync task at %s", task_url)
+
self.log.info("Using DataSync TaskArn %s", self.task_arn)
# Update the DataSync Task
@@ -222,7 +242,7 @@ class DataSyncOperator(AwsBaseOperator[DataSyncHook]):
self._update_datasync_task()
# Execute the DataSync Task
- self._execute_datasync_task()
+ self._execute_datasync_task(context=context)
if not self.task_execution_arn:
raise AirflowException("Nothing was executed")
@@ -327,7 +347,7 @@ class DataSyncOperator(AwsBaseOperator[DataSyncHook]):
self.hook.update_task(self.task_arn, **self.update_task_kwargs)
self.log.info("Updated TaskArn %s", self.task_arn)
- def _execute_datasync_task(self) -> None:
+ def _execute_datasync_task(self, context: Context) -> None:
"""Create and monitor an AWS DataSync TaskExecution for a Task."""
if not self.task_arn:
raise AirflowException("Missing TaskArn")
@@ -337,6 +357,24 @@ class DataSyncOperator(AwsBaseOperator[DataSyncHook]):
self.task_execution_arn = self.hook.start_task_execution(self.task_arn, **self.task_execution_kwargs)
self.log.info("Started TaskExecutionArn %s", self.task_execution_arn)
+ # Create the execution extra link
+ execution_url = DataSyncTaskExecutionLink.format_str.format(
+ aws_domain=DataSyncTaskExecutionLink.get_aws_domain(self.hook.conn_partition),
+ region_name=self.hook.conn_region_name,
+ task_id=self.task_arn.split("/")[-1],
+ task_execution_id=self.task_execution_arn.split("/")[-1],
+ )
+ DataSyncTaskExecutionLink.persist(
+ context=context,
+ operator=self,
+ region_name=self.hook.conn_region_name,
+ aws_partition=self.hook.conn_partition,
+ task_id=self.task_arn.split("/")[-1],
+ task_execution_id=self.task_execution_arn.split("/")[-1],
+ )
+
+ self.log.info("You can view this DataSync task execution at %s", execution_url)
+
if not self.wait_for_completion:
return
diff --git a/providers/src/airflow/providers/amazon/provider.yaml b/providers/src/airflow/providers/amazon/provider.yaml
index 824c9b08dee..43569a28827 100644
--- a/providers/src/airflow/providers/amazon/provider.yaml
+++ b/providers/src/airflow/providers/amazon/provider.yaml
@@ -889,6 +889,8 @@ extra-links:
- airflow.providers.amazon.aws.links.step_function.StateMachineExecutionsDetailsLink
- airflow.providers.amazon.aws.links.comprehend.ComprehendPiiEntitiesDetectionLink
- airflow.providers.amazon.aws.links.comprehend.ComprehendDocumentClassifierLink
+ - airflow.providers.amazon.aws.links.datasync.DataSyncTaskLink
+ - airflow.providers.amazon.aws.links.datasync.DataSyncTaskExecutionLink
connection-types:
diff --git a/providers/tests/amazon/aws/links/test_datasync.py b/providers/tests/amazon/aws/links/test_datasync.py
new file mode 100644
index 00000000000..9ff1610ac30
--- /dev/null
+++ b/providers/tests/amazon/aws/links/test_datasync.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.links.datasync import DataSyncTaskExecutionLink, DataSyncTaskLink
+
+from providers.tests.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
+
+TASK_ID = "task-0b36221bf94ad2bdd"
+EXECUTION_ID = "exec-00000000000000004"
+
+
+class TestDataSyncTaskLink(BaseAwsLinksTestCase):
+ link_class = DataSyncTaskLink
+
+ def test_extra_link(self):
+ task_id = TASK_ID
+ self.assert_extra_link_url(
+ expected_url=(f"https://console.aws.amazon.com/datasync/home?region=us-east-1#/tasks/{TASK_ID}"),
+ region_name="us-east-1",
+ aws_partition="aws",
+ task_id=task_id,
+ )
+
+
+class TestDataSyncTaskExecutionLink(BaseAwsLinksTestCase):
+ link_class = DataSyncTaskExecutionLink
+
+ def test_extra_link(self):
+ self.assert_extra_link_url(
+ expected_url=(
+ f"https://console.aws.amazon.com/datasync/home?region=us-east-1#/history/{TASK_ID}/{EXECUTION_ID}"
+ ),
+ region_name="us-east-1",
+ aws_partition="aws",
+ task_id=TASK_ID,
+ task_execution_id=EXECUTION_ID,
+ )
diff --git a/providers/tests/amazon/aws/operators/test_datasync.py b/providers/tests/amazon/aws/operators/test_datasync.py
index 6b6b64caa13..81c8ea7445d 100644
--- a/providers/tests/amazon/aws/operators/test_datasync.py
+++ b/providers/tests/amazon/aws/operators/test_datasync.py
@@ -25,6 +25,7 @@ from moto import mock_aws
from airflow.exceptions import AirflowException
from airflow.models import DAG, DagRun, TaskInstance
from airflow.providers.amazon.aws.hooks.datasync import DataSyncHook
+from airflow.providers.amazon.aws.links.datasync import DataSyncTaskLink
from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator
from airflow.utils import timezone
from airflow.utils.state import DagRunState
@@ -748,6 +749,27 @@ class TestDataSyncOperator(DataSyncTestCaseBase):
# ### Check mocks:
mock_get_conn.assert_not_called()
+ def test_task_extra_links(self, mock_get_conn):
+ mock_get_conn.return_value = self.client
+ self.set_up_operator()
+
+ region = "us-east-1"
+ aws_domain = DataSyncTaskLink.get_aws_domain("aws")
+ task_id = self.task_arn.split("/")[-1]
+
+ base_url = f"https://console.{aws_domain}/datasync/home?region={region}#"
+ task_url = f"{base_url}/tasks/{task_id}"
+
+ with mock.patch.object(self.datasync.log, "info") as mock_logging:
+ result = self.datasync.execute(None)
+ task_execution_arn = result["TaskExecutionArn"]
+ execution_id = task_execution_arn.split("/")[-1]
+ execution_url = f"{base_url}/history/{task_id}/{execution_id}"
+
+ assert self.datasync.task_arn == self.task_arn
+ mock_logging.assert_any_call("You can view this DataSync task at %s", task_url)
+ mock_logging.assert_any_call("You can view this DataSync task execution at %s", execution_url)
+
def test_execute_task(self, mock_get_conn):
# ### Set up mocks:
mock_get_conn.return_value = self.client