This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0ac475f1d8a Add Amazon Bedrock AgentCore Runtime operators (#67984)
0ac475f1d8a is described below
commit 0ac475f1d8a50657ed80b4ef495e02ab2add8c01
Author: Morgan <[email protected]>
AuthorDate: Fri Jun 5 10:20:06 2026 -0300
Add Amazon Bedrock AgentCore Runtime operators (#67984)
---
docs/spelling_wordlist.txt | 1 +
providers/amazon/docs/operators/bedrock.rst | 52 +++++
.../airflow/providers/amazon/aws/hooks/bedrock.py | 40 ++++
.../providers/amazon/aws/operators/bedrock.py | 247 ++++++++++++++++++++
.../providers/amazon/aws/triggers/bedrock.py | 53 ++++-
.../aws/waiters/bedrock-agentcore-control.json | 48 ++++
.../system/amazon/aws/example_bedrock_agentcore.py | 111 +++++++++
.../tests/unit/amazon/aws/hooks/test_bedrock.py | 4 +
.../unit/amazon/aws/operators/test_bedrock.py | 249 ++++++++++++++++++++-
.../tests/unit/amazon/aws/triggers/test_bedrock.py | 47 +++-
.../aws/waiters/test_bedrock_agentcore_control.py | 74 ++++++
11 files changed, 923 insertions(+), 3 deletions(-)
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 7f24b753a17..202d24f00a3 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -23,6 +23,7 @@ adobjects
AdsInsights
adsinsights
afterall
+agentcore
agentic
ai
aio
diff --git a/providers/amazon/docs/operators/bedrock.rst b/providers/amazon/docs/operators/bedrock.rst
index 6cfd217b400..bdf628d9f79 100644
--- a/providers/amazon/docs/operators/bedrock.rst
+++ b/providers/amazon/docs/operators/bedrock.rst
@@ -74,6 +74,58 @@ To invoke a Claude Sonnet model using the Messages API you would use:
:start-after: [START howto_operator_invoke_claude_model]
:end-before: [END howto_operator_invoke_claude_model]
+.. _howto/operator:BedrockCreateAgentRuntimeOperator:
+
+Create an Amazon Bedrock AgentCore Runtime
+==========================================
+
+To create an Amazon Bedrock AgentCore Runtime, you can use
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockCreateAgentRuntimeOperator`.
+
+AgentCore Runtime creation is asynchronous. The operator can wait until the runtime reaches
+``READY`` either synchronously or in deferrable mode.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bedrock_create_agent_runtime]
+ :end-before: [END howto_operator_bedrock_create_agent_runtime]
+
+.. _howto/operator:BedrockInvokeAgentRuntimeOperator:
+
+Invoke an Amazon Bedrock AgentCore Runtime
+==========================================
+
+To invoke an Amazon Bedrock AgentCore Runtime, you can use
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockInvokeAgentRuntimeOperator`.
+
+The operator waits for the runtime invocation response and returns the response body along with
+runtime response metadata such as the runtime session ID when present.
+For long-running AgentCore Runtime invocations, configure the botocore client ``read_timeout``
+through ``botocore_config``.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bedrock_invoke_agent_runtime]
+ :end-before: [END howto_operator_bedrock_invoke_agent_runtime]
+
+.. _howto/operator:BedrockDeleteAgentRuntimeOperator:
+
+Delete an Amazon Bedrock AgentCore Runtime
+==========================================
+
+To delete an Amazon Bedrock AgentCore Runtime, you can use
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockDeleteAgentRuntimeOperator`.
+
+The operator accepts the runtime ID, which can be extracted from the ARN returned by
+:class:`~airflow.providers.amazon.aws.operators.bedrock.BedrockCreateAgentRuntimeOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_bedrock_delete_agent_runtime]
+ :end-before: [END howto_operator_bedrock_delete_agent_runtime]
.. _howto/operator:BedrockCustomizeModelOperator:
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/bedrock.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/bedrock.py
index c8fd53a9b7f..9042f0754b7 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/bedrock.py
@@ -106,3 +106,43 @@ class BedrockAgentRuntimeHook(AwsBaseHook):
def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = self.client_type
super().__init__(*args, **kwargs)
+
+
+class BedrockAgentCoreControlHook(AwsBaseHook):
+ """
+ Interact with the Amazon Bedrock AgentCore control plane API.
+
+ Provide thin wrapper around :external+boto3:py:class:`boto3.client("bedrock-agentcore-control") <BedrockAgentCoreControl.Client>`.
+
+ Additional arguments (such as ``aws_conn_id``) may be specified and
+ are passed down to the underlying AwsBaseHook.
+
+ .. seealso::
+ - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+ """
+
+ client_type = "bedrock-agentcore-control"
+
+ def __init__(self, *args, **kwargs) -> None:
+ kwargs["client_type"] = self.client_type
+ super().__init__(*args, **kwargs)
+
+
+class BedrockAgentCoreHook(AwsBaseHook):
+ """
+ Interact with the Amazon Bedrock AgentCore runtime plane API.
+
+ Provide thin wrapper around :external+boto3:py:class:`boto3.client("bedrock-agentcore") <BedrockAgentCore.Client>`.
+
+ Additional arguments (such as ``aws_conn_id``) may be specified and
+ are passed down to the underlying AwsBaseHook.
+
+ .. seealso::
+ - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+ """
+
+ client_type = "bedrock-agentcore"
+
+ def __init__(self, *args, **kwargs) -> None:
+ kwargs["client_type"] = self.client_type
+ super().__init__(*args, **kwargs)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
index d4c6c7fb36e..fe0cecf6b04 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
@@ -24,6 +24,8 @@ from typing import TYPE_CHECKING, Any, Literal
from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.bedrock import (
+ BedrockAgentCoreControlHook,
+ BedrockAgentCoreHook,
BedrockAgentHook,
BedrockAgentRuntimeHook,
BedrockHook,
@@ -31,6 +33,7 @@ from airflow.providers.amazon.aws.hooks.bedrock import (
)
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.triggers.bedrock import (
+ BedrockAgentRuntimeReadyTrigger,
BedrockBatchInferenceCompletedTrigger,
BedrockCustomizeModelCompletedTrigger,
BedrockIngestionJobTrigger,
@@ -111,6 +114,250 @@ class BedrockInvokeModelOperator(AwsBaseOperator[BedrockRuntimeHook]):
return response_body
+class
BedrockCreateAgentRuntimeOperator(AwsBaseOperator[BedrockAgentCoreControlHook]):
+ """
+ Create an Amazon Bedrock AgentCore Runtime.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:BedrockCreateAgentRuntimeOperator`
+
+ :param agent_runtime_name: The name of the AgentCore Runtime. (templated)
+ :param agent_runtime_artifact: The artifact configuration for the
AgentCore Runtime. (templated)
+ :param role_arn: The ARN of the IAM role for the AgentCore Runtime.
(templated)
+ :param network_configuration: The network configuration for the AgentCore
Runtime. (templated)
+ :param create_agent_runtime_kwargs: Any optional parameters to pass to the
API. (templated)
+ :param wait_for_completion: Whether to wait for the AgentCore Runtime to
reach READY. (default: True)
+ :param waiter_delay: Time in seconds to wait between status checks.
(default: 60)
+ :param waiter_max_attempts: Maximum number of attempts to check for
runtime readiness. (default: 20)
+ :param deferrable: If True, the operator will wait asynchronously for the
AgentCore Runtime
+ to reach READY. This implies waiting for completion. This mode
requires aiobotocore
+ module to be installed. (default: False)
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is ``None`` or empty then the default boto3 behaviour is used.
If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ maintained on each worker node).
+ :param region_name: AWS region_name. If not specified then the default
boto3 behaviour is used.
+ :param verify: Whether or not to verify SSL certificates. See:
+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+ :param botocore_config: Configuration dictionary (key-values) for botocore
client. See:
+
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+ """
+
+ aws_hook_class = BedrockAgentCoreControlHook
+ template_fields: Sequence[str] = aws_template_fields(
+ "agent_runtime_name",
+ "agent_runtime_artifact",
+ "role_arn",
+ "network_configuration",
+ "create_agent_runtime_kwargs",
+ )
+
+ def __init__(
+ self,
+ *,
+ agent_runtime_name: str,
+ agent_runtime_artifact: dict[str, Any],
+ role_arn: str,
+ network_configuration: dict[str, Any],
+ create_agent_runtime_kwargs: dict[str, Any] | None = None,
+ wait_for_completion: bool = True,
+ waiter_delay: int = 60,
+ waiter_max_attempts: int = 20,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.agent_runtime_name = agent_runtime_name
+ self.agent_runtime_artifact = agent_runtime_artifact
+ self.role_arn = role_arn
+ self.network_configuration = network_configuration
+ self.create_agent_runtime_kwargs = create_agent_runtime_kwargs or {}
+ self.wait_for_completion = wait_for_completion
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
+ self.deferrable = deferrable
+
+ def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> str:
+ validated_event = validate_execute_complete_event(event)
+
+ if validated_event["status"] != "success":
+ raise RuntimeError(f"Error while creating AgentCore Runtime:
{validated_event}")
+
+ self.log.info("Bedrock AgentCore Runtime `%s` is ready.",
validated_event["agent_runtime_arn"])
+ return validated_event["agent_runtime_arn"]
+
+ def execute(self, context: Context) -> str:
+ response = self.hook.conn.create_agent_runtime(
+ agentRuntimeName=self.agent_runtime_name,
+ agentRuntimeArtifact=self.agent_runtime_artifact,
+ roleArn=self.role_arn,
+ networkConfiguration=self.network_configuration,
+ **self.create_agent_runtime_kwargs,
+ )
+ agent_runtime_arn = response["agentRuntimeArn"]
+ agent_runtime_id = response["agentRuntimeId"]
+ agent_runtime_version = response["agentRuntimeVersion"]
+
+ if self.deferrable:
+ self.log.info("Deferring until AgentCore Runtime %s reaches
READY.", agent_runtime_arn)
+ self.defer(
+ trigger=BedrockAgentRuntimeReadyTrigger(
+ agent_runtime_id=agent_runtime_id,
+ agent_runtime_version=agent_runtime_version,
+ agent_runtime_arn=agent_runtime_arn,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ ),
+ method_name="execute_complete",
+ )
+ elif self.wait_for_completion:
+ self.log.info("Waiting for AgentCore Runtime %s to reach READY.",
agent_runtime_arn)
+ self.hook.get_waiter("agent_runtime_ready").wait(
+ agentRuntimeId=agent_runtime_id,
+ agentRuntimeVersion=agent_runtime_version,
+ WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts":
self.waiter_max_attempts},
+ )
+
+ return agent_runtime_arn
+
+
+class BedrockInvokeAgentRuntimeOperator(AwsBaseOperator[BedrockAgentCoreHook]):
+ """
+ Invoke an Amazon Bedrock AgentCore Runtime.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:BedrockInvokeAgentRuntimeOperator`
+
+ :param agent_runtime_arn: The ARN of the AgentCore Runtime to invoke.
(templated)
+ :param payload: The invocation payload. Dict and list payloads are JSON
serialized. (templated)
+ :param content_type: The MIME type of the input payload. (templated)
Default: application/json
+ :param accept: The desired MIME type of the response. (templated) Default:
application/json
+ :param invoke_agent_runtime_kwargs: Any optional parameters to pass to the
API. (templated)
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is ``None`` or empty then the default boto3 behaviour is used.
If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ maintained on each worker node).
+ :param region_name: AWS region_name. If not specified then the default
boto3 behaviour is used.
+ :param verify: Whether or not to verify SSL certificates. See:
+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+ :param botocore_config: Configuration dictionary (key-values) for botocore
client. See:
+
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+ """
+
+ aws_hook_class = BedrockAgentCoreHook
+ template_fields: Sequence[str] = aws_template_fields(
+ "agent_runtime_arn",
+ "payload",
+ "content_type",
+ "accept",
+ "invoke_agent_runtime_kwargs",
+ )
+
+ def __init__(
+ self,
+ *,
+ agent_runtime_arn: str,
+ payload: dict[str, Any] | list[Any] | str | bytes,
+ content_type: str | None = "application/json",
+ accept: str | None = "application/json",
+ invoke_agent_runtime_kwargs: dict[str, Any] | None = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.agent_runtime_arn = agent_runtime_arn
+ self.payload = payload
+ self.content_type = content_type
+ self.accept = accept
+ self.invoke_agent_runtime_kwargs = invoke_agent_runtime_kwargs or {}
+
+ @staticmethod
+ def _serialize_payload(payload: dict[str, Any] | list[Any] | str | bytes)
-> bytes:
+ if isinstance(payload, bytes):
+ return payload
+ if isinstance(payload, str):
+ return payload.encode()
+ return json.dumps(payload).encode()
+
+ @staticmethod
+ def _read_response_body(response_body: Any) -> bytes:
+ if hasattr(response_body, "read"):
+ response_body = response_body.read()
+ if isinstance(response_body, bytes):
+ return response_body
+ if isinstance(response_body, str):
+ return response_body.encode()
+ return json.dumps(response_body).encode()
+
+ @staticmethod
+ def _deserialize_response_body(response_body: bytes, content_type: str |
None) -> Any:
+ try:
+ response_text = response_body.decode()
+ except UnicodeDecodeError:
+ return response_body
+
+ if content_type and "json" in content_type.lower():
+ return json.loads(response_text)
+ return response_text
+
+ def execute(self, context: Context) -> dict[str, Any]:
+ response = self.hook.conn.invoke_agent_runtime(
+ **prune_dict(
+ {
+ "agentRuntimeArn": self.agent_runtime_arn,
+ "payload": self._serialize_payload(self.payload),
+ "contentType": self.content_type,
+ "accept": self.accept,
+ **self.invoke_agent_runtime_kwargs,
+ }
+ )
+ )
+ response_body = self._deserialize_response_body(
+ self._read_response_body(response["response"]),
+ response.get("contentType") or self.accept,
+ )
+ return {
+ key: value for key, value in response.items() if key not in
{"ResponseMetadata", "response"}
+ } | {"response": response_body}
+
+
+class
BedrockDeleteAgentRuntimeOperator(AwsBaseOperator[BedrockAgentCoreControlHook]):
+ """
+ Delete an Amazon Bedrock AgentCore Runtime.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:BedrockDeleteAgentRuntimeOperator`
+
+ :param agent_runtime_id: The unique identifier of the AgentCore Runtime to
delete. (templated)
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is ``None`` or empty then the default boto3 behaviour is used.
If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ maintained on each worker node).
+ :param region_name: AWS region_name. If not specified then the default
boto3 behaviour is used.
+ :param verify: Whether or not to verify SSL certificates. See:
+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+ :param botocore_config: Configuration dictionary (key-values) for botocore
client. See:
+
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+ """
+
+ aws_hook_class = BedrockAgentCoreControlHook
+ template_fields: Sequence[str] = aws_template_fields("agent_runtime_id")
+
+ def __init__(self, *, agent_runtime_id: str, **kwargs):
+ super().__init__(**kwargs)
+ self.agent_runtime_id = agent_runtime_id
+
+ def execute(self, context: Context) -> None:
+
self.hook.conn.delete_agent_runtime(agentRuntimeId=self.agent_runtime_id)
+ self.log.info("Deleted Bedrock AgentCore Runtime %s.",
self.agent_runtime_id)
+
+
class BedrockCustomizeModelOperator(AwsBaseOperator[BedrockHook]):
"""
Create a fine-tuning job to customize a base model.
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/bedrock.py b/providers/amazon/src/airflow/providers/amazon/aws/triggers/bedrock.py
index 70d8254a316..49b4ce17da8 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/bedrock.py
@@ -18,7 +18,11 @@ from __future__ import annotations
from typing import TYPE_CHECKING
-from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook,
BedrockHook
+from airflow.providers.amazon.aws.hooks.bedrock import (
+ BedrockAgentCoreControlHook,
+ BedrockAgentHook,
+ BedrockHook,
+)
from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
from airflow.providers.amazon.version_compat import NOTSET, ArgNotSet
@@ -185,6 +189,53 @@ class BedrockIngestionJobTrigger(AwsBaseWaiterTrigger):
return BedrockAgentHook(aws_conn_id=self.aws_conn_id)
+class BedrockAgentRuntimeReadyTrigger(AwsBaseWaiterTrigger):
+ """
+ Trigger when a Bedrock AgentCore Runtime reaches the READY state.
+
+ :param agent_runtime_id: The unique identifier of the AgentCore Runtime.
+ :param agent_runtime_version: The version of the AgentCore Runtime.
+ :param agent_runtime_arn: The ARN of the AgentCore Runtime.
+ :param waiter_delay: The amount of time in seconds to wait between
attempts. (default: 60)
+ :param waiter_max_attempts: The maximum number of attempts to be made.
(default: 20)
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ """
+
+ def __init__(
+ self,
+ *,
+ agent_runtime_id: str,
+ agent_runtime_version: str,
+ agent_runtime_arn: str,
+ waiter_delay: int = 60,
+ waiter_max_attempts: int = 20,
+ aws_conn_id: str | None = None,
+ ) -> None:
+ super().__init__(
+ serialized_fields={
+ "agent_runtime_id": agent_runtime_id,
+ "agent_runtime_version": agent_runtime_version,
+ "agent_runtime_arn": agent_runtime_arn,
+ },
+ waiter_name="agent_runtime_ready",
+ waiter_args={
+ "agentRuntimeId": agent_runtime_id,
+ "agentRuntimeVersion": agent_runtime_version,
+ },
+ failure_message="Bedrock AgentCore Runtime creation failed.",
+ status_message="Status of Bedrock AgentCore Runtime is",
+ status_queries=["status"],
+ return_key="agent_runtime_arn",
+ return_value=agent_runtime_arn,
+ waiter_delay=waiter_delay,
+ waiter_max_attempts=waiter_max_attempts,
+ aws_conn_id=aws_conn_id,
+ )
+
+ def hook(self) -> AwsGenericHook:
+ return BedrockAgentCoreControlHook(aws_conn_id=self.aws_conn_id)
+
+
class BedrockBaseBatchInferenceTrigger(AwsBaseWaiterTrigger):
"""
Trigger when a batch inference job is complete.
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/waiters/bedrock-agentcore-control.json b/providers/amazon/src/airflow/providers/amazon/aws/waiters/bedrock-agentcore-control.json
new file mode 100644
index 00000000000..0e12b88e3ff
--- /dev/null
+++ b/providers/amazon/src/airflow/providers/amazon/aws/waiters/bedrock-agentcore-control.json
@@ -0,0 +1,48 @@
+{
+ "version": 2,
+ "waiters": {
+ "agent_runtime_ready": {
+ "delay": 60,
+ "maxAttempts": 20,
+ "operation": "GetAgentRuntime",
+ "acceptors": [
+ {
+ "matcher": "path",
+ "argument": "status",
+ "expected": "READY",
+ "state": "success"
+ },
+ {
+ "matcher": "path",
+ "argument": "status",
+ "expected": "CREATING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "status",
+ "expected": "UPDATING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "status",
+ "expected": "CREATE_FAILED",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "status",
+ "expected": "UPDATE_FAILED",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "status",
+ "expected": "DELETING",
+ "state": "failure"
+ }
+ ]
+ }
+ }
+}
diff --git a/providers/amazon/tests/system/amazon/aws/example_bedrock_agentcore.py b/providers/amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
new file mode 100644
index 00000000000..ca93029c93c
--- /dev/null
+++ b/providers/amazon/tests/system/amazon/aws/example_bedrock_agentcore.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.providers.amazon.aws.operators.bedrock import (
+ BedrockCreateAgentRuntimeOperator,
+ BedrockDeleteAgentRuntimeOperator,
+ BedrockInvokeAgentRuntimeOperator,
+)
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import DAG, chain
+else:
+ from airflow.models.baseoperator import chain # type:
ignore[attr-defined,no-redef]
+ from airflow.models.dag import DAG # type:
ignore[attr-defined,no-redef,assignment]
+
+try:
+ from airflow.sdk import TriggerRule
+except ImportError:
+ from airflow.utils.trigger_rule import TriggerRule # type:
ignore[no-redef,attr-defined]
+
+from system.amazon.aws.utils import SystemTestContextBuilder
+
+# Externally fetched variables:
+ROLE_ARN_KEY = "ROLE_ARN"
+CONTAINER_URI_KEY = "CONTAINER_URI"
+
+sys_test_context_task = (
+
SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).add_variable(CONTAINER_URI_KEY).build()
+)
+
+DAG_ID = "example_bedrock_agentcore"
+
+with DAG(
+ dag_id=DAG_ID,
+ schedule="@once",
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context["ENV_ID"]
+
+ runtime_name = f"airflow-agentcore-{env_id}"
+
+ # [START howto_operator_bedrock_create_agent_runtime]
+ create_agent_runtime = BedrockCreateAgentRuntimeOperator(
+ task_id="create_agent_runtime",
+ agent_runtime_name=runtime_name,
+ agent_runtime_artifact={
+ "containerConfiguration": {
+ "containerUri": test_context[CONTAINER_URI_KEY],
+ },
+ },
+ role_arn=test_context[ROLE_ARN_KEY],
+ network_configuration={"networkMode": "PUBLIC"},
+ deferrable=True,
+ )
+ # [END howto_operator_bedrock_create_agent_runtime]
+
+ # [START howto_operator_bedrock_invoke_agent_runtime]
+ invoke_agent_runtime = BedrockInvokeAgentRuntimeOperator(
+ task_id="invoke_agent_runtime",
+ agent_runtime_arn=create_agent_runtime.output,
+ payload={"prompt": "Hello from Airflow"},
+ botocore_config={"read_timeout": 300},
+ )
+ # [END howto_operator_bedrock_invoke_agent_runtime]
+
+ # [START howto_operator_bedrock_delete_agent_runtime]
+ delete_agent_runtime = BedrockDeleteAgentRuntimeOperator(
+ task_id="delete_agent_runtime",
+ agent_runtime_id="{{
task_instance.xcom_pull('create_agent_runtime').split('/')[-1] }}",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+ # [END howto_operator_bedrock_delete_agent_runtime]
+
+ chain(
+ # TEST SETUP
+ test_context,
+ # TEST BODY
+ create_agent_runtime,
+ invoke_agent_runtime,
+ # TEST TEARDOWN
+ delete_agent_runtime,
+ )
+
+ from tests_common.test_utils.watcher import watcher
+
+ list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run # noqa: E402
+
+test_run = get_test_run(dag)
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_bedrock.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_bedrock.py
index 19fbdf41043..b8577a99290 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_bedrock.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_bedrock.py
@@ -21,6 +21,8 @@ from unittest import mock
import pytest
from airflow.providers.amazon.aws.hooks.bedrock import (
+ BedrockAgentCoreControlHook,
+ BedrockAgentCoreHook,
BedrockAgentHook,
BedrockAgentRuntimeHook,
BedrockHook,
@@ -36,6 +38,8 @@ class TestBedrockHooks:
pytest.param(BedrockRuntimeHook(), "bedrock-runtime",
id="bedrock-runtime"),
pytest.param(BedrockAgentHook(), "bedrock-agent",
id="bedrock-agent"),
pytest.param(BedrockAgentRuntimeHook(), "bedrock-agent-runtime",
id="bedrock-agent-runtime"),
+ pytest.param(BedrockAgentCoreControlHook(),
"bedrock-agentcore-control", id="agentcore-control"),
+ pytest.param(BedrockAgentCoreHook(), "bedrock-agentcore",
id="agentcore"),
],
)
def test_bedrock_hooks(self, test_hook, service_name):
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
index e0b6f8f7cbb..8a97f09dc2d 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_bedrock.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import json
from collections.abc import Generator
+from io import BytesIO
from typing import TYPE_CHECKING
from unittest import mock
@@ -26,9 +27,16 @@ import pytest
from botocore.exceptions import ClientError
from moto import mock_aws
-from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook,
BedrockHook, BedrockRuntimeHook
+from airflow.providers.amazon.aws.hooks.bedrock import (
+ BedrockAgentCoreControlHook,
+ BedrockAgentCoreHook,
+ BedrockAgentHook,
+ BedrockHook,
+ BedrockRuntimeHook,
+)
from airflow.providers.amazon.aws.operators.bedrock import (
BedrockBatchInferenceOperator,
+ BedrockCreateAgentRuntimeOperator,
BedrockCreateDataSourceOperator,
BedrockCreateEvaluationJobOperator,
BedrockCreateGuardrailOperator,
@@ -36,8 +44,10 @@ from airflow.providers.amazon.aws.operators.bedrock import (
BedrockCreateKnowledgeBaseOperator,
BedrockCreateProvisionedModelThroughputOperator,
BedrockCustomizeModelOperator,
+ BedrockDeleteAgentRuntimeOperator,
BedrockDeleteGuardrailOperator,
BedrockIngestDataOperator,
+ BedrockInvokeAgentRuntimeOperator,
BedrockInvokeModelOperator,
BedrockRaGOperator,
BedrockUpdateGuardrailOperator,
@@ -84,6 +94,243 @@ class TestBedrockInvokeModelOperator:
assert response["generation"] == self.GENERATED_RESPONSE
+class TestBedrockCreateAgentRuntimeOperator:
+ AGENT_RUNTIME_ARN =
"arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/test"
+ AGENT_RUNTIME_ID = "runtime_id"
+ AGENT_RUNTIME_VERSION = "1"
+
+ def setup_method(self):
+ self.operator = BedrockCreateAgentRuntimeOperator(
+ task_id="create_agent_runtime",
+ agent_runtime_name="test-runtime",
+ agent_runtime_artifact={"containerConfiguration": {"containerUri":
"image_uri"}},
+ role_arn="role_arn",
+ network_configuration={"networkMode": "PUBLIC"},
+ create_agent_runtime_kwargs={"description": "test runtime"},
+ )
+ self.operator.defer = mock.MagicMock()
+
+ @pytest.mark.parametrize(
+ ("wait_for_completion", "deferrable"),
+ [
+ pytest.param(False, False, id="no_wait"),
+ pytest.param(True, False, id="wait"),
+ pytest.param(False, True, id="defer"),
+ pytest.param(True, True, id="defer_takes_precedence"),
+ ],
+ )
+ @mock.patch.object(BedrockAgentCoreControlHook, "get_waiter")
+ @mock.patch.object(BedrockAgentCoreControlHook, "conn",
new_callable=mock.PropertyMock)
+ def test_create_agent_runtime_wait_combinations(
+ self,
+ mock_conn,
+ mock_get_waiter,
+ wait_for_completion,
+ deferrable,
+ ):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+ mock_client.create_agent_runtime.return_value = {
+ "agentRuntimeArn": self.AGENT_RUNTIME_ARN,
+ "agentRuntimeId": self.AGENT_RUNTIME_ID,
+ "agentRuntimeVersion": self.AGENT_RUNTIME_VERSION,
+ "status": "CREATING",
+ }
+ self.operator.wait_for_completion = wait_for_completion
+ self.operator.deferrable = deferrable
+
+ response = self.operator.execute({})
+
+ assert response == self.AGENT_RUNTIME_ARN
+ mock_client.create_agent_runtime.assert_called_once_with(
+ agentRuntimeName="test-runtime",
+ agentRuntimeArtifact={"containerConfiguration": {"containerUri":
"image_uri"}},
+ roleArn="role_arn",
+ networkConfiguration={"networkMode": "PUBLIC"},
+ description="test runtime",
+ )
+ assert self.operator.defer.call_count == deferrable
+
+ if wait_for_completion and not deferrable:
+ mock_get_waiter.assert_called_once_with("agent_runtime_ready")
+ mock_get_waiter.return_value.wait.assert_called_once_with(
+ agentRuntimeId=self.AGENT_RUNTIME_ID,
+ agentRuntimeVersion=self.AGENT_RUNTIME_VERSION,
+ WaiterConfig={"Delay": 60, "MaxAttempts": 20},
+ )
+ else:
+ mock_get_waiter.assert_not_called()
+
+ @mock.patch.object(BedrockAgentCoreControlHook, "conn",
new_callable=mock.PropertyMock)
+ def test_create_agent_runtime_no_extra_kwargs(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+ mock_client.create_agent_runtime.return_value = {
+ "agentRuntimeArn": self.AGENT_RUNTIME_ARN,
+ "agentRuntimeId": self.AGENT_RUNTIME_ID,
+ "agentRuntimeVersion": self.AGENT_RUNTIME_VERSION,
+ "status": "CREATING",
+ }
+ operator = BedrockCreateAgentRuntimeOperator(
+ task_id="create_agent_runtime",
+ agent_runtime_name="test-runtime",
+ agent_runtime_artifact={"containerConfiguration": {"containerUri":
"image_uri"}},
+ role_arn="role_arn",
+ network_configuration={"networkMode": "PUBLIC"},
+ wait_for_completion=False,
+ )
+
+ operator.execute({})
+
+ mock_client.create_agent_runtime.assert_called_once_with(
+ agentRuntimeName="test-runtime",
+ agentRuntimeArtifact={"containerConfiguration": {"containerUri":
"image_uri"}},
+ roleArn="role_arn",
+ networkConfiguration={"networkMode": "PUBLIC"},
+ )
+
+ def test_execute_complete_success(self):
+ result = self.operator.execute_complete(
+ {},
+ {"status": "success", "agent_runtime_arn": self.AGENT_RUNTIME_ARN},
+ )
+
+ assert result == self.AGENT_RUNTIME_ARN
+
+ def test_execute_complete_error(self):
+ with pytest.raises(RuntimeError):
+ self.operator.execute_complete(
+ {},
+ {"status": "error", "message": "failed", "agent_runtime_arn":
self.AGENT_RUNTIME_ARN},
+ )
+
+ def test_template_fields(self):
+ validate_template_fields(self.operator)
+
+
+class TestBedrockInvokeAgentRuntimeOperator:
+ AGENT_RUNTIME_ARN =
"arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/test"
+
+ @mock.patch.object(BedrockAgentCoreHook, "conn",
new_callable=mock.PropertyMock)
+ def test_invoke_agent_runtime_json_response(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+ mock_client.invoke_agent_runtime.return_value = {
+ "runtimeSessionId": "session_id",
+ "contentType": "application/json",
+ "statusCode": 200,
+ "response": BytesIO(b'{"answer": "hello"}'),
+ "ResponseMetadata": {"HTTPStatusCode": 200},
+ }
+ operator = BedrockInvokeAgentRuntimeOperator(
+ task_id="invoke_agent_runtime",
+ agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+ payload={"prompt": "hello"},
+ invoke_agent_runtime_kwargs={"runtimeSessionId": "session_id"},
+ )
+
+ response = operator.execute({})
+
+ assert response == {
+ "runtimeSessionId": "session_id",
+ "contentType": "application/json",
+ "statusCode": 200,
+ "response": {"answer": "hello"},
+ }
+ mock_client.invoke_agent_runtime.assert_called_once_with(
+ agentRuntimeArn=self.AGENT_RUNTIME_ARN,
+ payload=b'{"prompt": "hello"}',
+ contentType="application/json",
+ accept="application/json",
+ runtimeSessionId="session_id",
+ )
+
+ @mock.patch.object(BedrockAgentCoreHook, "conn",
new_callable=mock.PropertyMock)
+ def test_invoke_agent_runtime_text_response(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+ mock_client.invoke_agent_runtime.return_value = {
+ "contentType": "text/plain",
+ "statusCode": 200,
+ "response": BytesIO(b"hello"),
+ }
+ operator = BedrockInvokeAgentRuntimeOperator(
+ task_id="invoke_agent_runtime",
+ agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+ payload="hello",
+ content_type="text/plain",
+ accept="text/plain",
+ )
+
+ response = operator.execute({})
+
+ assert response["response"] == "hello"
+ mock_client.invoke_agent_runtime.assert_called_once_with(
+ agentRuntimeArn=self.AGENT_RUNTIME_ARN,
+ payload=b"hello",
+ contentType="text/plain",
+ accept="text/plain",
+ )
+
+ @mock.patch.object(BedrockAgentCoreHook, "conn",
new_callable=mock.PropertyMock)
+ def test_invoke_agent_runtime_prunes_none_content_type_and_accept(self,
mock_conn):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+ mock_client.invoke_agent_runtime.return_value = {
+ "statusCode": 200,
+ "response": BytesIO(b"hello"),
+ }
+ operator = BedrockInvokeAgentRuntimeOperator(
+ task_id="invoke_agent_runtime",
+ agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+ payload="hello",
+ content_type=None,
+ accept=None,
+ )
+
+ operator.execute({})
+
+ mock_client.invoke_agent_runtime.assert_called_once_with(
+ agentRuntimeArn=self.AGENT_RUNTIME_ARN,
+ payload=b"hello",
+ )
+
+ def test_template_fields(self):
+ operator = BedrockInvokeAgentRuntimeOperator(
+ task_id="invoke_agent_runtime",
+ agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+ payload={"prompt": "hello"},
+ )
+
+ validate_template_fields(operator)
+
+
+class TestBedrockDeleteAgentRuntimeOperator:
+ AGENT_RUNTIME_ID = "runtime_id"
+
+ @mock.patch.object(BedrockAgentCoreControlHook, "conn",
new_callable=mock.PropertyMock)
+ def test_delete_agent_runtime(self, mock_conn):
+ mock_client = mock.MagicMock()
+ mock_conn.return_value = mock_client
+ mock_client.delete_agent_runtime.return_value = {}
+ operator = BedrockDeleteAgentRuntimeOperator(
+ task_id="delete_agent_runtime",
+ agent_runtime_id=self.AGENT_RUNTIME_ID,
+ )
+
+ operator.execute({})
+
+
mock_client.delete_agent_runtime.assert_called_once_with(agentRuntimeId=self.AGENT_RUNTIME_ID)
+
+ def test_template_fields(self):
+ validate_template_fields(
+ BedrockDeleteAgentRuntimeOperator(
+ task_id="delete_agent_runtime",
+ agent_runtime_id=self.AGENT_RUNTIME_ID,
+ )
+ )
+
+
class TestBedrockCustomizeModelOperator:
CUSTOMIZE_JOB_ARN = "valid_arn"
CUSTOMIZE_JOB_NAME = "testModelJob"
diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_bedrock.py b/providers/amazon/tests/unit/amazon/aws/triggers/test_bedrock.py
index 963f96211e7..a5f6936fdc1 100644
--- a/providers/amazon/tests/unit/amazon/aws/triggers/test_bedrock.py
+++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_bedrock.py
@@ -21,8 +21,13 @@ from unittest.mock import AsyncMock
import pytest
-from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook, BedrockHook
+from airflow.providers.amazon.aws.hooks.bedrock import (
+ BedrockAgentCoreControlHook,
+ BedrockAgentHook,
+ BedrockHook,
+)
from airflow.providers.amazon.aws.triggers.bedrock import (
+ BedrockAgentRuntimeReadyTrigger,
BedrockBatchInferenceCompletedTrigger,
BedrockBatchInferenceScheduledTrigger,
BedrockCustomizeModelCompletedTrigger,
@@ -173,6 +178,46 @@ class TestBedrockIngestionJobTrigger(TestBaseBedrockTrigger):
mock_get_waiter().wait.assert_called_once()
+class TestBedrockAgentRuntimeReadyTrigger(TestBaseBedrockTrigger):
+ EXPECTED_WAITER_NAME = "agent_runtime_ready"
+
+ AGENT_RUNTIME_ID = "runtime_id"
+ AGENT_RUNTIME_VERSION = "1"
+ AGENT_RUNTIME_ARN = "runtime_arn"
+
+ def test_serialization(self):
+ """Assert that arguments and classpath are correctly serialized."""
+ trigger = BedrockAgentRuntimeReadyTrigger(
+ agent_runtime_id=self.AGENT_RUNTIME_ID,
+ agent_runtime_version=self.AGENT_RUNTIME_VERSION,
+ agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert classpath == BASE_TRIGGER_CLASSPATH + "BedrockAgentRuntimeReadyTrigger"
+ assert kwargs.get("agent_runtime_id") == self.AGENT_RUNTIME_ID
+ assert kwargs.get("agent_runtime_version") == self.AGENT_RUNTIME_VERSION
+ assert kwargs.get("agent_runtime_arn") == self.AGENT_RUNTIME_ARN
+
+ @pytest.mark.asyncio
+ @mock.patch.object(BedrockAgentCoreControlHook, "get_waiter")
+ @mock.patch.object(BedrockAgentCoreControlHook, "get_async_conn")
+ async def test_run_success(self, mock_async_conn, mock_get_waiter):
+ mock_async_conn.__aenter__.return_value = mock.MagicMock()
+ mock_get_waiter().wait = AsyncMock()
+ trigger = BedrockAgentRuntimeReadyTrigger(
+ agent_runtime_id=self.AGENT_RUNTIME_ID,
+ agent_runtime_version=self.AGENT_RUNTIME_VERSION,
+ agent_runtime_arn=self.AGENT_RUNTIME_ARN,
+ )
+
+ generator = trigger.run()
+ response = await generator.asend(None)
+
+ assert_expected_waiter_type(mock_get_waiter, self.EXPECTED_WAITER_NAME)
+ assert response == TriggerEvent({"status": "success", "agent_runtime_arn": self.AGENT_RUNTIME_ARN})
+ mock_get_waiter().wait.assert_called_once()
+
+
class TestBedrockBatchInferenceCompletedTrigger(TestBaseBedrockTrigger):
EXPECTED_WAITER_NAME = "batch_inference_complete"
diff --git a/providers/amazon/tests/unit/amazon/aws/waiters/test_bedrock_agentcore_control.py b/providers/amazon/tests/unit/amazon/aws/waiters/test_bedrock_agentcore_control.py
new file mode 100644
index 00000000000..cc2cf479126
--- /dev/null
+++ b/providers/amazon/tests/unit/amazon/aws/waiters/test_bedrock_agentcore_control.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import boto3
+import botocore
+import pytest
+
+from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentCoreControlHook
+
+
+class TestBedrockAgentCoreControlCustomWaiters:
+ def test_service_waiters(self):
+ assert "agent_runtime_ready" in BedrockAgentCoreControlHook().list_waiters()
+
+
+class TestBedrockAgentCoreControlCustomWaitersBase:
+ @pytest.fixture(autouse=True)
+ def mock_conn(self, monkeypatch):
+ self.client = boto3.client("bedrock-agentcore-control", region_name="us-east-1")
+ monkeypatch.setattr(BedrockAgentCoreControlHook, "conn", self.client)
+
+
+class TestAgentRuntimeReadyWaiter(TestBedrockAgentCoreControlCustomWaitersBase):
+ WAITER_NAME = "agent_runtime_ready"
+ WAITER_ARGS = {"agentRuntimeId": "runtime_id", "agentRuntimeVersion": "1"}
+ SUCCESS_STATES = ["READY"]
+ FAILURE_STATES = ["CREATE_FAILED", "UPDATE_FAILED", "DELETING"]
+ INTERMEDIATE_STATES = ["CREATING", "UPDATING"]
+
+ @pytest.fixture
+ def mock_getter(self):
+ with mock.patch.object(self.client, "get_agent_runtime") as getter:
+ yield getter
+
+ @pytest.mark.parametrize("state", SUCCESS_STATES)
+ def test_agent_runtime_ready_complete(self, state, mock_getter):
+ mock_getter.return_value = {"status": state}
+
+ BedrockAgentCoreControlHook().get_waiter(self.WAITER_NAME).wait(**self.WAITER_ARGS)
+
+ @pytest.mark.parametrize("state", FAILURE_STATES)
+ def test_agent_runtime_ready_failed(self, state, mock_getter):
+ mock_getter.return_value = {"status": state}
+
+ with pytest.raises(botocore.exceptions.WaiterError):
+ BedrockAgentCoreControlHook().get_waiter(self.WAITER_NAME).wait(**self.WAITER_ARGS)
+
+ @pytest.mark.parametrize("state", INTERMEDIATE_STATES)
+ def test_agent_runtime_ready_wait(self, state, mock_getter):
+ wait = {"status": state}
+ success = {"status": "READY"}
+ mock_getter.side_effect = [wait, wait, success]
+
+ BedrockAgentCoreControlHook().get_waiter(self.WAITER_NAME).wait(
+ **self.WAITER_ARGS,
+ WaiterConfig={"Delay": 0.01, "MaxAttempts": 3},
+ )