This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a4e3fbed1a Introduce Amazon Kinesis Analytics V2 (Managed Service for 
Apache Flink application)  (#40765)
a4e3fbed1a is described below

commit a4e3fbed1a07b5685820a468e2d2ebb986b7d6b4
Author: GPK <[email protected]>
AuthorDate: Thu Jul 18 19:18:48 2024 +0100

    Introduce Amazon Kinesis Analytics V2 (Managed Service for Apache Flink 
application)  (#40765)
---
 .../amazon/aws/hooks/kinesis_analytics.py          |  37 ++
 .../amazon/aws/operators/kinesis_analytics.py      | 348 +++++++++++++++
 .../amazon/aws/sensors/kinesis_analytics.py        | 242 ++++++++++
 .../amazon/aws/triggers/kinesis_analytics.py       |  69 +++
 .../amazon/aws/waiters/kinesisanalyticsv2.json     | 151 +++++++
 airflow/providers/amazon/provider.yaml             |  18 +
 .../operators/kinesis_analytics.rst                | 115 +++++
 .../aws/[email protected]   | Bin 0 -> 16946 bytes
 docs/spelling_wordlist.txt                         |   1 +
 .../amazon/aws/hooks/test_kinesis_analytics.py     |  31 ++
 .../amazon/aws/operators/test_kinesis_analytics.py | 485 +++++++++++++++++++++
 .../amazon/aws/sensors/test_kinesis_analytics.py   | 172 ++++++++
 .../amazon/aws/triggers/test_kinesis_analytics.py  |  78 ++++
 .../amazon/aws/waiters/test_kinesis_analytics.py   | 106 +++++
 .../amazon/aws/example_kinesis_analytics.py        | 272 ++++++++++++
 15 files changed, 2125 insertions(+)

diff --git a/airflow/providers/amazon/aws/hooks/kinesis_analytics.py 
b/airflow/providers/amazon/aws/hooks/kinesis_analytics.py
new file mode 100644
index 0000000000..6f346dec45
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/kinesis_analytics.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class KinesisAnalyticsV2Hook(AwsBaseHook):
+    """
+    Interact with Amazon Kinesis Analytics V2.
+
+    Provide thin wrapper around 
:external+boto3:py:class:`boto3.client("kinesisanalyticsv2") 
<KinesisAnalyticsV2.Client>`.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = "kinesisanalyticsv2"
+        super().__init__(*args, **kwargs)
diff --git a/airflow/providers/amazon/aws/operators/kinesis_analytics.py 
b/airflow/providers/amazon/aws/operators/kinesis_analytics.py
new file mode 100644
index 0000000000..727aa714c6
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/kinesis_analytics.py
@@ -0,0 +1,348 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from botocore.exceptions import ClientError
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
+from airflow.providers.amazon.aws.triggers.kinesis_analytics import (
+    KinesisAnalyticsV2ApplicationOperationCompleteTrigger,
+)
+from airflow.providers.amazon.aws.utils import validate_execute_complete_event
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class 
KinesisAnalyticsV2CreateApplicationOperator(AwsBaseOperator[KinesisAnalyticsV2Hook]):
+    """
+    Creates an AWS Managed Service for Apache Flink application.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:KinesisAnalyticsV2CreateApplicationOperator`
+
+    :param application_name: The name of application. (templated)
+    :param runtime_environment: The runtime environment for the application. 
(templated)
+    :param service_execution_role: The IAM role used by the application to 
access services. (templated)
+    :param create_application_kwargs: Create application extra properties. 
(templated)
+    :param application_description: A summary description of the application. 
(templated)
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. 
If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default 
boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore 
client. See:
+        
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+    aws_hook_class = KinesisAnalyticsV2Hook
+    ui_color = "#44b5e2"
+
+    template_fields: Sequence[str] = aws_template_fields(
+        "application_name",
+        "runtime_environment",
+        "service_execution_role",
+        "create_application_kwargs",
+        "application_description",
+    )
+    template_fields_renderers: dict = {
+        "create_application_kwargs": "json",
+    }
+
+    def __init__(
+        self,
+        application_name: str,
+        runtime_environment: str,
+        service_execution_role: str,
+        create_application_kwargs: dict[str, Any] | None = None,
+        application_description: str = "Managed Service for Apache Flink 
application created from Airflow",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.application_name = application_name
+        self.runtime_environment = runtime_environment
+        self.service_execution_role = service_execution_role
+        self.create_application_kwargs = create_application_kwargs or {}
+        self.application_description = application_description
+
+    def execute(self, context: Context) -> dict[str, str]:
+        self.log.info("Creating AWS Managed Service for Apache Flink 
application %s.", self.application_name)
+        try:
+            response = self.hook.conn.create_application(
+                ApplicationName=self.application_name,
+                ApplicationDescription=self.application_description,
+                RuntimeEnvironment=self.runtime_environment,
+                ServiceExecutionRole=self.service_execution_role,
+                **self.create_application_kwargs,
+            )
+        except ClientError as error:
+            raise AirflowException(
+                f"AWS Managed Service for Apache Flink application creation 
failed: {error.response['Error']['Message']}"
+            )
+
+        self.log.info(
+            "AWS Managed Service for Apache Flink application created 
successfully %s.",
+            self.application_name,
+        )
+
+        return {"ApplicationARN": 
response["ApplicationDetail"]["ApplicationARN"]}
+
+
+class 
KinesisAnalyticsV2StartApplicationOperator(AwsBaseOperator[KinesisAnalyticsV2Hook]):
+    """
+    Starts an AWS Managed Service for Apache Flink application.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:KinesisAnalyticsV2StartApplicationOperator`
+
+    :param application_name: The name of application. (templated)
+    :param run_configuration: Application properties to start Apache Flink 
Job. (templated)
+
+    :param wait_for_completion: Whether to wait for job to stop. (default: 
True)
+    :param waiter_delay: Time in seconds to wait between status checks. 
(default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for job 
completion. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the 
job to stop.
+        This implies waiting for completion. This mode requires aiobotocore 
module to be installed.
+        (default: False)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. 
If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default 
boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore 
client. See:
+        
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+    aws_hook_class = KinesisAnalyticsV2Hook
+    ui_color = "#44b5e2"
+
+    template_fields: Sequence[str] = aws_template_fields(
+        "application_name",
+        "run_configuration",
+    )
+    template_fields_renderers: dict = {
+        "run_configuration": "json",
+    }
+
+    def __init__(
+        self,
+        application_name: str,
+        run_configuration: dict[str, Any] | None = None,
+        wait_for_completion: bool = True,
+        waiter_delay: int = 60,
+        waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.application_name = application_name
+        self.run_configuration = run_configuration or {}
+        self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+        self.deferrable = deferrable
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        msg = "AWS Managed Service for Apache Flink application"
+
+        try:
+            self.log.info("Starting %s %s.", msg, self.application_name)
+            self.hook.conn.start_application(
+                ApplicationName=self.application_name, 
RunConfiguration=self.run_configuration
+            )
+        except ClientError as error:
+            raise AirflowException(
+                f"Failed to start {msg} {self.application_name}: 
{error.response['Error']['Message']}"
+            )
+
+        describe_response = 
self.hook.conn.describe_application(ApplicationName=self.application_name)
+
+        if self.deferrable:
+            self.log.info("Deferring for %s to start: %s.", msg, 
self.application_name)
+            self.defer(
+                trigger=KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+                    application_name=self.application_name,
+                    waiter_name="application_start_complete",
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    region_name=self.region_name,
+                    verify=self.verify,
+                    botocore_config=self.botocore_config,
+                ),
+                method_name="execute_complete",
+            )
+        if self.wait_for_completion:
+            self.log.info("Waiting for %s to start: %s.", msg, 
self.application_name)
+
+            self.hook.get_waiter("application_start_complete").wait(
+                ApplicationName=self.application_name,
+                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": 
self.waiter_max_attempts},
+            )
+
+        self.log.info("%s started successfully %s.", msg, 
self.application_name)
+
+        return {"ApplicationARN": 
describe_response["ApplicationDetail"]["ApplicationARN"]}
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> dict[str, Any]:
+        event = validate_execute_complete_event(event)
+
+        if event["status"] != "success":
+            raise AirflowException(
+                "Error while starting AWS Managed Service for Apache Flink 
application: %s", event
+            )
+
+        response = self.hook.conn.describe_application(
+            ApplicationName=event["application_name"],
+        )
+
+        self.log.info(
+            "AWS Managed Service for Apache Flink application %s started 
successfully.",
+            event["application_name"],
+        )
+
+        return {"ApplicationARN": 
response["ApplicationDetail"]["ApplicationARN"]}
+
+
+class 
KinesisAnalyticsV2StopApplicationOperator(AwsBaseOperator[KinesisAnalyticsV2Hook]):
+    """
+    Stop an AWS Managed Service for Apache Flink application.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:KinesisAnalyticsV2StopApplicationOperator`
+
+    :param application_name: The name of your application. (templated)
+    :param force: Set to true to force the application to stop. If you set 
Force to true, Managed Service for
+        Apache Flink stops the application without taking a snapshot. 
(templated)
+
+    :param wait_for_completion: Whether to wait for job to stop. (default: 
True)
+    :param waiter_delay: Time in seconds to wait between status checks. 
(default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for job 
completion. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the 
job to stop.
+        This implies waiting for completion. This mode requires aiobotocore 
module to be installed.
+        (default: False)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. 
If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default 
boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore 
client. See:
+        
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+    aws_hook_class = KinesisAnalyticsV2Hook
+    ui_color = "#44b5e2"
+
+    template_fields: Sequence[str] = aws_template_fields(
+        "application_name",
+        "force",
+    )
+
+    def __init__(
+        self,
+        application_name: str,
+        force: bool = False,
+        wait_for_completion: bool = True,
+        waiter_delay: int = 60,
+        waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.application_name = application_name
+        self.force = force
+        self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+        self.deferrable = deferrable
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        msg = "AWS Managed Service for Apache Flink application"
+
+        try:
+            self.log.info("Stopping %s %s.", msg, self.application_name)
+
+            
self.hook.conn.stop_application(ApplicationName=self.application_name, 
Force=self.force)
+        except ClientError as error:
+            raise AirflowException(
+                f"Failed to stop {msg} {self.application_name}: 
{error.response['Error']['Message']}"
+            )
+
+        describe_response = 
self.hook.conn.describe_application(ApplicationName=self.application_name)
+
+        if self.deferrable:
+            self.log.info("Deferring for %s to stop: %s.", msg, 
self.application_name)
+            self.defer(
+                trigger=KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+                    application_name=self.application_name,
+                    waiter_name="application_stop_complete",
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    region_name=self.region_name,
+                    verify=self.verify,
+                    botocore_config=self.botocore_config,
+                ),
+                method_name="execute_complete",
+            )
+        if self.wait_for_completion:
+            self.log.info("Waiting for %s to stop: %s.", msg, 
self.application_name)
+
+            self.hook.get_waiter("application_stop_complete").wait(
+                ApplicationName=self.application_name,
+                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": 
self.waiter_max_attempts},
+            )
+
+        self.log.info("%s stopped successfully %s.", msg, 
self.application_name)
+
+        return {"ApplicationARN": 
describe_response["ApplicationDetail"]["ApplicationARN"]}
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> dict[str, Any]:
+        event = validate_execute_complete_event(event)
+
+        if event["status"] != "success":
+            raise AirflowException("Error while stopping AWS Managed Service 
for Apache Flink application")
+
+        response = self.hook.conn.describe_application(
+            ApplicationName=event["application_name"],
+        )
+
+        self.log.info(
+            "AWS Managed Service for Apache Flink application %s stopped 
successfully.",
+            event["application_name"],
+        )
+
+        return {"ApplicationARN": 
response["ApplicationDetail"]["ApplicationARN"]}
diff --git a/airflow/providers/amazon/aws/sensors/kinesis_analytics.py 
b/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
new file mode 100644
index 0000000000..37a2dc733d
--- /dev/null
+++ b/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
@@ -0,0 +1,242 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
+from airflow.providers.amazon.aws.triggers.kinesis_analytics import (
+    KinesisAnalyticsV2ApplicationOperationCompleteTrigger,
+)
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class 
KinesisAnalyticsV2StartApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
+    """
+    Waits for AWS Managed Service for Apache Flink application to start.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the 
guide:
+        :ref:`howto/sensor:KinesisAnalyticsV2StartApplicationCompletedSensor`
+
+    :param application_name: Application name.
+
+    :param deferrable: If True, the sensor will operate in deferrable mode. 
This mode requires aiobotocore
+        module to be installed.
+        (default: False, but can be overridden in config file by setting 
default_deferrable to True)
+    :param poke_interval: Polling period in seconds to check for the status of 
the job. (default: 120)
+    :param max_retries: Number of times before returning the current state. 
(default: 75)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. 
If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default 
boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore 
client. See:
+        
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+
+    """
+
+    aws_hook_class = KinesisAnalyticsV2Hook
+    ui_color = "#66c3ff"
+
+    INTERMEDIATE_STATES: tuple[str, ...] = ("STARTING", "UPDATING", 
"AUTOSCALING")
+    FAILURE_STATES: tuple[str, ...] = (
+        "DELETING",
+        "STOPPING",
+        "READY",
+        "FORCE_STOPPING",
+        "ROLLING_BACK",
+        "MAINTENANCE",
+        "ROLLED_BACK",
+    )
+    SUCCESS_STATES: tuple[str, ...] = ("RUNNING",)
+    FAILURE_MESSAGE = "AWS Managed Service for Apache Flink application start 
failed."
+
+    template_fields: Sequence[str] = aws_template_fields("application_name")
+
+    def __init__(
+        self,
+        *,
+        application_name: str,
+        max_retries: int = 75,
+        poke_interval: int = 120,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.application_name = application_name
+        self.max_retries = max_retries
+        self.poke_interval = poke_interval
+        self.deferrable = deferrable
+
+    def execute(self, context: Context) -> Any:
+        if self.deferrable:
+            self.defer(
+                trigger=KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+                    application_name=self.application_name,
+                    waiter_name="application_start_complete",
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=int(self.poke_interval),
+                    waiter_max_attempts=self.max_retries,
+                    region_name=self.region_name,
+                    verify=self.verify,
+                    botocore_config=self.botocore_config,
+                ),
+                method_name="poke",
+            )
+        else:
+            super().execute(context=context)
+
+    def poke(self, context: Context, **kwargs) -> bool:
+        status = 
self.hook.conn.describe_application(ApplicationName=self.application_name)[
+            "ApplicationDetail"
+        ]["ApplicationStatus"]
+
+        self.log.info(
+            "Poking for AWS Managed Service for Apache Flink application: %s 
status: %s",
+            self.application_name,
+            status,
+        )
+
+        if status in self.FAILURE_STATES:
+            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
+            if self.soft_fail:
+                raise AirflowSkipException(self.FAILURE_MESSAGE)
+            raise AirflowException(self.FAILURE_MESSAGE)
+
+        if status in self.SUCCESS_STATES:
+            self.log.info(
+                "AWS Managed Service for Apache Flink application started 
successfully `%s`.",
+                self.application_name,
+            )
+            return True
+
+        return False
+
+
+class 
KinesisAnalyticsV2StopApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
+    """
+    Waits for AWS Managed Service for Apache Flink application to stop.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the 
guide:
+        :ref:`howto/sensor:KinesisAnalyticsV2StopApplicationCompletedSensor`
+
+    :param application_name: Application name.
+
+    :param deferrable: If True, the sensor will operate in deferrable mode. 
This mode requires aiobotocore
+        module to be installed.
+        (default: False, but can be overridden in config file by setting 
default_deferrable to True)
+    :param poke_interval: Polling period in seconds to check for the status of 
the job. (default: 120)
+    :param max_retries: Number of times before returning the current state. 
(default: 75)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. 
If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default 
boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for botocore 
client. See:
+        
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+
+    """
+
+    aws_hook_class = KinesisAnalyticsV2Hook
+    ui_color = "#66c3ff"
+
+    INTERMEDIATE_STATES: tuple[str, ...] = (
+        "STARTING",
+        "UPDATING",
+        "AUTOSCALING",
+        "RUNNING",
+        "STOPPING",
+        "FORCE_STOPPING",
+    )
+    FAILURE_STATES: tuple[str, ...] = ("DELETING", "ROLLING_BACK", 
"MAINTENANCE", "ROLLED_BACK")
+    SUCCESS_STATES: tuple[str, ...] = ("READY",)
+    FAILURE_MESSAGE = "AWS Managed Service for Apache Flink application stop 
failed."
+
+    template_fields: Sequence[str] = aws_template_fields("application_name")
+
+    def __init__(
+        self,
+        *,
+        application_name: str,
+        max_retries: int = 75,
+        poke_interval: int = 120,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.application_name = application_name
+        self.max_retries = max_retries
+        self.poke_interval = poke_interval
+        self.deferrable = deferrable
+
+    def execute(self, context: Context) -> Any:
+        if self.deferrable:
+            self.defer(
+                trigger=KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+                    application_name=self.application_name,
+                    waiter_name="application_stop_complete",
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=int(self.poke_interval),
+                    waiter_max_attempts=self.max_retries,
+                    region_name=self.region_name,
+                    verify=self.verify,
+                    botocore_config=self.botocore_config,
+                ),
+                method_name="poke",
+            )
+        else:
+            super().execute(context=context)
+
+    def poke(self, context: Context, **kwargs) -> bool:
+        status = 
self.hook.conn.describe_application(ApplicationName=self.application_name)[
+            "ApplicationDetail"
+        ]["ApplicationStatus"]
+
+        self.log.info(
+            "Poking for AWS Managed Service for Apache Flink application: %s 
status: %s",
+            self.application_name,
+            status,
+        )
+
+        if status in self.FAILURE_STATES:
+            # TODO: remove this if block when min_airflow_version is set to 
higher than 2.7.1
+            if self.soft_fail:
+                raise AirflowSkipException(self.FAILURE_MESSAGE)
+            raise AirflowException(self.FAILURE_MESSAGE)
+
+        if status in self.SUCCESS_STATES:
+            self.log.info(
+                "AWS Managed Service for Apache Flink application stopped 
successfully `%s`.",
+                self.application_name,
+            )
+            return True
+
+        return False
diff --git a/airflow/providers/amazon/aws/triggers/kinesis_analytics.py 
b/airflow/providers/amazon/aws/triggers/kinesis_analytics.py
new file mode 100644
index 0000000000..195a2285de
--- /dev/null
+++ b/airflow/providers/amazon/aws/triggers/kinesis_analytics.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
+
+if TYPE_CHECKING:
+    from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
+
+
+class 
KinesisAnalyticsV2ApplicationOperationCompleteTrigger(AwsBaseWaiterTrigger):
+    """
+    Trigger when a Managed Service for Apache Flink application Start or Stop 
is complete.
+
+    :param application_name: Application name.
+    :param waiter_name: The name of the waiter for stop or start application.
+    :param waiter_delay: The amount of time in seconds to wait between 
attempts. (default: 120)
+    :param waiter_max_attempts: The maximum number of attempts to be made. 
(default: 75)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    """
+
+    def __init__(
+        self,
+        application_name: str,
+        waiter_name: str,
+        waiter_delay: int = 120,
+        waiter_max_attempts: int = 75,
+        aws_conn_id: str | None = "aws_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(
+            serialized_fields={"application_name": application_name, 
"waiter_name": waiter_name},
+            waiter_name=waiter_name,
+            waiter_args={"ApplicationName": application_name},
+            failure_message=f"AWS Managed Service for Apache Flink Application 
{application_name} failed.",
+            status_message=f"Status of AWS Managed Service for Apache Flink 
Application {application_name} is",
+            status_queries=["ApplicationDetail.ApplicationStatus"],
+            return_key="application_name",
+            return_value=application_name,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return KinesisAnalyticsV2Hook(
+            aws_conn_id=self.aws_conn_id,
+            region_name=self.region_name,
+            verify=self.verify,
+            config=self.botocore_config,
+        )
diff --git a/airflow/providers/amazon/aws/waiters/kinesisanalyticsv2.json 
b/airflow/providers/amazon/aws/waiters/kinesisanalyticsv2.json
new file mode 100644
index 0000000000..5f6f0ec6e7
--- /dev/null
+++ b/airflow/providers/amazon/aws/waiters/kinesisanalyticsv2.json
@@ -0,0 +1,151 @@
+{
+    "version": 2,
+    "waiters": {
+        "application_start_complete": {
+            "delay": 120,
+            "maxAttempts": 75,
+            "operation": "DescribeApplication",
+            "acceptors": [
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "STARTING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "UPDATING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "AUTOSCALING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "DELETING",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "STOPPING",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "READY",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "FORCE_STOPPING",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "ROLLING_BACK",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "MAINTENANCE",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "ROLLED_BACK",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "RUNNING",
+                    "state": "success"
+                }
+            ]
+        },
+        "application_stop_complete": {
+            "delay": 120,
+            "maxAttempts": 75,
+            "operation": "DescribeApplication",
+            "acceptors": [
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "STARTING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "UPDATING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "AUTOSCALING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "RUNNING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "STOPPING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "FORCE_STOPPING",
+                    "state": "retry"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "DELETING",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "ROLLING_BACK",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "MAINTENANCE",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "ROLLED_BACK",
+                    "state": "failure"
+                },
+                {
+                    "matcher": "path",
+                    "argument": "ApplicationDetail.ApplicationStatus",
+                    "expected": "READY",
+                    "state": "success"
+                }
+            ]
+        }
+    }
+}
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index 478aeb230c..a7b4d4272f 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -248,6 +248,12 @@ integrations:
     external-doc-url: https://aws.amazon.com/kinesis/data-firehose/
     logo: /integration-logos/aws/[email protected]
     tags: [aws]
+  - integration-name: Amazon Managed Service for Apache Flink
+    how-to-guide:
+      - /docs/apache-airflow-providers-amazon/operators/kinesis_analytics.rst
+    external-doc-url: https://aws.amazon.com/managed-service-apache-flink/
+    logo: /integration-logos/aws/[email protected]
+    tags: [aws]
   - integration-name: Amazon OpenSearch Serverless
     how-to-guide:
       - 
/docs/apache-airflow-providers-amazon/operators/opensearchserverless.rst
@@ -435,6 +441,9 @@ operators:
   - integration-name: AWS Lambda
     python-modules:
       - airflow.providers.amazon.aws.operators.lambda_function
+  - integration-name: Amazon Managed Service for Apache Flink
+    python-modules:
+      - airflow.providers.amazon.aws.operators.kinesis_analytics
   - integration-name: Amazon Simple Storage Service (S3)
     python-modules:
       - airflow.providers.amazon.aws.operators.s3
@@ -521,6 +530,9 @@ sensors:
   - integration-name: AWS Lambda
     python-modules:
       - airflow.providers.amazon.aws.sensors.lambda_function
+  - integration-name: Amazon Managed Service for Apache Flink
+    python-modules:
+      - airflow.providers.amazon.aws.sensors.kinesis_analytics
   - integration-name: Amazon OpenSearch Serverless
     python-modules:
       - airflow.providers.amazon.aws.sensors.opensearch_serverless
@@ -624,6 +636,9 @@ hooks:
   - integration-name: AWS Lambda
     python-modules:
       - airflow.providers.amazon.aws.hooks.lambda_function
+  - integration-name: Amazon Managed Service for Apache Flink
+    python-modules:
+      - airflow.providers.amazon.aws.hooks.kinesis_analytics
   - integration-name: Amazon CloudWatch Logs
     python-modules:
       - airflow.providers.amazon.aws.hooks.logs
@@ -703,6 +718,9 @@ triggers:
   - integration-name: AWS Lambda
     python-modules:
       - airflow.providers.amazon.aws.triggers.lambda_function
+  - integration-name: Amazon Managed Service for Apache Flink
+    python-modules:
+      - airflow.providers.amazon.aws.triggers.kinesis_analytics
   - integration-name: Amazon OpenSearch Serverless
     python-modules:
       - airflow.providers.amazon.aws.triggers.opensearch_serverless
diff --git 
a/docs/apache-airflow-providers-amazon/operators/kinesis_analytics.rst 
b/docs/apache-airflow-providers-amazon/operators/kinesis_analytics.rst
new file mode 100644
index 0000000000..d7156feabb
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/kinesis_analytics.rst
@@ -0,0 +1,115 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+========================================
+Amazon Managed Service for Apache Flink
+========================================
+
+`Amazon Managed Service for Apache Flink 
<https://aws.amazon.com/managed-service-apache-flink/>`__ is a fully managed 
service that you can use to process and analyze streaming data using
+Java, Python, SQL, or Scala. The service enables you to quickly author and run 
Java, SQL, or Scala code against streaming sources to perform time series 
analytics,
+feed real-time dashboards, and create real-time metrics.
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+Generic Parameters
+------------------
+
+.. include:: ../_partials/generic_parameters.rst
+
+Operators
+---------
+
+.. _howto/operator:KinesisAnalyticsV2CreateApplicationOperator:
+
+Create an Amazon Managed Service for Apache Flink Application
+==============================================================
+
+To create an Amazon Managed Service for Apache Flink application, you can use
+:class:`~airflow.providers.amazon.aws.operators.kinesis_analytics.KinesisAnalyticsV2CreateApplicationOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_create_application]
+    :end-before: [END howto_operator_create_application]
+
+.. _howto/operator:KinesisAnalyticsV2StartApplicationOperator:
+
+Start an Amazon Managed Service for Apache Flink Application
+=============================================================
+
+To start an Amazon Managed Service for Apache Flink application, you can use
+:class:`~airflow.providers.amazon.aws.operators.kinesis_analytics.KinesisAnalyticsV2StartApplicationOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_application]
+    :end-before: [END howto_operator_start_application]
+
+.. _howto/operator:KinesisAnalyticsV2StopApplicationOperator:
+
+Stop an Amazon Managed Service for Apache Flink Application
+=============================================================
+
+To stop an Amazon Managed Service for Apache Flink application, you can use
+:class:`~airflow.providers.amazon.aws.operators.kinesis_analytics.KinesisAnalyticsV2StopApplicationOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_stop_application]
+    :end-before: [END howto_operator_stop_application]
+
+Sensors
+-------
+
+.. _howto/sensor:KinesisAnalyticsV2StartApplicationCompletedSensor:
+
+Wait for an Amazon Managed Service for Apache Flink Application to start
+========================================================================
+
+To wait on the state of an Amazon Managed Service for Apache Flink Application 
to start you can use
+:class:`~airflow.providers.amazon.aws.sensors.kinesis_analytics.KinesisAnalyticsV2StartApplicationCompletedSensor`.
+
+.. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_start_application]
+    :end-before: [END howto_sensor_start_application]
+
+.. _howto/sensor:KinesisAnalyticsV2StopApplicationCompletedSensor:
+
+Wait for an Amazon Managed Service for Apache Flink Application to stop
+========================================================================
+
+To wait on the state of an Amazon Managed Service for Apache Flink Application 
to stop you can use
+:class:`~airflow.providers.amazon.aws.sensors.kinesis_analytics.KinesisAnalyticsV2StopApplicationCompletedSensor`.
+
+.. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_stop_application]
+    :end-before: [END howto_sensor_stop_application]
+
+Reference
+---------
+
+* `AWS boto3 library documentation for Amazon Managed Service for Apache Flink 
(Kinesis Analytics) 
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisanalyticsv2.html>`__
diff --git 
a/docs/integration-logos/aws/[email protected] 
b/docs/integration-logos/aws/[email protected]
new file mode 100644
index 0000000000..0d22a7c5f2
Binary files /dev/null and 
b/docs/integration-logos/aws/[email protected] differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 99589be246..b8b3c60a0d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -906,6 +906,7 @@ KiB
 Kibana
 killMode
 Kinesis
+kinesis
 kinit
 kms
 knn
diff --git a/tests/providers/amazon/aws/hooks/test_kinesis_analytics.py 
b/tests/providers/amazon/aws/hooks/test_kinesis_analytics.py
new file mode 100644
index 0000000000..4ed152cd3e
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_kinesis_analytics.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
+
+
+class TestKinesisAnalyticsV2Hook:
+    @pytest.mark.parametrize(
+        "test_hook, service_name",
+        [pytest.param(KinesisAnalyticsV2Hook(), "kinesisanalyticsv2", 
id="kinesisanalyticsv2")],
+    )
+    def test_kinesis_analytics_v2_hook(self, test_hook, service_name):
+        kinesis_analytics_hook = KinesisAnalyticsV2Hook()
+        assert kinesis_analytics_hook.conn is not None
diff --git a/tests/providers/amazon/aws/operators/test_kinesis_analytics.py 
b/tests/providers/amazon/aws/operators/test_kinesis_analytics.py
new file mode 100644
index 0000000000..36734d4f56
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_kinesis_analytics.py
@@ -0,0 +1,485 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Generator
+from unittest import mock
+
+import pytest
+from boto3 import client
+from moto import mock_aws
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.operators.kinesis_analytics import (
+    KinesisAnalyticsV2CreateApplicationOperator,
+    KinesisAnalyticsV2StartApplicationOperator,
+    KinesisAnalyticsV2StopApplicationOperator,
+)
+
+if TYPE_CHECKING:
+    from airflow.providers.amazon.aws.hooks.base_aws import BaseAwsConnection
+
+
+class TestKinesisAnalyticsV2CreateApplicationOperator:
+    APPLICATION_ARN = 
"arn:aws:kinesisanalytics:us-east-1:918313644466:application/demo"
+    ROLE_ARN = "arn:aws:iam::123456789012:role/KinesisExecutionRole"
+
+    @pytest.fixture
+    def mock_conn(self) -> Generator[BaseAwsConnection, None, None]:
+        with mock.patch.object(KinesisAnalyticsV2Hook, "conn") as _conn:
+            _conn.create_application.return_value = {
+                "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+            }
+            yield _conn
+
+    @pytest.fixture
+    def kinesis_analytics_v2_hook(self) -> Generator[KinesisAnalyticsV2Hook, 
None, None]:
+        with mock_aws():
+            hook = KinesisAnalyticsV2Hook(aws_conn_id="aws_default")
+            yield hook
+
+    def test_init(self):
+        op = KinesisAnalyticsV2CreateApplicationOperator(
+            task_id="create_application_operator",
+            application_name="demo",
+            runtime_environment="FLINK_18_9",
+            service_execution_role=self.ROLE_ARN,
+            aws_conn_id="fake-conn-id",
+            region_name="eu-west-2",
+            verify=True,
+            botocore_config={"read_timeout": 42},
+        )
+
+        assert op.application_name == "demo"
+        assert op.runtime_environment == "FLINK_18_9"
+        assert op.service_execution_role == self.ROLE_ARN
+        assert op.hook.client_type == "kinesisanalyticsv2"
+        assert op.hook.resource_type is None
+        assert op.hook.aws_conn_id == "fake-conn-id"
+        assert op.hook._region_name == "eu-west-2"
+        assert op.hook._verify is True
+        assert op.hook._config is not None
+        assert op.hook._config.read_timeout == 42
+
+        op = KinesisAnalyticsV2CreateApplicationOperator(
+            task_id="create_application_operator",
+            application_name="demo",
+            runtime_environment="FLINK_18_9",
+            service_execution_role="arn",
+        )
+
+        assert op.hook.aws_conn_id == "aws_default"
+        assert op.hook._region_name is None
+        assert op.hook._verify is None
+        assert op.hook._config is None
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_create_application(self, kinesis_analytics_mock_conn):
+        self.op = KinesisAnalyticsV2CreateApplicationOperator(
+            task_id="create_application_operator",
+            application_name="demo",
+            application_description="demo app",
+            runtime_environment="FLINK_18_9",
+            service_execution_role=self.ROLE_ARN,
+            create_application_kwargs={
+                "ApplicationConfiguration": {
+                    "FlinkApplicationConfiguration": {
+                        "ParallelismConfiguration": {
+                            "ConfigurationType": "CUSTOM",
+                            "Parallelism": 2,
+                            "ParallelismPerKPU": 1,
+                            "AutoScalingEnabled": True,
+                        }
+                    }
+                }
+            },
+        )
+        self.op.execute({})
+        kinesis_analytics_mock_conn.create_application.assert_called_once_with(
+            ApplicationName="demo",
+            ApplicationDescription="demo app",
+            RuntimeEnvironment="FLINK_18_9",
+            ServiceExecutionRole=self.ROLE_ARN,
+            ApplicationConfiguration={
+                "FlinkApplicationConfiguration": {
+                    "ParallelismConfiguration": {
+                        "ConfigurationType": "CUSTOM",
+                        "Parallelism": 2,
+                        "ParallelismPerKPU": 1,
+                        "AutoScalingEnabled": True,
+                    }
+                }
+            },
+        )
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_create_application_throw_error_when_invalid_arguments_provided(
+        self, kinesis_analytics_mock_conn
+    ):
+        operator = KinesisAnalyticsV2CreateApplicationOperator(
+            task_id="create_application_operator",
+            application_name="demo",
+            runtime_environment="FLINK_18_9",
+            service_execution_role=self.ROLE_ARN,
+            create_application_kwargs={"AppId": {"code": 
"s3://test/flink.jar"}},
+            aws_conn_id="fake-conn-id",
+            region_name="eu-west-2",
+            verify=True,
+            botocore_config={"read_timeout": 42},
+        )
+
+        operator.defer = mock.MagicMock()
+        error_message = "Invalid arguments provided"
+
+        err_response = {"Error": {"Code": "InvalidArgumentException", 
"Message": error_message}}
+
+        exception = client("kinesisanalyticsv2").exceptions.ClientError(
+            err_response, operation_name="CreateApplication"
+        )
+        returned_exception = type(exception)
+
+        kinesis_analytics_mock_conn.exceptions.InvalidArgumentException = 
returned_exception
+        kinesis_analytics_mock_conn.create_application.side_effect = exception
+
+        with pytest.raises(AirflowException, match=error_message):
+            operator.execute({})
+
+
+class TestKinesisAnalyticsV2StartApplicationOperator:
+    APPLICATION_ARN = 
"arn:aws:kinesisanalytics:us-east-1:123456789012:application/demo"
+    ROLE_ARN = "arn:aws:iam::123456789012:role/KinesisExecutionRole"
+    RUN_CONFIGURATION = {"FlinkRunConfiguration": {"AllowNonRestoredState": 
True}}
+
+    @pytest.fixture
+    def mock_conn(self) -> Generator[BaseAwsConnection, None, None]:
+        with mock.patch.object(KinesisAnalyticsV2Hook, "conn") as _conn:
+            _conn.start_application.return_value = {}
+            _conn.describe_application.return_value = {
+                "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+            }
+            yield _conn
+
+    @pytest.fixture
+    def kinesis_analytics_v2_hook(self) -> Generator[KinesisAnalyticsV2Hook, 
None, None]:
+        with mock_aws():
+            hook = KinesisAnalyticsV2Hook(aws_conn_id="aws_default")
+            yield hook
+
+    def setup_method(self):
+        self.operator = KinesisAnalyticsV2StartApplicationOperator(
+            task_id="start_application_operator",
+            application_name="demo",
+            run_configuration=self.RUN_CONFIGURATION,
+            aws_conn_id="fake-conn-id",
+            region_name="eu-west-2",
+            verify=True,
+            botocore_config={"read_timeout": 42},
+        )
+
+        self.operator.defer = mock.MagicMock()
+
+    def test_init(self):
+        op = KinesisAnalyticsV2StartApplicationOperator(
+            task_id="start_application_operator",
+            application_name="demo",
+            run_configuration=self.RUN_CONFIGURATION,
+            aws_conn_id="fake-conn-id",
+            region_name="eu-west-2",
+            verify=True,
+            botocore_config={"read_timeout": 42},
+        )
+
+        assert op.application_name == "demo"
+        assert op.run_configuration == self.RUN_CONFIGURATION
+        assert op.hook.client_type == "kinesisanalyticsv2"
+        assert op.hook.resource_type is None
+        assert op.hook.aws_conn_id == "fake-conn-id"
+        assert op.hook._region_name == "eu-west-2"
+        assert op.hook._verify is True
+        assert op.hook._config is not None
+        assert op.hook._config.read_timeout == 42
+
+        op = KinesisAnalyticsV2StartApplicationOperator(
+            task_id="start_application_operator",
+            application_name="demo",
+            run_configuration=self.RUN_CONFIGURATION,
+        )
+
+        assert op.hook.aws_conn_id == "aws_default"
+        assert op.hook._region_name is None
+        assert op.hook._verify is None
+        assert op.hook._config is None
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_start_application(self, kinesis_analytics_mock_conn):
+        kinesis_analytics_mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+        }
+        kinesis_analytics_mock_conn.start_application.return_value = {}
+
+        self.op = KinesisAnalyticsV2StartApplicationOperator(
+            task_id="start_application_operator",
+            application_name="demo",
+            run_configuration=self.RUN_CONFIGURATION,
+        )
+        self.op.wait_for_completion = False
+        response = self.op.execute({})
+
+        assert response == {"ApplicationARN": self.APPLICATION_ARN}
+
+        kinesis_analytics_mock_conn.start_application.assert_called_once_with(
+            ApplicationName="demo", RunConfiguration=self.RUN_CONFIGURATION
+        )
+
+    @pytest.mark.parametrize(
+        "wait_for_completion, deferrable",
+        [
+            pytest.param(False, False, id="no_wait"),
+            pytest.param(True, False, id="wait"),
+            pytest.param(False, True, id="defer"),
+        ],
+    )
+    @mock.patch.object(KinesisAnalyticsV2Hook, "get_waiter")
+    def test_start_application_wait_combinations(
+        self, _, wait_for_completion, deferrable, mock_conn, 
kinesis_analytics_v2_hook
+    ):
+        self.operator.wait_for_completion = wait_for_completion
+        self.operator.deferrable = deferrable
+
+        response = self.operator.execute({})
+
+        assert response == {"ApplicationARN": self.APPLICATION_ARN}
+        assert kinesis_analytics_v2_hook.get_waiter.call_count == 
wait_for_completion
+        assert self.operator.defer.call_count == deferrable
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_start_application_throw_error_when_invalid_config_provided(self, 
kinesis_analytics_mock_conn):
+        operator = KinesisAnalyticsV2StartApplicationOperator(
+            task_id="start_application_operator",
+            application_name="demo",
+            run_configuration={
+                "ApplicationRestoreConfiguration": {
+                    "ApplicationRestoreType": "SKIP_RESTORE",
+                }
+            },
+            aws_conn_id="fake-conn-id",
+            region_name="eu-west-2",
+            verify=True,
+            botocore_config={"read_timeout": 42},
+        )
+
+        operator.defer = mock.MagicMock()
+        error_message = "Invalid config provided"
+
+        err_response = {
+            "Error": {"Code": "InvalidApplicationConfigurationException", 
"Message": error_message}
+        }
+
+        exception = client("kinesisanalyticsv2").exceptions.ClientError(
+            err_response, operation_name="StartApplication"
+        )
+        returned_exception = type(exception)
+
+        kinesis_analytics_mock_conn.exceptions.InvalidArgumentException = 
returned_exception
+        kinesis_analytics_mock_conn.start_application.side_effect = exception
+
+        with pytest.raises(AirflowException, match=error_message):
+            operator.execute({})
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_execute_complete(self, kinesis_analytics_mock_conn):
+        kinesis_analytics_mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+        }
+
+        event = {"status": "success", "application_name": "demo"}
+
+        response = self.operator.execute_complete(context=None, event=event)
+
+        assert {"ApplicationARN": self.APPLICATION_ARN} == response
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_execute_complete_failure(self, kinesis_analytics_mock_conn):
+        kinesis_analytics_mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+        }
+
+        event = {"status": "error", "application_name": "demo"}
+
+        with pytest.raises(
+            AirflowException,
+            match="Error while starting AWS Managed Service for Apache Flink 
application",
+        ):
+            self.operator.execute_complete(context=None, event=event)
+
+
+class TestKinesisAnalyticsV2StopApplicationOperator:
+    APPLICATION_ARN = 
"arn:aws:kinesisanalytics:us-east-1:123456789012:application/demo"
+    ROLE_ARN = "arn:aws:iam::123456789012:role/KinesisExecutionRole"
+
+    @pytest.fixture
+    def mock_conn(self) -> Generator[BaseAwsConnection, None, None]:
+        with mock.patch.object(KinesisAnalyticsV2Hook, "conn") as _conn:
+            _conn.stop_application.return_value = {}
+            _conn.describe_application.return_value = {
+                "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+            }
+            yield _conn
+
+    @pytest.fixture
+    def kinesis_analytics_v2_hook(self) -> Generator[KinesisAnalyticsV2Hook, 
None, None]:
+        with mock_aws():
+            hook = KinesisAnalyticsV2Hook(aws_conn_id="aws_default")
+            yield hook
+
+    def setup_method(self):
+        self.operator = KinesisAnalyticsV2StopApplicationOperator(
+            task_id="stop_application_operator",
+            application_name="demo",
+            force=False,
+            aws_conn_id="fake-conn-id",
+            region_name="eu-west-2",
+            verify=True,
+            botocore_config={"read_timeout": 42},
+        )
+
+        self.operator.defer = mock.MagicMock()
+
+    def test_init(self):
+        op = KinesisAnalyticsV2StopApplicationOperator(
+            task_id="stop_application_operator",
+            application_name="demo",
+            force=False,
+            aws_conn_id="fake-conn-id",
+            region_name="eu-west-2",
+            verify=True,
+            botocore_config={"read_timeout": 42},
+        )
+
+        assert op.application_name == "demo"
+        assert op.force is False
+        assert op.hook.client_type == "kinesisanalyticsv2"
+        assert op.hook.resource_type is None
+        assert op.hook.aws_conn_id == "fake-conn-id"
+        assert op.hook._region_name == "eu-west-2"
+        assert op.hook._verify is True
+        assert op.hook._config is not None
+        assert op.hook._config.read_timeout == 42
+
+        op = KinesisAnalyticsV2StopApplicationOperator(
+            task_id="stop_application_operator",
+            application_name="demo",
+            force=False,
+        )
+
+        assert op.hook.aws_conn_id == "aws_default"
+        assert op.hook._region_name is None
+        assert op.hook._verify is None
+        assert op.hook._config is None
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_stop_application(self, kinesis_analytics_mock_conn):
+        kinesis_analytics_mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+        }
+        kinesis_analytics_mock_conn.stop_application.return_value = {}
+
+        self.op = KinesisAnalyticsV2StopApplicationOperator(
+            task_id="stop_application_operator", application_name="demo", 
force=False
+        )
+        self.op.wait_for_completion = False
+        response = self.op.execute({})
+
+        assert response == {"ApplicationARN": self.APPLICATION_ARN}
+
+        kinesis_analytics_mock_conn.stop_application.assert_called_once_with(
+            ApplicationName="demo", Force=False
+        )
+
+    @pytest.mark.parametrize(
+        "wait_for_completion, deferrable",
+        [
+            pytest.param(False, False, id="no_wait"),
+            pytest.param(True, False, id="wait"),
+            pytest.param(False, True, id="defer"),
+        ],
+    )
+    @mock.patch.object(KinesisAnalyticsV2Hook, "get_waiter")
+    def test_stop_application_wait_combinations(
+        self, _, wait_for_completion, deferrable, mock_conn, 
kinesis_analytics_v2_hook
+    ):
+        self.operator.wait_for_completion = wait_for_completion
+        self.operator.deferrable = deferrable
+
+        response = self.operator.execute({})
+
+        assert response == {"ApplicationARN": self.APPLICATION_ARN}
+        assert kinesis_analytics_v2_hook.get_waiter.call_count == 
wait_for_completion
+        assert self.operator.defer.call_count == deferrable
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_stop_application_throw_error_when_invalid_config_provided(self, 
kinesis_analytics_mock_conn):
+        operator = KinesisAnalyticsV2StopApplicationOperator(
+            task_id="stop_application_operator",
+            application_name="demo",
+            force=False,
+            aws_conn_id="fake-conn-id",
+            region_name="eu-west-2",
+            verify=True,
+            botocore_config={"read_timeout": 42},
+        )
+
+        operator.defer = mock.MagicMock()
+        error_message = "resource not found"
+
+        err_response = {"Error": {"Code": "ResourceNotFoundException", 
"Message": error_message}}
+
+        exception = client("kinesisanalyticsv2").exceptions.ClientError(
+            err_response, operation_name="StopApplication"
+        )
+        returned_exception = type(exception)
+
+        kinesis_analytics_mock_conn.exceptions.ResourceNotFoundException = 
returned_exception
+        kinesis_analytics_mock_conn.stop_application.side_effect = exception
+
+        with pytest.raises(AirflowException, match=error_message):
+            operator.execute({})
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_execute_complete(self, kinesis_analytics_mock_conn):
+        kinesis_analytics_mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+        }
+
+        event = {"status": "success", "application_name": "demo"}
+
+        response = self.operator.execute_complete(context=None, event=event)
+
+        assert {"ApplicationARN": self.APPLICATION_ARN} == response
+
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_execute_complete_failure(self, kinesis_analytics_mock_conn):
+        kinesis_analytics_mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+        }
+        event = {"status": "error", "application_name": "demo"}
+
+        with pytest.raises(
+            AirflowException, match="Error while stopping AWS Managed Service 
for Apache Flink application"
+        ):
+            self.operator.execute_complete(context=None, event=event)
diff --git a/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py 
b/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py
new file mode 100644
index 0000000000..73a2cfdb77
--- /dev/null
+++ b/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
+    KinesisAnalyticsV2StartApplicationCompletedSensor,
+    KinesisAnalyticsV2StopApplicationCompletedSensor,
+)
+
+
+class TestKinesisAnalyticsV2StartApplicationCompletedSensor:
+    SENSOR = KinesisAnalyticsV2StartApplicationCompletedSensor
+    APPLICATION_ARN = 
"arn:aws:kinesisanalytics:us-east-1:123456789012:application/demo"
+
+    def setup_method(self):
+        self.default_op_kwargs = dict(
+            task_id="start_application_sensor",
+            application_name="demo",
+            poke_interval=5,
+            max_retries=1,
+        )
+        self.sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
+
+    def test_base_aws_op_attributes(self):
+        op = self.SENSOR(**self.default_op_kwargs)
+        assert op.hook.aws_conn_id == "aws_default"
+        assert op.hook._region_name is None
+        assert op.hook._verify is None
+        assert op.hook._config is None
+
+        op = self.SENSOR(
+            **self.default_op_kwargs,
+            aws_conn_id="aws-test-custom-conn",
+            region_name="eu-west-1",
+            verify=False,
+            botocore_config={"read_timeout": 42},
+        )
+        assert op.hook.aws_conn_id == "aws-test-custom-conn"
+        assert op.hook._region_name == "eu-west-1"
+        assert op.hook._verify is False
+        assert op.hook._config is not None
+        assert op.hook._config.read_timeout == 42
+
+    @pytest.mark.parametrize("state", SENSOR.SUCCESS_STATES)
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_poke_success_state(self, mock_conn, state):
+        mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN, 
"ApplicationStatus": state}
+        }
+
+        assert self.sensor.poke({}) is True
+
+    @pytest.mark.parametrize("state", SENSOR.INTERMEDIATE_STATES)
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_intermediate_state(self, mock_conn, state):
+        mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN, 
"ApplicationStatus": state}
+        }
+        assert self.sensor.poke({}) is False
+
+    @pytest.mark.parametrize(
+        "soft_fail, expected_exception",
+        [
+            pytest.param(False, AirflowException, id="not-soft-fail"),
+            pytest.param(True, AirflowSkipException, id="soft-fail"),
+        ],
+    )
+    @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+        mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN, 
"ApplicationStatus": state}
+        }
+
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
+
+        with pytest.raises(
+            expected_exception, match="AWS Managed Service for Apache Flink 
application start failed"
+        ):
+            sensor.poke({})
+
+
+class TestKinesisAnalyticsV2StopApplicationCompletedSensor:
+    SENSOR = KinesisAnalyticsV2StopApplicationCompletedSensor
+    APPLICATION_ARN = 
"arn:aws:kinesisanalytics:us-east-1:123456789012:application/demo"
+
+    def setup_method(self):
+        self.default_op_kwargs = dict(
+            task_id="stop_application_sensor",
+            application_name="demo",
+            poke_interval=5,
+            max_retries=1,
+        )
+        self.sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
+
+    def test_base_aws_op_attributes(self):
+        op = self.SENSOR(**self.default_op_kwargs)
+        assert op.hook.aws_conn_id == "aws_default"
+        assert op.hook._region_name is None
+        assert op.hook._verify is None
+        assert op.hook._config is None
+
+        op = self.SENSOR(
+            **self.default_op_kwargs,
+            aws_conn_id="aws-test-custom-conn",
+            region_name="eu-west-1",
+            verify=False,
+            botocore_config={"read_timeout": 42},
+        )
+        assert op.hook.aws_conn_id == "aws-test-custom-conn"
+        assert op.hook._region_name == "eu-west-1"
+        assert op.hook._verify is False
+        assert op.hook._config is not None
+        assert op.hook._config.read_timeout == 42
+
+    @pytest.mark.parametrize("state", SENSOR.SUCCESS_STATES)
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_poke_success_state(self, mock_conn, state):
+        mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN, 
"ApplicationStatus": state}
+        }
+
+        assert self.sensor.poke({}) is True
+
+    @pytest.mark.parametrize("state", SENSOR.INTERMEDIATE_STATES)
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_intermediate_state(self, mock_conn, state):
+        mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN, 
"ApplicationStatus": state}
+        }
+        assert self.sensor.poke({}) is False
+
+    @pytest.mark.parametrize(
+        "soft_fail, expected_exception",
+        [
+            pytest.param(False, AirflowException, id="not-soft-fail"),
+            pytest.param(True, AirflowSkipException, id="soft-fail"),
+        ],
+    )
+    @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
+    @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_poke_failure_states(self, mock_conn, state, soft_fail, 
expected_exception):
+        mock_conn.describe_application.return_value = {
+            "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN, 
"ApplicationStatus": state}
+        }
+
+        sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, 
soft_fail=soft_fail)
+
+        with pytest.raises(
+            expected_exception, match="AWS Managed Service for Apache Flink 
application stop failed"
+        ):
+            sensor.poke({})
diff --git a/tests/providers/amazon/aws/triggers/test_kinesis_analytics.py 
b/tests/providers/amazon/aws/triggers/test_kinesis_analytics.py
new file mode 100644
index 0000000000..3692905f22
--- /dev/null
+++ b/tests/providers/amazon/aws/triggers/test_kinesis_analytics.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+from unittest.mock import AsyncMock
+
+import pytest
+
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.triggers.kinesis_analytics import (
+    KinesisAnalyticsV2ApplicationOperationCompleteTrigger,
+)
+from airflow.triggers.base import TriggerEvent
+from tests.providers.amazon.aws.utils.test_waiter import 
assert_expected_waiter_type
+
+BASE_TRIGGER_CLASSPATH = 
"airflow.providers.amazon.aws.triggers.kinesis_analytics."
+
+
+class TestKinesisAnalyticsV2ApplicationOperationCompleteTrigger:
+    APPLICATION_NAME = "demo"
+
+    def test_serialization(self):
+        """Assert that arguments and classpath are correctly serialized."""
+        trigger = KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+            application_name=self.APPLICATION_NAME, 
waiter_name="application_start_complete"
+        )
+        classpath, kwargs = trigger.serialize()
+        assert classpath == BASE_TRIGGER_CLASSPATH + 
"KinesisAnalyticsV2ApplicationOperationCompleteTrigger"
+        assert kwargs.get("application_name") == self.APPLICATION_NAME
+
+    @pytest.mark.asyncio
+    @mock.patch.object(KinesisAnalyticsV2Hook, "get_waiter")
+    @mock.patch.object(KinesisAnalyticsV2Hook, "async_conn")
+    async def test_run_success_with_application_start_complete_waiter(self, 
mock_async_conn, mock_get_waiter):
+        mock_async_conn.__aenter__.return_value = mock.MagicMock()
+        mock_get_waiter().wait = AsyncMock()
+        trigger = KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+            application_name=self.APPLICATION_NAME, 
waiter_name="application_start_complete"
+        )
+
+        generator = trigger.run()
+        response = await generator.asend(None)
+
+        assert response == TriggerEvent({"status": "success", 
"application_name": self.APPLICATION_NAME})
+        assert_expected_waiter_type(mock_get_waiter, 
"application_start_complete")
+        mock_get_waiter().wait.assert_called_once()
+
+    @pytest.mark.asyncio
+    @mock.patch.object(KinesisAnalyticsV2Hook, "get_waiter")
+    @mock.patch.object(KinesisAnalyticsV2Hook, "async_conn")
+    async def test_run_success_with_application_stop_complete_waiter(self, 
mock_async_conn, mock_get_waiter):
+        mock_async_conn.__aenter__.return_value = mock.MagicMock()
+        mock_get_waiter().wait = AsyncMock()
+        trigger = KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+            application_name=self.APPLICATION_NAME, 
waiter_name="application_stop_waiter"
+        )
+
+        generator = trigger.run()
+        response = await generator.asend(None)
+
+        assert response == TriggerEvent({"status": "success", 
"application_name": self.APPLICATION_NAME})
+        assert_expected_waiter_type(mock_get_waiter, "application_stop_waiter")
+        mock_get_waiter().wait.assert_called_once()
diff --git a/tests/providers/amazon/aws/waiters/test_kinesis_analytics.py 
b/tests/providers/amazon/aws/waiters/test_kinesis_analytics.py
new file mode 100644
index 0000000000..31d8253833
--- /dev/null
+++ b/tests/providers/amazon/aws/waiters/test_kinesis_analytics.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import boto3
+import botocore
+import pytest
+
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
+    KinesisAnalyticsV2StartApplicationCompletedSensor,
+    KinesisAnalyticsV2StopApplicationCompletedSensor,
+)
+
+
+class TestKinesisAnalyticsV2CustomWaiters:
+    def test_service_waiters(self):
+        assert "application_start_complete" in 
KinesisAnalyticsV2Hook().list_waiters()
+        assert "application_stop_complete" in 
KinesisAnalyticsV2Hook().list_waiters()
+
+
+class TestKinesisAnalyticsV2CustomWaitersBase:
+    @pytest.fixture(autouse=True)
+    def mock_conn(self, monkeypatch):
+        self.client = boto3.client("kinesisanalyticsv2")
+        monkeypatch.setattr(KinesisAnalyticsV2Hook, "conn", self.client)
+
+
+class 
TestKinesisAnalyticsV2ApplicationStartWaiter(TestKinesisAnalyticsV2CustomWaitersBase):
+    APPLICATION_NAME = "demo"
+    WAITER_NAME = "application_start_complete"
+
+    @pytest.fixture
+    def mock_describe_application(self):
+        with mock.patch.object(self.client, "describe_application") as 
mock_getter:
+            yield mock_getter
+
+    @pytest.mark.parametrize("state", 
KinesisAnalyticsV2StartApplicationCompletedSensor.SUCCESS_STATES)
+    def test_start_application_complete(self, state, 
mock_describe_application):
+        mock_describe_application.return_value = {"ApplicationDetail": 
{"ApplicationStatus": state}}
+
+        
KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(ApplicationName=self.APPLICATION_NAME)
+
+    @pytest.mark.parametrize("state", 
KinesisAnalyticsV2StartApplicationCompletedSensor.FAILURE_STATES)
+    def test_start_application_complete_failed(self, state, 
mock_describe_application):
+        mock_describe_application.return_value = {"ApplicationDetail": 
{"ApplicationStatus": state}}
+        with pytest.raises(botocore.exceptions.WaiterError):
+            
KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(ApplicationName=self.APPLICATION_NAME)
+
+    def test_start_application_complete_wait(self, mock_describe_application):
+        wait = {"ApplicationDetail": {"ApplicationStatus": "STARTING"}}
+        success = {"ApplicationDetail": {"ApplicationStatus": "RUNNING"}}
+
+        mock_describe_application.side_effect = [wait, wait, success]
+
+        KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(
+            ApplicationName=self.APPLICATION_NAME, WaiterConfig={"Delay": 
0.01, "MaxAttempts": 3}
+        )
+
+
+class 
TestKinesisAnalyticsV2ApplicationStopWaiter(TestKinesisAnalyticsV2CustomWaitersBase):
+    APPLICATION_NAME = "demo"
+    WAITER_NAME = "application_stop_complete"
+
+    @pytest.fixture
+    def mock_describe_application(self):
+        with mock.patch.object(self.client, "describe_application") as 
mock_getter:
+            yield mock_getter
+
+    @pytest.mark.parametrize("state", 
KinesisAnalyticsV2StopApplicationCompletedSensor.SUCCESS_STATES)
+    def test_stop_application_complete(self, state, mock_describe_application):
+        mock_describe_application.return_value = {"ApplicationDetail": 
{"ApplicationStatus": state}}
+
+        
KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(ApplicationName=self.APPLICATION_NAME)
+
+    @pytest.mark.parametrize("state", 
KinesisAnalyticsV2StopApplicationCompletedSensor.FAILURE_STATES)
+    def test_stop_application_complete_failed(self, state, 
mock_describe_application):
+        mock_describe_application.return_value = {"ApplicationDetail": 
{"ApplicationStatus": state}}
+        with pytest.raises(botocore.exceptions.WaiterError):
+            
KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(ApplicationName=self.APPLICATION_NAME)
+
+    def test_stop_application_complete_wait(self, mock_describe_application):
+        wait = {"ApplicationDetail": {"ApplicationStatus": "STOPPING"}}
+        success = {"ApplicationDetail": {"ApplicationStatus": "READY"}}
+
+        mock_describe_application.side_effect = [wait, wait, success]
+
+        KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(
+            ApplicationName=self.APPLICATION_NAME, WaiterConfig={"Delay": 
0.01, "MaxAttempts": 3}
+        )
diff --git a/tests/system/providers/amazon/aws/example_kinesis_analytics.py 
b/tests/system/providers/amazon/aws/example_kinesis_analytics.py
new file mode 100644
index 0000000000..0d93f1e336
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_kinesis_analytics.py
@@ -0,0 +1,272 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime as dt
+import json
+import random
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG, settings
+from airflow.decorators import task, task_group
+from airflow.models import Connection
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import 
KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.operators.kinesis_analytics import (
+    KinesisAnalyticsV2CreateApplicationOperator,
+    KinesisAnalyticsV2StartApplicationOperator,
+    KinesisAnalyticsV2StopApplicationOperator,
+)
+from airflow.providers.amazon.aws.operators.s3 import (
+    S3CreateBucketOperator,
+    S3DeleteBucketOperator,
+)
+from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
+    KinesisAnalyticsV2StartApplicationCompletedSensor,
+    KinesisAnalyticsV2StopApplicationCompletedSensor,
+)
+from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
+
+ROLE_ARN_KEY = "ROLE_ARN"
+sys_test_context_task = 
SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
+
+DAG_ID = "example_kinesis_analytics"
+
+
+@task_group
+def kinesis_analytics_v2_workflow():
+    # [START howto_operator_create_application]
+    create_application = KinesisAnalyticsV2CreateApplicationOperator(
+        task_id="create_application",
+        application_name=application_name,
+        runtime_environment="FLINK-1_18",
+        service_execution_role=test_context[ROLE_ARN_KEY],
+        create_application_kwargs={
+            "ApplicationConfiguration": {
+                "FlinkApplicationConfiguration": {
+                    "ParallelismConfiguration": {
+                        "ConfigurationType": "CUSTOM",
+                        "Parallelism": 2,
+                        "ParallelismPerKPU": 1,
+                        "AutoScalingEnabled": False,
+                    }
+                },
+                "EnvironmentProperties": {
+                    "PropertyGroups": [
+                        {
+                            "PropertyGroupId": "BlueprintMetadata",
+                            "PropertyMap": {
+                                "AWSRegion": region_name,
+                                "BlueprintName": 
"KDS_FLINK-DATASTREAM-JAVA_S3",
+                                "BucketName": f"s3://{bucket_name}/",
+                                "PartitionFormat": "yyyy-MM-dd-HH",
+                                "StreamInitialPosition": "TRIM_HORIZON",
+                                "StreamName": stream_name,
+                            },
+                        },
+                    ]
+                },
+                "ApplicationCodeConfiguration": {
+                    "CodeContent": {
+                        "S3ContentLocation": {
+                            "BucketARN": f"arn:aws:s3:::{bucket_name}",
+                            "FileKey": 
"code/kds-to-s3-datastream-java-1.0.1.jar",
+                        },
+                    },
+                    "CodeContentType": "ZIPFILE",
+                },
+            }
+        },
+    )
+    # [END howto_operator_create_application]
+
+    # [START howto_operator_start_application]
+    start_application = KinesisAnalyticsV2StartApplicationOperator(
+        task_id="start_application",
+        application_name=application_name,
+    )
+    # [END howto_operator_start_application]
+    start_application.wait_for_completion = False
+
+    # [START howto_sensor_start_application]
+    await_start_application = 
KinesisAnalyticsV2StartApplicationCompletedSensor(
+        task_id="await_start_application",
+        application_name=application_name,
+    )
+    # [END howto_sensor_start_application]
+
+    # [START howto_operator_stop_application]
+    stop_application = KinesisAnalyticsV2StopApplicationOperator(
+        task_id="stop_application",
+        application_name=application_name,
+    )
+    # [END howto_operator_stop_application]
+    stop_application.wait_for_completion = False
+
+    # [START howto_sensor_stop_application]
+    await_stop_application = KinesisAnalyticsV2StopApplicationCompletedSensor(
+        task_id="await_stop_application",
+        application_name=application_name,
+    )
+    # [END howto_sensor_stop_application]
+
+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def delete_application(app_name: str):
+        kinesis_analytics_v2_hook = KinesisAnalyticsV2Hook()
+        response = 
kinesis_analytics_v2_hook.conn.describe_application(ApplicationName=app_name)
+        kinesis_analytics_v2_hook.conn.delete_application(
+            ApplicationName=app_name, 
CreateTimestamp=response["ApplicationDetail"]["CreateTimestamp"]
+        )
+
+    chain(
+        create_application,
+        start_application,
+        await_start_application,
+        stop_application,
+        await_stop_application,
+        delete_application(application_name),
+    )
+
+
+@task_group
+def copy_jar_to_s3(bucket: str):
+    """
+
+    Copy application code to S3 using HttpToS3Operator.
+
+    :param bucket: Name of the Amazon S3 bucket.
+    """
+
+    @task
+    def create_connection(conn_id):
+        conn = Connection(
+            conn_id=conn_id,
+            conn_type="http",
+            host="https://github.com/";,
+        )
+        session = settings.Session()
+        session.add(conn)
+        session.commit()
+
+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def delete_connection(conn_id: str):
+        session = settings.Session()
+        conn_to_details = session.query(Connection).filter(Connection.conn_id 
== conn_id).first()
+        session.delete(conn_to_details)
+        session.commit()
+
+    copy_jar_file = HttpToS3Operator(
+        task_id="copy_jar_file",
+        http_conn_id=http_conn_id,
+        
endpoint="awslabs/managed-service-for-apache-flink-blueprints/releases/download/v2.0.1/kds-to-s3-datastream-java-1.0.1.jar",
+        s3_bucket=bucket,
+        s3_key="code/kds-to-s3-datastream-java-1.0.1.jar",
+    )
+
+    chain(create_connection(http_conn_id), copy_jar_file, 
delete_connection(http_conn_id))
+
+
+@task
+def create_kinesis_stream(stream: str, region: str):
+    """
+    Create kinesis stream and put some sample data.
+
+    :param stream: Name of the kinesis stream.
+    :param region: Region name
+    """
+    client = boto3.client("kinesis", region_name=region)
+    client.create_stream(StreamName=stream, ShardCount=1, 
StreamModeDetails={"StreamMode": "PROVISIONED"})
+    account_id = boto3.client("sts").get_caller_identity()["Account"]
+    waiter = client.get_waiter("stream_exists")
+    waiter.wait(StreamName=stream, WaiterConfig={"Delay": 60, "MaxAttempts": 
4})
+
+    def get_data():
+        return {
+            "event_time": dt.datetime.now().isoformat(),
+            "ticker": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
+            "price": round(random.random() * 100, 2),
+        }
+
+    for _ in range(2):
+        data = get_data()
+        client.put_record(
+            StreamARN=f"arn:aws:kinesis:{region}:{account_id}:stream/{stream}",
+            Data=json.dumps(data),
+            PartitionKey=data["ticker"],
+        )
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_kinesis_stream(stream: str, region: str):
+    client = boto3.client("kinesis", region_name=region)
+    client.delete_stream(StreamName=stream, EnforceConsumerDeletion=True)
+
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    tags=["example"],
+    catchup=False,
+) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context["ENV_ID"]
+    bucket_name = f"{env_id}-kinesis-analytics"
+    application_name = f"{env_id}-test-app"
+    http_conn_id = f"{env_id}-git"
+    region_name = boto3.session.Session().region_name
+    stream_name = f"{env_id}-test-stream"
+
+    create_bucket = S3CreateBucketOperator(
+        task_id="create_bucket",
+        bucket_name=bucket_name,
+    )
+
+    delete_bucket = S3DeleteBucketOperator(
+        task_id="delete_bucket",
+        trigger_rule=TriggerRule.ALL_DONE,
+        bucket_name=bucket_name,
+        force_delete=True,
+    )
+
+    chain(
+        # TEST SETUP
+        test_context,
+        create_bucket,
+        create_kinesis_stream(stream=stream_name, region=region_name),
+        copy_jar_to_s3(bucket=bucket_name),
+        # TEST BODY
+        kinesis_analytics_v2_workflow(),
+        # TEST TEARDOWN
+        delete_kinesis_stream(stream=stream_name, region=region_name),
+        delete_bucket,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)


Reply via email to