This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ebd588b9f4 Add AWS Step Functions links (#36599)
ebd588b9f4 is described below
commit ebd588b9f416ae37e46e9a0877c81215f5519afb
Author: Andrey Anshin <[email protected]>
AuthorDate: Thu Jan 4 22:59:44 2024 +0400
Add AWS Step Functions links (#36599)
---
.../providers/amazon/aws/links/step_function.py | 52 +++++++++++++++
.../amazon/aws/operators/step_function.py | 30 +++++++++
airflow/providers/amazon/provider.yaml | 2 +
.../amazon/aws/links/test_step_function.py | 74 ++++++++++++++++++++++
.../amazon/aws/operators/test_step_function.py | 40 ++++++++++++
5 files changed, 198 insertions(+)
diff --git a/airflow/providers/amazon/aws/links/step_function.py
b/airflow/providers/amazon/aws/links/step_function.py
new file mode 100644
index 0000000000..40720a8ef2
--- /dev/null
+++ b/airflow/providers/amazon/aws/links/step_function.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from urllib.parse import quote_plus
+
+from airflow.providers.amazon.aws.links.base_aws import BASE_AWS_CONSOLE_LINK,
BaseAwsLink
+
+
+class StateMachineDetailsLink(BaseAwsLink):
+ """Helper class for constructing link to State Machine details page."""
+
+ name = "State Machine Details"
+ key = "_state_machine_details"
+ format_str = (
+ BASE_AWS_CONSOLE_LINK +
"/states/home?region={region_name}#/statemachines/view/{state_machine_arn}"
+ )
+
+ def format_link(self, *, state_machine_arn: str | None = None, **kwargs)
-> str:
+ if not state_machine_arn:
+ return ""
+ return
super().format_link(state_machine_arn=quote_plus(state_machine_arn), **kwargs)
+
+
+class StateMachineExecutionsDetailsLink(BaseAwsLink):
+ """Helper class for constructing link to State Machine Execution details
page."""
+
+ name = "State Machine Executions Details"
+ key = "_state_machine_executions_details"
+ format_str = (
+ BASE_AWS_CONSOLE_LINK +
"/states/home?region={region_name}#/v2/executions/details/{execution_arn}"
+ )
+
+ def format_link(self, *, execution_arn: str | None = None, **kwargs) ->
str:
+ if not execution_arn:
+ return ""
+ return super().format_link(execution_arn=quote_plus(execution_arn),
**kwargs)
diff --git a/airflow/providers/amazon/aws/operators/step_function.py
b/airflow/providers/amazon/aws/operators/step_function.py
index e02de32bae..067d7e4529 100644
--- a/airflow/providers/amazon/aws/operators/step_function.py
+++ b/airflow/providers/amazon/aws/operators/step_function.py
@@ -23,6 +23,10 @@ from typing import TYPE_CHECKING, Any, Sequence
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook
+from airflow.providers.amazon.aws.links.step_function import (
+ StateMachineDetailsLink,
+ StateMachineExecutionsDetailsLink,
+)
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.triggers.step_function import
StepFunctionsExecutionCompleteTrigger
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
@@ -66,6 +70,7 @@ class
StepFunctionStartExecutionOperator(AwsBaseOperator[StepFunctionHook]):
aws_hook_class = StepFunctionHook
template_fields: Sequence[str] = aws_template_fields("state_machine_arn",
"name", "input")
ui_color = "#f9c915"
+ operator_extra_links = (StateMachineDetailsLink(),
StateMachineExecutionsDetailsLink())
def __init__(
self,
@@ -87,9 +92,25 @@ class
StepFunctionStartExecutionOperator(AwsBaseOperator[StepFunctionHook]):
self.deferrable = deferrable
def execute(self, context: Context):
+ StateMachineDetailsLink.persist(
+ context=context,
+ operator=self,
+ region_name=self.hook.conn_region_name,
+ aws_partition=self.hook.conn_partition,
+ state_machine_arn=self.state_machine_arn,
+ )
+
if not (execution_arn :=
self.hook.start_execution(self.state_machine_arn, self.name, self.input)):
raise AirflowException(f"Failed to start State Machine execution
for: {self.state_machine_arn}")
+ StateMachineExecutionsDetailsLink.persist(
+ context=context,
+ operator=self,
+ region_name=self.hook.conn_region_name,
+ aws_partition=self.hook.conn_partition,
+ execution_arn=execution_arn,
+ )
+
self.log.info("Started State Machine execution for %s: %s",
self.state_machine_arn, execution_arn)
if self.deferrable:
self.defer(
@@ -141,12 +162,21 @@ class
StepFunctionGetExecutionOutputOperator(AwsBaseOperator[StepFunctionHook]):
aws_hook_class = StepFunctionHook
template_fields: Sequence[str] = aws_template_fields("execution_arn")
ui_color = "#f9c915"
+ operator_extra_links = (StateMachineExecutionsDetailsLink(),)
def __init__(self, *, execution_arn: str, **kwargs):
super().__init__(**kwargs)
self.execution_arn = execution_arn
def execute(self, context: Context):
+ StateMachineExecutionsDetailsLink.persist(
+ context=context,
+ operator=self,
+ region_name=self.hook.conn_region_name,
+ aws_partition=self.hook.conn_partition,
+ execution_arn=self.execution_arn,
+ )
+
execution_status = self.hook.describe_execution(self.execution_arn)
response = None
if "output" in execution_status:
diff --git a/airflow/providers/amazon/provider.yaml
b/airflow/providers/amazon/provider.yaml
index ed8a288424..6d97926ad2 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -720,6 +720,8 @@ extra-links:
- airflow.providers.amazon.aws.links.emr.EmrLogsLink
- airflow.providers.amazon.aws.links.glue.GlueJobRunDetailsLink
- airflow.providers.amazon.aws.links.logs.CloudWatchEventsLink
+ - airflow.providers.amazon.aws.links.step_function.StateMachineDetailsLink
+ -
airflow.providers.amazon.aws.links.step_function.StateMachineExecutionsDetailsLink
connection-types:
- hook-class-name: airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook
diff --git a/tests/providers/amazon/aws/links/test_step_function.py
b/tests/providers/amazon/aws/links/test_step_function.py
new file mode 100644
index 0000000000..3c6c9cc7cd
--- /dev/null
+++ b/tests/providers/amazon/aws/links/test_step_function.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.amazon.aws.links.step_function import (
+ StateMachineDetailsLink,
+ StateMachineExecutionsDetailsLink,
+)
+from tests.providers.amazon.aws.links.test_base_aws import BaseAwsLinksTestCase
+
+
+class TestStateMachineDetailsLink(BaseAwsLinksTestCase):
+ link_class = StateMachineDetailsLink
+
+ @pytest.mark.parametrize(
+ "state_machine_arn, expected_url",
+ [
+ pytest.param("", "", id="empty-arn"),
+ pytest.param(None, "", id="arn-not-set"),
+ pytest.param(
+ "foo:bar",
+
"https://console.aws.amazon.com/states/home?region=eu-west-1#/statemachines/view/foo%3Abar",
+ id="arn-set",
+ ),
+ ],
+ )
+ def test_extra_link(self, state_machine_arn, expected_url: str):
+ self.assert_extra_link_url(
+ expected_url=expected_url,
+ region_name="eu-west-1",
+ aws_partition="aws",
+ state_machine_arn=state_machine_arn,
+ )
+
+
+class TestStateMachineExecutionsDetailsLink(BaseAwsLinksTestCase):
+ link_class = StateMachineExecutionsDetailsLink
+
+ @pytest.mark.parametrize(
+ "execution_arn, expected_url",
+ [
+ pytest.param("", "", id="empty-arn"),
+ pytest.param(None, "", id="arn-not-set"),
+ pytest.param(
+ "spam:egg:000000000",
+ "https://console.aws.amazon.com/states/home?region=eu-west-1"
+ "#/v2/executions/details/spam%3Aegg%3A000000000",
+ id="arn-set",
+ ),
+ ],
+ )
+ def test_extra_link(self, execution_arn, expected_url: str):
+ self.assert_extra_link_url(
+ expected_url=expected_url,
+ region_name="eu-west-1",
+ aws_partition="aws",
+ execution_arn=execution_arn,
+ )
diff --git a/tests/providers/amazon/aws/operators/test_step_function.py
b/tests/providers/amazon/aws/operators/test_step_function.py
index 6845a7f98a..904205418e 100644
--- a/tests/providers/amazon/aws/operators/test_step_function.py
+++ b/tests/providers/amazon/aws/operators/test_step_function.py
@@ -46,6 +46,14 @@ def mocked_context():
class TestStepFunctionGetExecutionOutputOperator:
TASK_ID = "step_function_get_execution_output"
+ @pytest.fixture(autouse=True)
+ def setup_test_cases(self):
+ with mock.patch(
+
"airflow.providers.amazon.aws.links.step_function.StateMachineExecutionsDetailsLink.persist"
+ ) as executions_details_link:
+ self.mocked_executions_details_link = executions_details_link
+ yield
+
def test_init(self):
op = StepFunctionGetExecutionOutputOperator(
task_id=self.TASK_ID,
@@ -86,11 +94,29 @@ class TestStepFunctionGetExecutionOutputOperator:
)
assert op.execute(mocked_context) == expected_output
mocked_hook.describe_execution.assert_called_once_with(EXECUTION_ARN)
+ self.mocked_executions_details_link.assert_called_once_with(
+ aws_partition=mock.ANY,
+ context=mock.ANY,
+ operator=mock.ANY,
+ region_name=mock.ANY,
+ execution_arn=EXECUTION_ARN,
+ )
class TestStepFunctionStartExecutionOperator:
TASK_ID = "step_function_start_execution_task"
+ @pytest.fixture(autouse=True)
+ def setup_test_cases(self):
+ with mock.patch(
+
"airflow.providers.amazon.aws.links.step_function.StateMachineExecutionsDetailsLink.persist"
+ ) as executions_details_link, mock.patch(
+
"airflow.providers.amazon.aws.links.step_function.StateMachineDetailsLink.persist"
+ ) as details_link:
+ self.mocked_executions_details_link = executions_details_link
+ self.mocked_details_link = details_link
+ yield
+
def test_init(self):
op = StepFunctionStartExecutionOperator(
task_id=self.TASK_ID,
@@ -134,6 +160,20 @@ class TestStepFunctionStartExecutionOperator:
)
assert op.execute(mocked_context) == hook_response
mocked_hook.start_execution.assert_called_once_with(STATE_MACHINE_ARN,
NAME, INPUT)
+ self.mocked_details_link.assert_called_once_with(
+ aws_partition=mock.ANY,
+ context=mock.ANY,
+ operator=mock.ANY,
+ region_name=mock.ANY,
+ state_machine_arn=STATE_MACHINE_ARN,
+ )
+ self.mocked_executions_details_link.assert_called_once_with(
+ aws_partition=mock.ANY,
+ context=mock.ANY,
+ operator=mock.ANY,
+ region_name=mock.ANY,
+ execution_arn=EXECUTION_ARN,
+ )
@mock.patch.object(StepFunctionStartExecutionOperator, "hook")
def test_step_function_start_execution_deferrable(self, mocked_hook):