This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ebd588b9f4 Add AWS Step Functions links (#36599)
ebd588b9f4 is described below

commit ebd588b9f416ae37e46e9a0877c81215f5519afb
Author: Andrey Anshin <[email protected]>
AuthorDate: Thu Jan 4 22:59:44 2024 +0400

    Add AWS Step Functions links (#36599)
---
 .../providers/amazon/aws/links/step_function.py    | 52 +++++++++++++++
 .../amazon/aws/operators/step_function.py          | 30 +++++++++
 airflow/providers/amazon/provider.yaml             |  2 +
 .../amazon/aws/links/test_step_function.py         | 74 ++++++++++++++++++++++
 .../amazon/aws/operators/test_step_function.py     | 40 ++++++++++++
 5 files changed, 198 insertions(+)

diff --git a/airflow/providers/amazon/aws/links/step_function.py b/airflow/providers/amazon/aws/links/step_function.py
new file mode 100644
index 0000000000..40720a8ef2
--- /dev/null
+++ b/airflow/providers/amazon/aws/links/step_function.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from urllib.parse import quote_plus
+
+from airflow.providers.amazon.aws.links.base_aws import BASE_AWS_CONSOLE_LINK, BaseAwsLink
+
+
+class StateMachineDetailsLink(BaseAwsLink):
+    """Helper class for constructing link to State Machine details page."""
+
+    name = "State Machine Details"
+    key = "_state_machine_details"
+    format_str = (
+        BASE_AWS_CONSOLE_LINK + "/states/home?region={region_name}#/statemachines/view/{state_machine_arn}"
+    )
+
+    def format_link(self, *, state_machine_arn: str | None = None, **kwargs) -> str:
+        if not state_machine_arn:
+            return ""
+        return super().format_link(state_machine_arn=quote_plus(state_machine_arn), **kwargs)
+
+
+class StateMachineExecutionsDetailsLink(BaseAwsLink):
+    """Helper class for constructing link to State Machine Execution details page."""
+
+    name = "State Machine Executions Details"
+    key = "_state_machine_executions_details"
+    format_str = (
+        BASE_AWS_CONSOLE_LINK + "/states/home?region={region_name}#/v2/executions/details/{execution_arn}"
+    )
+
+    def format_link(self, *, execution_arn: str | None = None, **kwargs) -> str:
+        if not execution_arn:
+            return ""
+        return super().format_link(execution_arn=quote_plus(execution_arn), **kwargs)
diff --git a/airflow/providers/amazon/aws/operators/step_function.py b/airflow/providers/amazon/aws/operators/step_function.py
index e02de32bae..067d7e4529 100644
--- a/airflow/providers/amazon/aws/operators/step_function.py
+++ b/airflow/providers/amazon/aws/operators/step_function.py
@@ -23,6 +23,10 @@ from typing import TYPE_CHECKING, Any, Sequence
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook
+from airflow.providers.amazon.aws.links.step_function import (
+    StateMachineDetailsLink,
+    StateMachineExecutionsDetailsLink,
+)
 from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
 from airflow.providers.amazon.aws.triggers.step_function import StepFunctionsExecutionCompleteTrigger
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
@@ -66,6 +70,7 @@ class StepFunctionStartExecutionOperator(AwsBaseOperator[StepFunctionHook]):
     aws_hook_class = StepFunctionHook
     template_fields: Sequence[str] = aws_template_fields("state_machine_arn", "name", "input")
     ui_color = "#f9c915"
+    operator_extra_links = (StateMachineDetailsLink(), StateMachineExecutionsDetailsLink())
 
     def __init__(
         self,
@@ -87,9 +92,25 @@ class StepFunctionStartExecutionOperator(AwsBaseOperator[StepFunctionHook]):
         self.deferrable = deferrable
 
     def execute(self, context: Context):
+        StateMachineDetailsLink.persist(
+            context=context,
+            operator=self,
+            region_name=self.hook.conn_region_name,
+            aws_partition=self.hook.conn_partition,
+            state_machine_arn=self.state_machine_arn,
+        )
+
         if not (execution_arn := self.hook.start_execution(self.state_machine_arn, self.name, self.input)):
             raise AirflowException(f"Failed to start State Machine execution for: {self.state_machine_arn}")
 
+        StateMachineExecutionsDetailsLink.persist(
+            context=context,
+            operator=self,
+            region_name=self.hook.conn_region_name,
+            aws_partition=self.hook.conn_partition,
+            execution_arn=execution_arn,
+        )
+
         self.log.info("Started State Machine execution for %s: %s", self.state_machine_arn, execution_arn)
         if self.deferrable:
             self.defer(
@@ -141,12 +162,21 @@ class StepFunctionGetExecutionOutputOperator(AwsBaseOperator[StepFunctionHook]):
     aws_hook_class = StepFunctionHook
     template_fields: Sequence[str] = aws_template_fields("execution_arn")
     ui_color = "#f9c915"
+    operator_extra_links = (StateMachineExecutionsDetailsLink(),)
 
     def __init__(self, *, execution_arn: str, **kwargs):
         super().__init__(**kwargs)
         self.execution_arn = execution_arn
 
     def execute(self, context: Context):
+        StateMachineExecutionsDetailsLink.persist(
+            context=context,
+            operator=self,
+            region_name=self.hook.conn_region_name,
+            aws_partition=self.hook.conn_partition,
+            execution_arn=self.execution_arn,
+        )
+
         execution_status = self.hook.describe_execution(self.execution_arn)
         response = None
         if "output" in execution_status:
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index ed8a288424..6d97926ad2 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -720,6 +720,8 @@ extra-links:
   - airflow.providers.amazon.aws.links.emr.EmrLogsLink
   - airflow.providers.amazon.aws.links.glue.GlueJobRunDetailsLink
   - airflow.providers.amazon.aws.links.logs.CloudWatchEventsLink
+  - airflow.providers.amazon.aws.links.step_function.StateMachineDetailsLink
+  - airflow.providers.amazon.aws.links.step_function.StateMachineExecutionsDetailsLink
 
 connection-types:
   - hook-class-name: airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook
diff --git a/tests/providers/amazon/aws/links/test_step_function.py b/tests/providers/amazon/aws/links/test_step_function.py
new file mode 100644
index 0000000000..3c6c9cc7cd
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_step_function.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.amazon.aws.links.step_function import (
+    StateMachineDetailsLink,
+    StateMachineExecutionsDetailsLink,
+)
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
+
+
+class TestStateMachineDetailsLink(BaseAwsLinksTestCase):
+    link_class = StateMachineDetailsLink
+
+    @pytest.mark.parametrize(
+        "state_machine_arn, expected_url",
+        [
+            pytest.param("", "", id="empty-arn"),
+            pytest.param(None, "", id="arn-not-set"),
+            pytest.param(
+                "foo:bar",
+                "https://console.aws.amazon.com/states/home?region=eu-west-1#/statemachines/view/foo%3Abar",
+                id="arn-set",
+            ),
+        ],
+    )
+    def test_extra_link(self, state_machine_arn, expected_url: str):
+        self.assert_extra_link_url(
+            expected_url=expected_url,
+            region_name="eu-west-1",
+            aws_partition="aws",
+            state_machine_arn=state_machine_arn,
+        )
+
+
+class TestStateMachineExecutionsDetailsLink(BaseAwsLinksTestCase):
+    link_class = StateMachineExecutionsDetailsLink
+
+    @pytest.mark.parametrize(
+        "execution_arn, expected_url",
+        [
+            pytest.param("", "", id="empty-arn"),
+            pytest.param(None, "", id="arn-not-set"),
+            pytest.param(
+                "spam:egg:000000000",
+                "https://console.aws.amazon.com/states/home?region=eu-west-1"
+                "#/v2/executions/details/spam%3Aegg%3A000000000",
+                id="arn-set",
+            ),
+        ],
+    )
+    def test_extra_link(self, execution_arn, expected_url: str):
+        self.assert_extra_link_url(
+            expected_url=expected_url,
+            region_name="eu-west-1",
+            aws_partition="aws",
+            execution_arn=execution_arn,
+        )
diff --git a/tests/providers/amazon/aws/operators/test_step_function.py b/tests/providers/amazon/aws/operators/test_step_function.py
index 6845a7f98a..904205418e 100644
--- a/tests/providers/amazon/aws/operators/test_step_function.py
+++ b/tests/providers/amazon/aws/operators/test_step_function.py
@@ -46,6 +46,14 @@ def mocked_context():
 class TestStepFunctionGetExecutionOutputOperator:
     TASK_ID = "step_function_get_execution_output"
 
+    @pytest.fixture(autouse=True)
+    def setup_test_cases(self):
+        with mock.patch(
+            "airflow.providers.amazon.aws.links.step_function.StateMachineExecutionsDetailsLink.persist"
+        ) as executions_details_link:
+            self.mocked_executions_details_link = executions_details_link
+            yield
+
     def test_init(self):
         op = StepFunctionGetExecutionOutputOperator(
             task_id=self.TASK_ID,
@@ -86,11 +94,29 @@ class TestStepFunctionGetExecutionOutputOperator:
         )
         assert op.execute(mocked_context) == expected_output
         mocked_hook.describe_execution.assert_called_once_with(EXECUTION_ARN)
+        self.mocked_executions_details_link.assert_called_once_with(
+            aws_partition=mock.ANY,
+            context=mock.ANY,
+            operator=mock.ANY,
+            region_name=mock.ANY,
+            execution_arn=EXECUTION_ARN,
+        )
 
 
 class TestStepFunctionStartExecutionOperator:
     TASK_ID = "step_function_start_execution_task"
 
+    @pytest.fixture(autouse=True)
+    def setup_test_cases(self):
+        with mock.patch(
+            "airflow.providers.amazon.aws.links.step_function.StateMachineExecutionsDetailsLink.persist"
+        ) as executions_details_link, mock.patch(
+            "airflow.providers.amazon.aws.links.step_function.StateMachineDetailsLink.persist"
+        ) as details_link:
+            self.mocked_executions_details_link = executions_details_link
+            self.mocked_details_link = details_link
+            yield
+
     def test_init(self):
         op = StepFunctionStartExecutionOperator(
             task_id=self.TASK_ID,
@@ -134,6 +160,20 @@ class TestStepFunctionStartExecutionOperator:
         )
         assert op.execute(mocked_context) == hook_response
         mocked_hook.start_execution.assert_called_once_with(STATE_MACHINE_ARN, NAME, INPUT)
+        self.mocked_details_link.assert_called_once_with(
+            aws_partition=mock.ANY,
+            context=mock.ANY,
+            operator=mock.ANY,
+            region_name=mock.ANY,
+            state_machine_arn=STATE_MACHINE_ARN,
+        )
+        self.mocked_executions_details_link.assert_called_once_with(
+            aws_partition=mock.ANY,
+            context=mock.ANY,
+            operator=mock.ANY,
+            region_name=mock.ANY,
+            execution_arn=EXECUTION_ARN,
+        )
 
     @mock.patch.object(StepFunctionStartExecutionOperator, "hook")
     def test_step_function_start_execution_deferrable(self, mocked_hook):

Reply via email to