This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a4e3fbed1a Introduce Amazon Kinesis Analytics V2 (Managed Service for Apache Flink application) (#40765)
a4e3fbed1a is described below
commit a4e3fbed1a07b5685820a468e2d2ebb986b7d6b4
Author: GPK <[email protected]>
AuthorDate: Thu Jul 18 19:18:48 2024 +0100
Introduce Amazon Kinesis Analytics V2 (Managed Service for Apache Flink application) (#40765)
---
.../amazon/aws/hooks/kinesis_analytics.py | 37 ++
.../amazon/aws/operators/kinesis_analytics.py | 348 +++++++++++++++
.../amazon/aws/sensors/kinesis_analytics.py | 242 ++++++++++
.../amazon/aws/triggers/kinesis_analytics.py | 69 +++
.../amazon/aws/waiters/kinesisanalyticsv2.json | 151 +++++++
airflow/providers/amazon/provider.yaml | 18 +
.../operators/kinesis_analytics.rst | 115 +++++
.../aws/[email protected] | Bin 0 -> 16946 bytes
docs/spelling_wordlist.txt | 1 +
.../amazon/aws/hooks/test_kinesis_analytics.py | 31 ++
.../amazon/aws/operators/test_kinesis_analytics.py | 485 +++++++++++++++++++++
.../amazon/aws/sensors/test_kinesis_analytics.py | 172 ++++++++
.../amazon/aws/triggers/test_kinesis_analytics.py | 78 ++++
.../amazon/aws/waiters/test_kinesis_analytics.py | 106 +++++
.../amazon/aws/example_kinesis_analytics.py | 272 ++++++++++++
15 files changed, 2125 insertions(+)
diff --git a/airflow/providers/amazon/aws/hooks/kinesis_analytics.py b/airflow/providers/amazon/aws/hooks/kinesis_analytics.py
new file mode 100644
index 0000000000..6f346dec45
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/kinesis_analytics.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class KinesisAnalyticsV2Hook(AwsBaseHook):
+ """
+ Interact with Amazon Kinesis Analytics V2.
+
+    Provide a thin wrapper around
+    :external+boto3:py:class:`boto3.client("kinesisanalyticsv2") <KinesisAnalyticsV2.Client>`.
+
+ Additional arguments (such as ``aws_conn_id``) may be specified and
+ are passed down to the underlying AwsBaseHook.
+
+ .. seealso::
+ - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+ """
+
+ def __init__(self, *args, **kwargs) -> None:
+ kwargs["client_type"] = "kinesisanalyticsv2"
+ super().__init__(*args, **kwargs)
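
For reference, a minimal usage sketch of the new hook (the connection id and
application name below are illustrative, not part of this commit):

    from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook

    # The hook exposes the boto3 "kinesisanalyticsv2" client via .conn,
    # so any client method can be called on it directly.
    hook = KinesisAnalyticsV2Hook(aws_conn_id="aws_default")
    detail = hook.conn.describe_application(ApplicationName="demo")["ApplicationDetail"]
    print(detail["ApplicationStatus"])
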
diff --git a/airflow/providers/amazon/aws/operators/kinesis_analytics.py b/airflow/providers/amazon/aws/operators/kinesis_analytics.py
new file mode 100644
index 0000000000..727aa714c6
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/kinesis_analytics.py
@@ -0,0 +1,348 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from botocore.exceptions import ClientError
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
+from airflow.providers.amazon.aws.triggers.kinesis_analytics import (
+ KinesisAnalyticsV2ApplicationOperationCompleteTrigger,
+)
+from airflow.providers.amazon.aws.utils import validate_execute_complete_event
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
+class KinesisAnalyticsV2CreateApplicationOperator(AwsBaseOperator[KinesisAnalyticsV2Hook]):
+ """
+ Creates an AWS Managed Service for Apache Flink application.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:KinesisAnalyticsV2CreateApplicationOperator`
+
+    :param application_name: The name of the application. (templated)
+    :param runtime_environment: The runtime environment for the application. (templated)
+    :param service_execution_role: The IAM role used by the application to access services. (templated)
+    :param create_application_kwargs: Extra properties to pass to the create application call. (templated)
+    :param application_description: A summary description of the application. (templated)
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then the default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for the botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+ aws_hook_class = KinesisAnalyticsV2Hook
+ ui_color = "#44b5e2"
+
+ template_fields: Sequence[str] = aws_template_fields(
+ "application_name",
+ "runtime_environment",
+ "service_execution_role",
+ "create_application_kwargs",
+ "application_description",
+ )
+ template_fields_renderers: dict = {
+ "create_application_kwargs": "json",
+ }
+
+ def __init__(
+ self,
+ application_name: str,
+ runtime_environment: str,
+ service_execution_role: str,
+ create_application_kwargs: dict[str, Any] | None = None,
+        application_description: str = "Managed Service for Apache Flink application created from Airflow",
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.application_name = application_name
+ self.runtime_environment = runtime_environment
+ self.service_execution_role = service_execution_role
+ self.create_application_kwargs = create_application_kwargs or {}
+ self.application_description = application_description
+
+    def execute(self, context: Context) -> dict[str, str]:
+        self.log.info("Creating AWS Managed Service for Apache Flink application %s.", self.application_name)
+        try:
+            response = self.hook.conn.create_application(
+                ApplicationName=self.application_name,
+                ApplicationDescription=self.application_description,
+                RuntimeEnvironment=self.runtime_environment,
+                ServiceExecutionRole=self.service_execution_role,
+                **self.create_application_kwargs,
+            )
+        except ClientError as error:
+            raise AirflowException(
+                f"AWS Managed Service for Apache Flink application creation failed: {error.response['Error']['Message']}"
+            )
+
+        self.log.info(
+            "AWS Managed Service for Apache Flink application %s created successfully.",
+            self.application_name,
+        )
+
+        return {"ApplicationARN": response["ApplicationDetail"]["ApplicationARN"]}
+
+
+class KinesisAnalyticsV2StartApplicationOperator(AwsBaseOperator[KinesisAnalyticsV2Hook]):
+ """
+ Starts an AWS Managed Service for Apache Flink application.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:KinesisAnalyticsV2StartApplicationOperator`
+
+    :param application_name: The name of the application. (templated)
+    :param run_configuration: Application properties to start the Apache Flink job with. (templated)
+
+    :param wait_for_completion: Whether to wait for the job to start. (default: True)
+    :param waiter_delay: Time in seconds to wait between status checks. (default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the job to start.
+        This implies waiting for completion. This mode requires the aiobotocore module to be installed.
+        (default: False)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then the default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for the botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+ aws_hook_class = KinesisAnalyticsV2Hook
+ ui_color = "#44b5e2"
+
+ template_fields: Sequence[str] = aws_template_fields(
+ "application_name",
+ "run_configuration",
+ )
+ template_fields_renderers: dict = {
+ "run_configuration": "json",
+ }
+
+ def __init__(
+ self,
+ application_name: str,
+ run_configuration: dict[str, Any] | None = None,
+ wait_for_completion: bool = True,
+ waiter_delay: int = 60,
+ waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.application_name = application_name
+ self.run_configuration = run_configuration or {}
+ self.wait_for_completion = wait_for_completion
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
+ self.deferrable = deferrable
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        msg = "AWS Managed Service for Apache Flink application"
+
+        try:
+            self.log.info("Starting %s %s.", msg, self.application_name)
+            self.hook.conn.start_application(
+                ApplicationName=self.application_name, RunConfiguration=self.run_configuration
+            )
+        except ClientError as error:
+            raise AirflowException(
+                f"Failed to start {msg} {self.application_name}: {error.response['Error']['Message']}"
+            )
+
+        describe_response = self.hook.conn.describe_application(ApplicationName=self.application_name)
+
+        if self.deferrable:
+            self.log.info("Deferring for %s to start: %s.", msg, self.application_name)
+            self.defer(
+                trigger=KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+                    application_name=self.application_name,
+                    waiter_name="application_start_complete",
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    region_name=self.region_name,
+                    verify=self.verify,
+                    botocore_config=self.botocore_config,
+                ),
+                method_name="execute_complete",
+            )
+        if self.wait_for_completion:
+            self.log.info("Waiting for %s to start: %s.", msg, self.application_name)
+
+            self.hook.get_waiter("application_start_complete").wait(
+                ApplicationName=self.application_name,
+                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
+            )
+
+        self.log.info("%s %s started successfully.", msg, self.application_name)
+
+        return {"ApplicationARN": describe_response["ApplicationDetail"]["ApplicationARN"]}
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> dict[str, Any]:
+        event = validate_execute_complete_event(event)
+
+        if event["status"] != "success":
+            raise AirflowException(
+                f"Error while starting AWS Managed Service for Apache Flink application: {event}"
+            )
+
+        response = self.hook.conn.describe_application(
+            ApplicationName=event["application_name"],
+        )
+
+        self.log.info(
+            "AWS Managed Service for Apache Flink application %s started successfully.",
+            event["application_name"],
+        )
+
+        return {"ApplicationARN": response["ApplicationDetail"]["ApplicationARN"]}
+
+
+class KinesisAnalyticsV2StopApplicationOperator(AwsBaseOperator[KinesisAnalyticsV2Hook]):
+ """
+    Stops an AWS Managed Service for Apache Flink application.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:KinesisAnalyticsV2StopApplicationOperator`
+
+    :param application_name: The name of your application. (templated)
+    :param force: Set to true to force the application to stop. If you set Force to true, Managed Service for
+        Apache Flink stops the application without taking a snapshot. (templated)
+
+    :param wait_for_completion: Whether to wait for the job to stop. (default: True)
+    :param waiter_delay: Time in seconds to wait between status checks. (default: 60)
+    :param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20)
+    :param deferrable: If True, the operator will wait asynchronously for the job to stop.
+        This implies waiting for completion. This mode requires the aiobotocore module to be installed.
+        (default: False)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then the default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for the botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+ aws_hook_class = KinesisAnalyticsV2Hook
+ ui_color = "#44b5e2"
+
+ template_fields: Sequence[str] = aws_template_fields(
+ "application_name",
+ "force",
+ )
+
+ def __init__(
+ self,
+ application_name: str,
+ force: bool = False,
+ wait_for_completion: bool = True,
+ waiter_delay: int = 60,
+ waiter_max_attempts: int = 20,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.application_name = application_name
+ self.force = force
+ self.wait_for_completion = wait_for_completion
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
+ self.deferrable = deferrable
+
+    def execute(self, context: Context) -> dict[str, Any]:
+        msg = "AWS Managed Service for Apache Flink application"
+
+        try:
+            self.log.info("Stopping %s %s.", msg, self.application_name)
+
+            self.hook.conn.stop_application(ApplicationName=self.application_name, Force=self.force)
+        except ClientError as error:
+            raise AirflowException(
+                f"Failed to stop {msg} {self.application_name}: {error.response['Error']['Message']}"
+            )
+
+        describe_response = self.hook.conn.describe_application(ApplicationName=self.application_name)
+
+        if self.deferrable:
+            self.log.info("Deferring for %s to stop: %s.", msg, self.application_name)
+            self.defer(
+                trigger=KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+                    application_name=self.application_name,
+                    waiter_name="application_stop_complete",
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    region_name=self.region_name,
+                    verify=self.verify,
+                    botocore_config=self.botocore_config,
+                ),
+                method_name="execute_complete",
+            )
+        if self.wait_for_completion:
+            self.log.info("Waiting for %s to stop: %s.", msg, self.application_name)
+
+            self.hook.get_waiter("application_stop_complete").wait(
+                ApplicationName=self.application_name,
+                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
+            )
+
+        self.log.info("%s %s stopped successfully.", msg, self.application_name)
+
+        return {"ApplicationARN": describe_response["ApplicationDetail"]["ApplicationARN"]}
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> dict[str, Any]:
+        event = validate_execute_complete_event(event)
+
+        if event["status"] != "success":
+            raise AirflowException("Error while stopping AWS Managed Service for Apache Flink application")
+
+        response = self.hook.conn.describe_application(
+            ApplicationName=event["application_name"],
+        )
+
+        self.log.info(
+            "AWS Managed Service for Apache Flink application %s stopped successfully.",
+            event["application_name"],
+        )
+
+        return {"ApplicationARN": response["ApplicationDetail"]["ApplicationARN"]}
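
For reference, a sketch of how extra boto3 create_application properties can be
passed through create_application_kwargs; the role ARN, bucket ARN, and file
key below are placeholders, not part of this commit:

    from airflow.providers.amazon.aws.operators.kinesis_analytics import (
        KinesisAnalyticsV2CreateApplicationOperator,
    )

    create_app = KinesisAnalyticsV2CreateApplicationOperator(
        task_id="create_application",
        application_name="demo",
        runtime_environment="FLINK-1_18",
        service_execution_role="arn:aws:iam::123456789012:role/KinesisExecutionRole",
        # Forwarded verbatim to the boto3 create_application call; here it
        # points the application at hypothetical JAR code in S3.
        create_application_kwargs={
            "ApplicationConfiguration": {
                "ApplicationCodeConfiguration": {
                    "CodeContent": {
                        "S3ContentLocation": {
                            "BucketARN": "arn:aws:s3:::my-flink-code",
                            "FileKey": "app/flink-app.jar",
                        }
                    },
                    "CodeContentType": "ZIPFILE",
                }
            }
        },
    )
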
diff --git a/airflow/providers/amazon/aws/sensors/kinesis_analytics.py b/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
new file mode 100644
index 0000000000..37a2dc733d
--- /dev/null
+++ b/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
@@ -0,0 +1,242 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
+from airflow.providers.amazon.aws.triggers.kinesis_analytics import (
+ KinesisAnalyticsV2ApplicationOperationCompleteTrigger,
+)
+from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
+class KinesisAnalyticsV2StartApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
+ """
+    Waits for an AWS Managed Service for Apache Flink application to start.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:KinesisAnalyticsV2StartApplicationCompletedSensor`
+
+    :param application_name: Application name.
+
+    :param deferrable: If True, the sensor will operate in deferrable mode. This mode requires the
+        aiobotocore module to be installed.
+        (default: False, but can be overridden in config file by setting default_deferrable to True)
+    :param poke_interval: Polling period in seconds to check for the status of the job. (default: 120)
+    :param max_retries: Number of times before returning the current state. (default: 75)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then the default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for the botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+ aws_hook_class = KinesisAnalyticsV2Hook
+ ui_color = "#66c3ff"
+
+    INTERMEDIATE_STATES: tuple[str, ...] = ("STARTING", "UPDATING", "AUTOSCALING")
+ FAILURE_STATES: tuple[str, ...] = (
+ "DELETING",
+ "STOPPING",
+ "READY",
+ "FORCE_STOPPING",
+ "ROLLING_BACK",
+ "MAINTENANCE",
+ "ROLLED_BACK",
+ )
+ SUCCESS_STATES: tuple[str, ...] = ("RUNNING",)
+    FAILURE_MESSAGE = "AWS Managed Service for Apache Flink application start failed."
+
+ template_fields: Sequence[str] = aws_template_fields("application_name")
+
+ def __init__(
+ self,
+ *,
+ application_name: str,
+ max_retries: int = 75,
+ poke_interval: int = 120,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.application_name = application_name
+ self.max_retries = max_retries
+ self.poke_interval = poke_interval
+ self.deferrable = deferrable
+
+ def execute(self, context: Context) -> Any:
+ if self.deferrable:
+ self.defer(
+ trigger=KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+ application_name=self.application_name,
+ waiter_name="application_start_complete",
+ aws_conn_id=self.aws_conn_id,
+ waiter_delay=int(self.poke_interval),
+ waiter_max_attempts=self.max_retries,
+ region_name=self.region_name,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
+ ),
+ method_name="poke",
+ )
+ else:
+ super().execute(context=context)
+
+    def poke(self, context: Context, **kwargs) -> bool:
+        status = self.hook.conn.describe_application(ApplicationName=self.application_name)[
+            "ApplicationDetail"
+        ]["ApplicationStatus"]
+
+        self.log.info(
+            "Poking for AWS Managed Service for Apache Flink application: %s status: %s",
+            self.application_name,
+            status,
+        )
+
+        if status in self.FAILURE_STATES:
+            # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+            if self.soft_fail:
+                raise AirflowSkipException(self.FAILURE_MESSAGE)
+            raise AirflowException(self.FAILURE_MESSAGE)
+
+        if status in self.SUCCESS_STATES:
+            self.log.info(
+                "AWS Managed Service for Apache Flink application `%s` started successfully.",
+                self.application_name,
+            )
+            return True
+
+        return False
+
+
+class KinesisAnalyticsV2StopApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
+ """
+    Waits for an AWS Managed Service for Apache Flink application to stop.
+
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:KinesisAnalyticsV2StopApplicationCompletedSensor`
+
+    :param application_name: Application name.
+
+    :param deferrable: If True, the sensor will operate in deferrable mode. This mode requires the
+        aiobotocore module to be installed.
+        (default: False, but can be overridden in config file by setting default_deferrable to True)
+    :param poke_interval: Polling period in seconds to check for the status of the job. (default: 120)
+    :param max_retries: Number of times before returning the current state. (default: 75)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is ``None`` or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then the default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param verify: Whether to verify SSL certificates. See:
+        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
+    :param botocore_config: Configuration dictionary (key-values) for the botocore client. See:
+        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
+    """
+
+ aws_hook_class = KinesisAnalyticsV2Hook
+ ui_color = "#66c3ff"
+
+ INTERMEDIATE_STATES: tuple[str, ...] = (
+ "STARTING",
+ "UPDATING",
+ "AUTOSCALING",
+ "RUNNING",
+ "STOPPING",
+ "FORCE_STOPPING",
+ )
+    FAILURE_STATES: tuple[str, ...] = ("DELETING", "ROLLING_BACK", "MAINTENANCE", "ROLLED_BACK")
+ SUCCESS_STATES: tuple[str, ...] = ("READY",)
+    FAILURE_MESSAGE = "AWS Managed Service for Apache Flink application stop failed."
+
+ template_fields: Sequence[str] = aws_template_fields("application_name")
+
+ def __init__(
+ self,
+ *,
+ application_name: str,
+ max_retries: int = 75,
+ poke_interval: int = 120,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.application_name = application_name
+ self.max_retries = max_retries
+ self.poke_interval = poke_interval
+ self.deferrable = deferrable
+
+ def execute(self, context: Context) -> Any:
+ if self.deferrable:
+ self.defer(
+ trigger=KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+ application_name=self.application_name,
+ waiter_name="application_stop_complete",
+ aws_conn_id=self.aws_conn_id,
+ waiter_delay=int(self.poke_interval),
+ waiter_max_attempts=self.max_retries,
+ region_name=self.region_name,
+ verify=self.verify,
+ botocore_config=self.botocore_config,
+ ),
+ method_name="poke",
+ )
+ else:
+ super().execute(context=context)
+
+    def poke(self, context: Context, **kwargs) -> bool:
+        status = self.hook.conn.describe_application(ApplicationName=self.application_name)[
+            "ApplicationDetail"
+        ]["ApplicationStatus"]
+
+        self.log.info(
+            "Poking for AWS Managed Service for Apache Flink application: %s status: %s",
+            self.application_name,
+            status,
+        )
+
+        if status in self.FAILURE_STATES:
+            # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+            if self.soft_fail:
+                raise AirflowSkipException(self.FAILURE_MESSAGE)
+            raise AirflowException(self.FAILURE_MESSAGE)
+
+        if status in self.SUCCESS_STATES:
+            self.log.info(
+                "AWS Managed Service for Apache Flink application `%s` stopped successfully.",
+                self.application_name,
+            )
+            return True
+
+        return False
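
For reference, a sketch of wiring the start-completion sensor in deferrable
mode (task id and application name are illustrative):

    from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
        KinesisAnalyticsV2StartApplicationCompletedSensor,
    )

    # Polls every two minutes, up to 75 attempts, handing the wait to the
    # triggerer instead of occupying a worker slot.
    wait_for_start = KinesisAnalyticsV2StartApplicationCompletedSensor(
        task_id="wait_for_start",
        application_name="demo",
        deferrable=True,
        poke_interval=120,
        max_retries=75,
    )
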
diff --git a/airflow/providers/amazon/aws/triggers/kinesis_analytics.py b/airflow/providers/amazon/aws/triggers/kinesis_analytics.py
new file mode 100644
index 0000000000..195a2285de
--- /dev/null
+++ b/airflow/providers/amazon/aws/triggers/kinesis_analytics.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
+
+if TYPE_CHECKING:
+ from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
+
+
+class KinesisAnalyticsV2ApplicationOperationCompleteTrigger(AwsBaseWaiterTrigger):
+ """
+    Trigger that fires when a Managed Service for Apache Flink application start or stop operation completes.
+
+    :param application_name: Application name.
+    :param waiter_name: The name of the waiter for the stop or start operation.
+    :param waiter_delay: The amount of time in seconds to wait between attempts. (default: 120)
+    :param waiter_max_attempts: The maximum number of attempts to be made. (default: 75)
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    """
+
+ def __init__(
+ self,
+ application_name: str,
+ waiter_name: str,
+ waiter_delay: int = 120,
+ waiter_max_attempts: int = 75,
+ aws_conn_id: str | None = "aws_default",
+ **kwargs,
+ ) -> None:
+        super().__init__(
+            serialized_fields={"application_name": application_name, "waiter_name": waiter_name},
+            waiter_name=waiter_name,
+            waiter_args={"ApplicationName": application_name},
+            failure_message=f"AWS Managed Service for Apache Flink Application {application_name} failed.",
+            status_message=f"Status of AWS Managed Service for Apache Flink Application {application_name} is",
+            status_queries=["ApplicationDetail.ApplicationStatus"],
+            return_key="application_name",
+            return_value=application_name,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+            **kwargs,
+        )
+
+ def hook(self) -> AwsGenericHook:
+ return KinesisAnalyticsV2Hook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ verify=self.verify,
+ config=self.botocore_config,
+ )
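
For reference, the operators and sensors above construct this trigger roughly
as follows when deferring (all values are illustrative); the waiter name
selects one of the two custom waiters defined in the JSON file that follows:

    from airflow.providers.amazon.aws.triggers.kinesis_analytics import (
        KinesisAnalyticsV2ApplicationOperationCompleteTrigger,
    )

    # Runs the named waiter asynchronously on the triggerer and fires an
    # event carrying the application name when it completes.
    trigger = KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
        application_name="demo",
        waiter_name="application_start_complete",
        waiter_delay=60,
        waiter_max_attempts=20,
        aws_conn_id="aws_default",
    )
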
diff --git a/airflow/providers/amazon/aws/waiters/kinesisanalyticsv2.json b/airflow/providers/amazon/aws/waiters/kinesisanalyticsv2.json
new file mode 100644
index 0000000000..5f6f0ec6e7
--- /dev/null
+++ b/airflow/providers/amazon/aws/waiters/kinesisanalyticsv2.json
@@ -0,0 +1,151 @@
+{
+ "version": 2,
+ "waiters": {
+ "application_start_complete": {
+ "delay": 120,
+ "maxAttempts": 75,
+ "operation": "DescribeApplication",
+ "acceptors": [
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "STARTING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "UPDATING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "AUTOSCALING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "DELETING",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "STOPPING",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "READY",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "FORCE_STOPPING",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "ROLLING_BACK",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "MAINTENANCE",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "ROLLED_BACK",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "RUNNING",
+ "state": "success"
+ }
+ ]
+ },
+ "application_stop_complete": {
+ "delay": 120,
+ "maxAttempts": 75,
+ "operation": "DescribeApplication",
+ "acceptors": [
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "STARTING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "UPDATING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "AUTOSCALING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "RUNNING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "STOPPING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "FORCE_STOPPING",
+ "state": "retry"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "DELETING",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "ROLLING_BACK",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "MAINTENANCE",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "ROLLED_BACK",
+ "state": "failure"
+ },
+ {
+ "matcher": "path",
+ "argument": "ApplicationDetail.ApplicationStatus",
+ "expected": "READY",
+ "state": "success"
+ }
+ ]
+ }
+ }
+}
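
For reference, these custom waiters are loaded by name through the hook; the
WaiterConfig overrides the delay and maxAttempts defaults from the JSON above
(the application name is illustrative):

    from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook

    hook = KinesisAnalyticsV2Hook()
    # Blocks until DescribeApplication reports RUNNING, or raises on a
    # failure state such as ROLLED_BACK.
    hook.get_waiter("application_start_complete").wait(
        ApplicationName="demo",
        WaiterConfig={"Delay": 60, "MaxAttempts": 20},
    )
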
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 478aeb230c..a7b4d4272f 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -248,6 +248,12 @@ integrations:
external-doc-url: https://aws.amazon.com/kinesis/data-firehose/
logo: /integration-logos/aws/[email protected]
tags: [aws]
+ - integration-name: Amazon Managed Service for Apache Flink
+ how-to-guide:
+ - /docs/apache-airflow-providers-amazon/operators/kinesis_analytics.rst
+ external-doc-url: https://aws.amazon.com/managed-service-apache-flink/
+ logo: /integration-logos/aws/[email protected]
+ tags: [aws]
- integration-name: Amazon OpenSearch Serverless
how-to-guide:
      - /docs/apache-airflow-providers-amazon/operators/opensearchserverless.rst
@@ -435,6 +441,9 @@ operators:
- integration-name: AWS Lambda
python-modules:
- airflow.providers.amazon.aws.operators.lambda_function
+ - integration-name: Amazon Managed Service for Apache Flink
+ python-modules:
+ - airflow.providers.amazon.aws.operators.kinesis_analytics
- integration-name: Amazon Simple Storage Service (S3)
python-modules:
- airflow.providers.amazon.aws.operators.s3
@@ -521,6 +530,9 @@ sensors:
- integration-name: AWS Lambda
python-modules:
- airflow.providers.amazon.aws.sensors.lambda_function
+ - integration-name: Amazon Managed Service for Apache Flink
+ python-modules:
+ - airflow.providers.amazon.aws.sensors.kinesis_analytics
- integration-name: Amazon OpenSearch Serverless
python-modules:
- airflow.providers.amazon.aws.sensors.opensearch_serverless
@@ -624,6 +636,9 @@ hooks:
- integration-name: AWS Lambda
python-modules:
- airflow.providers.amazon.aws.hooks.lambda_function
+ - integration-name: Amazon Managed Service for Apache Flink
+ python-modules:
+ - airflow.providers.amazon.aws.hooks.kinesis_analytics
- integration-name: Amazon CloudWatch Logs
python-modules:
- airflow.providers.amazon.aws.hooks.logs
@@ -703,6 +718,9 @@ triggers:
- integration-name: AWS Lambda
python-modules:
- airflow.providers.amazon.aws.triggers.lambda_function
+ - integration-name: Amazon Managed Service for Apache Flink
+ python-modules:
+ - airflow.providers.amazon.aws.triggers.kinesis_analytics
- integration-name: Amazon OpenSearch Serverless
python-modules:
- airflow.providers.amazon.aws.triggers.opensearch_serverless
diff --git a/docs/apache-airflow-providers-amazon/operators/kinesis_analytics.rst b/docs/apache-airflow-providers-amazon/operators/kinesis_analytics.rst
new file mode 100644
index 0000000000..d7156feabb
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/kinesis_analytics.rst
@@ -0,0 +1,115 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+========================================
+Amazon Managed Service for Apache Flink
+========================================
+
+`Amazon Managed Service for Apache Flink <https://aws.amazon.com/managed-service-apache-flink/>`__
+is a fully managed service that you can use to process and analyze streaming data using Java, Python,
+SQL, or Scala. The service enables you to quickly author and run Java, SQL, or Scala code against
+streaming sources to perform time series analytics, feed real-time dashboards, and create real-time metrics.
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+Generic Parameters
+------------------
+
+.. include:: ../_partials/generic_parameters.rst
+
+Operators
+---------
+
+.. _howto/operator:KinesisAnalyticsV2CreateApplicationOperator:
+
+Create an Amazon Managed Service for Apache Flink Application
+==============================================================
+
+To create an Amazon Managed Service for Apache Flink application, you can use
+:class:`~airflow.providers.amazon.aws.operators.kinesis_analytics.KinesisAnalyticsV2CreateApplicationOperator`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_create_application]
+ :end-before: [END howto_operator_create_application]
+
+.. _howto/operator:KinesisAnalyticsV2StartApplicationOperator:
+
+Start an Amazon Managed Service for Apache Flink Application
+=============================================================
+
+To start an Amazon Managed Service for Apache Flink application, you can use
+:class:`~airflow.providers.amazon.aws.operators.kinesis_analytics.KinesisAnalyticsV2StartApplicationOperator`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_start_application]
+ :end-before: [END howto_operator_start_application]
+
+.. _howto/operator:KinesisAnalyticsV2StopApplicationOperator:
+
+Stop an Amazon Managed Service for Apache Flink Application
+=============================================================
+
+To stop an Amazon Managed Service for Apache Flink application, you can use
+:class:`~airflow.providers.amazon.aws.operators.kinesis_analytics.KinesisAnalyticsV2StopApplicationOperator`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_stop_application]
+ :end-before: [END howto_operator_stop_application]
+
+Sensors
+-------
+
+.. _howto/sensor:KinesisAnalyticsV2StartApplicationCompletedSensor:
+
+Wait for an Amazon Managed Service for Apache Flink Application to start
+========================================================================
+
+To wait on the state of an Amazon Managed Service for Apache Flink application to start, you can use
+:class:`~airflow.providers.amazon.aws.sensors.kinesis_analytics.KinesisAnalyticsV2StartApplicationCompletedSensor`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_start_application]
+ :end-before: [END howto_sensor_start_application]
+
+.. _howto/sensor:KinesisAnalyticsV2StopApplicationCompletedSensor:
+
+Wait for an Amazon Managed Service for Apache Flink Application to stop
+========================================================================
+
+To wait on the state of an Amazon Managed Service for Apache Flink application to stop, you can use
+:class:`~airflow.providers.amazon.aws.sensors.kinesis_analytics.KinesisAnalyticsV2StopApplicationCompletedSensor`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_kinesis_analytics.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_stop_application]
+ :end-before: [END howto_sensor_stop_application]
+
+Reference
+---------
+
+* `AWS boto3 library documentation for Amazon Managed Service for Apache Flink (Kinesis Analytics)
+  <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisanalyticsv2.html>`__
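
For reference, a condensed sketch of the lifecycle documented above, combining
the start operator with the completion sensor (all names are placeholders and
the application is assumed to already exist):

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.kinesis_analytics import (
        KinesisAnalyticsV2StartApplicationOperator,
        KinesisAnalyticsV2StopApplicationOperator,
    )
    from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
        KinesisAnalyticsV2StartApplicationCompletedSensor,
    )

    with DAG(dag_id="flink_start_stop", start_date=datetime(2024, 1, 1), schedule=None):
        # Kick off the start call without blocking the worker ...
        start_app = KinesisAnalyticsV2StartApplicationOperator(
            task_id="start_application",
            application_name="demo",
            wait_for_completion=False,
        )
        # ... then wait for RUNNING in a dedicated sensor task.
        wait_for_start = KinesisAnalyticsV2StartApplicationCompletedSensor(
            task_id="wait_for_start",
            application_name="demo",
        )
        stop_app = KinesisAnalyticsV2StopApplicationOperator(
            task_id="stop_application",
            application_name="demo",
        )
        start_app >> wait_for_start >> stop_app
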
diff --git a/docs/integration-logos/aws/[email protected] b/docs/integration-logos/aws/[email protected]
new file mode 100644
index 0000000000..0d22a7c5f2
Binary files /dev/null and b/docs/integration-logos/aws/[email protected] differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 99589be246..b8b3c60a0d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -906,6 +906,7 @@ KiB
Kibana
killMode
Kinesis
+kinesis
kinit
kms
knn
diff --git a/tests/providers/amazon/aws/hooks/test_kinesis_analytics.py b/tests/providers/amazon/aws/hooks/test_kinesis_analytics.py
new file mode 100644
index 0000000000..4ed152cd3e
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_kinesis_analytics.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook
+
+
+class TestKinesisAnalyticsV2Hook:
+ @pytest.mark.parametrize(
+ "test_hook, service_name",
+        [pytest.param(KinesisAnalyticsV2Hook(), "kinesisanalyticsv2", id="kinesisanalyticsv2")],
+ )
+ def test_kinesis_analytics_v2_hook(self, test_hook, service_name):
+ kinesis_analytics_hook = KinesisAnalyticsV2Hook()
+ assert kinesis_analytics_hook.conn is not None
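
A possible extension of the test above, using moto so the assertion on the
underlying client never touches real AWS (the test name and region are
hypothetical, not part of this commit):

    from moto import mock_aws

    @mock_aws
    def test_hook_client_type():
        hook = KinesisAnalyticsV2Hook(aws_conn_id=None, region_name="us-east-1")
        assert hook.client_type == "kinesisanalyticsv2"
        # The boto3 client metadata confirms which service the hook wraps.
        assert hook.conn.meta.service_model.service_name == "kinesisanalyticsv2"
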
diff --git a/tests/providers/amazon/aws/operators/test_kinesis_analytics.py b/tests/providers/amazon/aws/operators/test_kinesis_analytics.py
new file mode 100644
index 0000000000..36734d4f56
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_kinesis_analytics.py
@@ -0,0 +1,485 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Generator
+from unittest import mock
+
+import pytest
+from boto3 import client
+from moto import mock_aws
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.operators.kinesis_analytics import (
+ KinesisAnalyticsV2CreateApplicationOperator,
+ KinesisAnalyticsV2StartApplicationOperator,
+ KinesisAnalyticsV2StopApplicationOperator,
+)
+
+if TYPE_CHECKING:
+ from airflow.providers.amazon.aws.hooks.base_aws import BaseAwsConnection
+
+
+class TestKinesisAnalyticsV2CreateApplicationOperator:
+    APPLICATION_ARN = "arn:aws:kinesisanalytics:us-east-1:918313644466:application/demo"
+ ROLE_ARN = "arn:aws:iam::123456789012:role/KinesisExecutionRole"
+
+ @pytest.fixture
+ def mock_conn(self) -> Generator[BaseAwsConnection, None, None]:
+ with mock.patch.object(KinesisAnalyticsV2Hook, "conn") as _conn:
+ _conn.create_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+ }
+ yield _conn
+
+ @pytest.fixture
+    def kinesis_analytics_v2_hook(self) -> Generator[KinesisAnalyticsV2Hook, None, None]:
+ with mock_aws():
+ hook = KinesisAnalyticsV2Hook(aws_conn_id="aws_default")
+ yield hook
+
+ def test_init(self):
+ op = KinesisAnalyticsV2CreateApplicationOperator(
+ task_id="create_application_operator",
+ application_name="demo",
+ runtime_environment="FLINK_18_9",
+ service_execution_role=self.ROLE_ARN,
+ aws_conn_id="fake-conn-id",
+ region_name="eu-west-2",
+ verify=True,
+ botocore_config={"read_timeout": 42},
+ )
+
+ assert op.application_name == "demo"
+ assert op.runtime_environment == "FLINK_18_9"
+ assert op.service_execution_role == self.ROLE_ARN
+ assert op.hook.client_type == "kinesisanalyticsv2"
+ assert op.hook.resource_type is None
+ assert op.hook.aws_conn_id == "fake-conn-id"
+ assert op.hook._region_name == "eu-west-2"
+ assert op.hook._verify is True
+ assert op.hook._config is not None
+ assert op.hook._config.read_timeout == 42
+
+ op = KinesisAnalyticsV2CreateApplicationOperator(
+ task_id="create_application_operator",
+ application_name="demo",
+ runtime_environment="FLINK_18_9",
+ service_execution_role="arn",
+ )
+
+ assert op.hook.aws_conn_id == "aws_default"
+ assert op.hook._region_name is None
+ assert op.hook._verify is None
+ assert op.hook._config is None
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_create_application(self, kinesis_analytics_mock_conn):
+ self.op = KinesisAnalyticsV2CreateApplicationOperator(
+ task_id="create_application_operator",
+ application_name="demo",
+ application_description="demo app",
+ runtime_environment="FLINK_18_9",
+ service_execution_role=self.ROLE_ARN,
+ create_application_kwargs={
+ "ApplicationConfiguration": {
+ "FlinkApplicationConfiguration": {
+ "ParallelismConfiguration": {
+ "ConfigurationType": "CUSTOM",
+ "Parallelism": 2,
+ "ParallelismPerKPU": 1,
+ "AutoScalingEnabled": True,
+ }
+ }
+ }
+ },
+ )
+ self.op.execute({})
+ kinesis_analytics_mock_conn.create_application.assert_called_once_with(
+ ApplicationName="demo",
+ ApplicationDescription="demo app",
+ RuntimeEnvironment="FLINK_18_9",
+ ServiceExecutionRole=self.ROLE_ARN,
+ ApplicationConfiguration={
+ "FlinkApplicationConfiguration": {
+ "ParallelismConfiguration": {
+ "ConfigurationType": "CUSTOM",
+ "Parallelism": 2,
+ "ParallelismPerKPU": 1,
+ "AutoScalingEnabled": True,
+ }
+ }
+ },
+ )
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_create_application_throw_error_when_invalid_arguments_provided(
+        self, kinesis_analytics_mock_conn
+    ):
+ operator = KinesisAnalyticsV2CreateApplicationOperator(
+ task_id="create_application_operator",
+ application_name="demo",
+ runtime_environment="FLINK_18_9",
+ service_execution_role=self.ROLE_ARN,
+            create_application_kwargs={"AppId": {"code": "s3://test/flink.jar"}},
+ aws_conn_id="fake-conn-id",
+ region_name="eu-west-2",
+ verify=True,
+ botocore_config={"read_timeout": 42},
+ )
+
+ operator.defer = mock.MagicMock()
+ error_message = "Invalid arguments provided"
+
+ err_response = {"Error": {"Code": "InvalidArgumentException",
"Message": error_message}}
+
+ exception = client("kinesisanalyticsv2").exceptions.ClientError(
+ err_response, operation_name="CreateApplication"
+ )
+ returned_exception = type(exception)
+
+        kinesis_analytics_mock_conn.exceptions.InvalidArgumentException = returned_exception
+ kinesis_analytics_mock_conn.create_application.side_effect = exception
+
+ with pytest.raises(AirflowException, match=error_message):
+ operator.execute({})
+
+
+class TestKinesisAnalyticsV2StartApplicationOperator:
+    APPLICATION_ARN = "arn:aws:kinesisanalytics:us-east-1:123456789012:application/demo"
+    ROLE_ARN = "arn:aws:iam::123456789012:role/KinesisExecutionRole"
+    RUN_CONFIGURATION = {"FlinkRunConfiguration": {"AllowNonRestoredState": True}}
+
+ @pytest.fixture
+ def mock_conn(self) -> Generator[BaseAwsConnection, None, None]:
+ with mock.patch.object(KinesisAnalyticsV2Hook, "conn") as _conn:
+ _conn.start_application.return_value = {}
+ _conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+ }
+ yield _conn
+
+ @pytest.fixture
+    def kinesis_analytics_v2_hook(self) -> Generator[KinesisAnalyticsV2Hook, None, None]:
+ with mock_aws():
+ hook = KinesisAnalyticsV2Hook(aws_conn_id="aws_default")
+ yield hook
+
+ def setup_method(self):
+ self.operator = KinesisAnalyticsV2StartApplicationOperator(
+ task_id="start_application_operator",
+ application_name="demo",
+ run_configuration=self.RUN_CONFIGURATION,
+ aws_conn_id="fake-conn-id",
+ region_name="eu-west-2",
+ verify=True,
+ botocore_config={"read_timeout": 42},
+ )
+
+ self.operator.defer = mock.MagicMock()
+
+ def test_init(self):
+ op = KinesisAnalyticsV2StartApplicationOperator(
+ task_id="start_application_operator",
+ application_name="demo",
+ run_configuration=self.RUN_CONFIGURATION,
+ aws_conn_id="fake-conn-id",
+ region_name="eu-west-2",
+ verify=True,
+ botocore_config={"read_timeout": 42},
+ )
+
+ assert op.application_name == "demo"
+ assert op.run_configuration == self.RUN_CONFIGURATION
+ assert op.hook.client_type == "kinesisanalyticsv2"
+ assert op.hook.resource_type is None
+ assert op.hook.aws_conn_id == "fake-conn-id"
+ assert op.hook._region_name == "eu-west-2"
+ assert op.hook._verify is True
+ assert op.hook._config is not None
+ assert op.hook._config.read_timeout == 42
+
+ op = KinesisAnalyticsV2StartApplicationOperator(
+ task_id="start_application_operator",
+ application_name="demo",
+ run_configuration=self.RUN_CONFIGURATION,
+ )
+
+ assert op.hook.aws_conn_id == "aws_default"
+ assert op.hook._region_name is None
+ assert op.hook._verify is None
+ assert op.hook._config is None
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_start_application(self, kinesis_analytics_mock_conn):
+ kinesis_analytics_mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+ }
+ kinesis_analytics_mock_conn.start_application.return_value = {}
+
+ self.op = KinesisAnalyticsV2StartApplicationOperator(
+ task_id="start_application_operator",
+ application_name="demo",
+ run_configuration=self.RUN_CONFIGURATION,
+ )
+ self.op.wait_for_completion = False
+ response = self.op.execute({})
+
+ assert response == {"ApplicationARN": self.APPLICATION_ARN}
+
+ kinesis_analytics_mock_conn.start_application.assert_called_once_with(
+ ApplicationName="demo", RunConfiguration=self.RUN_CONFIGURATION
+ )
+
+ @pytest.mark.parametrize(
+ "wait_for_completion, deferrable",
+ [
+ pytest.param(False, False, id="no_wait"),
+ pytest.param(True, False, id="wait"),
+ pytest.param(False, True, id="defer"),
+ ],
+ )
+ @mock.patch.object(KinesisAnalyticsV2Hook, "get_waiter")
+ def test_start_application_wait_combinations(
+        self, _, wait_for_completion, deferrable, mock_conn, kinesis_analytics_v2_hook
+ ):
+ self.operator.wait_for_completion = wait_for_completion
+ self.operator.deferrable = deferrable
+
+ response = self.operator.execute({})
+
+ assert response == {"ApplicationARN": self.APPLICATION_ARN}
+        assert kinesis_analytics_v2_hook.get_waiter.call_count == wait_for_completion
+ assert self.operator.defer.call_count == deferrable
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+    def test_start_application_throw_error_when_invalid_config_provided(self, kinesis_analytics_mock_conn):
+ operator = KinesisAnalyticsV2StartApplicationOperator(
+ task_id="start_application_operator",
+ application_name="demo",
+ run_configuration={
+ "ApplicationRestoreConfiguration": {
+ "ApplicationRestoreType": "SKIP_RESTORE",
+ }
+ },
+ aws_conn_id="fake-conn-id",
+ region_name="eu-west-2",
+ verify=True,
+ botocore_config={"read_timeout": 42},
+ )
+
+ operator.defer = mock.MagicMock()
+ error_message = "Invalid config provided"
+
+ err_response = {
+ "Error": {"Code": "InvalidApplicationConfigurationException",
"Message": error_message}
+ }
+
+ exception = client("kinesisanalyticsv2").exceptions.ClientError(
+ err_response, operation_name="StartApplication"
+ )
+ returned_exception = type(exception)
+
+        kinesis_analytics_mock_conn.exceptions.InvalidArgumentException = returned_exception
+ kinesis_analytics_mock_conn.start_application.side_effect = exception
+
+ with pytest.raises(AirflowException, match=error_message):
+ operator.execute({})
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_execute_complete(self, kinesis_analytics_mock_conn):
+ kinesis_analytics_mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+ }
+
+ event = {"status": "success", "application_name": "demo"}
+
+ response = self.operator.execute_complete(context=None, event=event)
+
+ assert {"ApplicationARN": self.APPLICATION_ARN} == response
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_execute_complete_failure(self, kinesis_analytics_mock_conn):
+ kinesis_analytics_mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+ }
+
+ event = {"status": "error", "application_name": "demo"}
+
+ with pytest.raises(
+ AirflowException,
+ match="Error while starting AWS Managed Service for Apache Flink
application",
+ ):
+ self.operator.execute_complete(context=None, event=event)
+
+
+class TestKinesisAnalyticsV2StopApplicationOperator:
+    APPLICATION_ARN = "arn:aws:kinesisanalytics:us-east-1:123456789012:application/demo"
+ ROLE_ARN = "arn:aws:iam::123456789012:role/KinesisExecutionRole"
+
+ @pytest.fixture
+ def mock_conn(self) -> Generator[BaseAwsConnection, None, None]:
+ with mock.patch.object(KinesisAnalyticsV2Hook, "conn") as _conn:
+ _conn.stop_application.return_value = {}
+ _conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+ }
+ yield _conn
+
+ @pytest.fixture
+    def kinesis_analytics_v2_hook(self) -> Generator[KinesisAnalyticsV2Hook, None, None]:
+ with mock_aws():
+ hook = KinesisAnalyticsV2Hook(aws_conn_id="aws_default")
+ yield hook
+
+ def setup_method(self):
+ self.operator = KinesisAnalyticsV2StopApplicationOperator(
+ task_id="stop_application_operator",
+ application_name="demo",
+ force=False,
+ aws_conn_id="fake-conn-id",
+ region_name="eu-west-2",
+ verify=True,
+ botocore_config={"read_timeout": 42},
+ )
+
+ self.operator.defer = mock.MagicMock()
+
+ def test_init(self):
+ op = KinesisAnalyticsV2StopApplicationOperator(
+ task_id="stop_application_operator",
+ application_name="demo",
+ force=False,
+ aws_conn_id="fake-conn-id",
+ region_name="eu-west-2",
+ verify=True,
+ botocore_config={"read_timeout": 42},
+ )
+
+ assert op.application_name == "demo"
+ assert op.force is False
+ assert op.hook.client_type == "kinesisanalyticsv2"
+ assert op.hook.resource_type is None
+ assert op.hook.aws_conn_id == "fake-conn-id"
+ assert op.hook._region_name == "eu-west-2"
+ assert op.hook._verify is True
+ assert op.hook._config is not None
+ assert op.hook._config.read_timeout == 42
+
+ op = KinesisAnalyticsV2StopApplicationOperator(
+ task_id="stop_application_operator",
+ application_name="demo",
+ force=False,
+ )
+
+ assert op.hook.aws_conn_id == "aws_default"
+ assert op.hook._region_name is None
+ assert op.hook._verify is None
+ assert op.hook._config is None
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_stop_application(self, kinesis_analytics_mock_conn):
+ kinesis_analytics_mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+ }
+ kinesis_analytics_mock_conn.stop_application.return_value = {}
+
+ self.op = KinesisAnalyticsV2StopApplicationOperator(
+ task_id="stop_application_operator", application_name="demo",
force=False
+ )
+ self.op.wait_for_completion = False
+ response = self.op.execute({})
+
+ assert response == {"ApplicationARN": self.APPLICATION_ARN}
+
+ kinesis_analytics_mock_conn.stop_application.assert_called_once_with(
+ ApplicationName="demo", Force=False
+ )
+
+ @pytest.mark.parametrize(
+ "wait_for_completion, deferrable",
+ [
+ pytest.param(False, False, id="no_wait"),
+ pytest.param(True, False, id="wait"),
+ pytest.param(False, True, id="defer"),
+ ],
+ )
+ @mock.patch.object(KinesisAnalyticsV2Hook, "get_waiter")
+ def test_stop_application_wait_combinations(
+ self, _, wait_for_completion, deferrable, mock_conn,
kinesis_analytics_v2_hook
+ ):
+ self.operator.wait_for_completion = wait_for_completion
+ self.operator.deferrable = deferrable
+
+ response = self.operator.execute({})
+
+ assert response == {"ApplicationARN": self.APPLICATION_ARN}
+ assert kinesis_analytics_v2_hook.get_waiter.call_count == wait_for_completion
+ assert self.operator.defer.call_count == deferrable
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_stop_application_throw_error_when_invalid_config_provided(self, kinesis_analytics_mock_conn):
+ operator = KinesisAnalyticsV2StopApplicationOperator(
+ task_id="stop_application_operator",
+ application_name="demo",
+ force=False,
+ aws_conn_id="fake-conn-id",
+ region_name="eu-west-2",
+ verify=True,
+ botocore_config={"read_timeout": 42},
+ )
+
+ operator.defer = mock.MagicMock()
+ error_message = "resource not found"
+
+ err_response = {"Error": {"Code": "ResourceNotFoundException",
"Message": error_message}}
+
+ exception = client("kinesisanalyticsv2").exceptions.ClientError(
+ err_response, operation_name="StopApplication"
+ )
+ returned_exception = type(exception)
+
+ kinesis_analytics_mock_conn.exceptions.ResourceNotFoundException = returned_exception
+ kinesis_analytics_mock_conn.stop_application.side_effect = exception
+
+ with pytest.raises(AirflowException, match=error_message):
+ operator.execute({})
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_execute_complete(self, kinesis_analytics_mock_conn):
+ kinesis_analytics_mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+ }
+
+ event = {"status": "success", "application_name": "demo"}
+
+ response = self.operator.execute_complete(context=None, event=event)
+
+ assert {"ApplicationARN": self.APPLICATION_ARN} == response
+
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_execute_complete_failure(self, kinesis_analytics_mock_conn):
+ kinesis_analytics_mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN}
+ }
+ event = {"status": "error", "application_name": "demo"}
+
+ with pytest.raises(
+ AirflowException, match="Error while stopping AWS Managed Service
for Apache Flink application"
+ ):
+ self.operator.execute_complete(context=None, event=event)
diff --git a/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py b/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py
new file mode 100644
index 0000000000..73a2cfdb77
--- /dev/null
+++ b/tests/providers/amazon/aws/sensors/test_kinesis_analytics.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
+ KinesisAnalyticsV2StartApplicationCompletedSensor,
+ KinesisAnalyticsV2StopApplicationCompletedSensor,
+)
+
+
+class TestKinesisAnalyticsV2StartApplicationCompletedSensor:
+ SENSOR = KinesisAnalyticsV2StartApplicationCompletedSensor
+ APPLICATION_ARN = "arn:aws:kinesisanalytics:us-east-1:123456789012:application/demo"
+
+ def setup_method(self):
+ self.default_op_kwargs = dict(
+ task_id="start_application_sensor",
+ application_name="demo",
+ poke_interval=5,
+ max_retries=1,
+ )
+ self.sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
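+ # aws_conn_id=None skips the Airflow connection lookup and falls back to the default boto3 credential chain.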
+
+ def test_base_aws_op_attributes(self):
+ op = self.SENSOR(**self.default_op_kwargs)
+ assert op.hook.aws_conn_id == "aws_default"
+ assert op.hook._region_name is None
+ assert op.hook._verify is None
+ assert op.hook._config is None
+
+ op = self.SENSOR(
+ **self.default_op_kwargs,
+ aws_conn_id="aws-test-custom-conn",
+ region_name="eu-west-1",
+ verify=False,
+ botocore_config={"read_timeout": 42},
+ )
+ assert op.hook.aws_conn_id == "aws-test-custom-conn"
+ assert op.hook._region_name == "eu-west-1"
+ assert op.hook._verify is False
+ assert op.hook._config is not None
+ assert op.hook._config.read_timeout == 42
+
+ @pytest.mark.parametrize("state", SENSOR.SUCCESS_STATES)
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_poke_success_state(self, mock_conn, state):
+ mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN,
"ApplicationStatus": state}
+ }
+
+ assert self.sensor.poke({}) is True
+
+ @pytest.mark.parametrize("state", SENSOR.INTERMEDIATE_STATES)
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_intermediate_state(self, mock_conn, state):
+ mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN,
"ApplicationStatus": state}
+ }
+ assert self.sensor.poke({}) is False
+
+ @pytest.mark.parametrize(
+ "soft_fail, expected_exception",
+ [
+ pytest.param(False, AirflowException, id="not-soft-fail"),
+ pytest.param(True, AirflowSkipException, id="soft-fail"),
+ ],
+ )
+ @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_poke_failure_states(self, mock_conn, state, soft_fail, expected_exception):
+ mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN,
"ApplicationStatus": state}
+ }
+
+ sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, soft_fail=soft_fail)
+
+ with pytest.raises(
+ expected_exception, match="AWS Managed Service for Apache Flink
application start failed"
+ ):
+ sensor.poke({})
+
+
+class TestKinesisAnalyticsV2StopApplicationCompletedSensor:
+ SENSOR = KinesisAnalyticsV2StopApplicationCompletedSensor
+ APPLICATION_ARN = "arn:aws:kinesisanalytics:us-east-1:123456789012:application/demo"
+
+ def setup_method(self):
+ self.default_op_kwargs = dict(
+ task_id="stop_application_sensor",
+ application_name="demo",
+ poke_interval=5,
+ max_retries=1,
+ )
+ self.sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None)
+
+ def test_base_aws_op_attributes(self):
+ op = self.SENSOR(**self.default_op_kwargs)
+ assert op.hook.aws_conn_id == "aws_default"
+ assert op.hook._region_name is None
+ assert op.hook._verify is None
+ assert op.hook._config is None
+
+ op = self.SENSOR(
+ **self.default_op_kwargs,
+ aws_conn_id="aws-test-custom-conn",
+ region_name="eu-west-1",
+ verify=False,
+ botocore_config={"read_timeout": 42},
+ )
+ assert op.hook.aws_conn_id == "aws-test-custom-conn"
+ assert op.hook._region_name == "eu-west-1"
+ assert op.hook._verify is False
+ assert op.hook._config is not None
+ assert op.hook._config.read_timeout == 42
+
+ @pytest.mark.parametrize("state", SENSOR.SUCCESS_STATES)
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_poke_success_state(self, mock_conn, state):
+ mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN,
"ApplicationStatus": state}
+ }
+
+ assert self.sensor.poke({}) is True
+
+ @pytest.mark.parametrize("state", SENSOR.INTERMEDIATE_STATES)
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_intermediate_state(self, mock_conn, state):
+ mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN,
"ApplicationStatus": state}
+ }
+ assert self.sensor.poke({}) is False
+
+ @pytest.mark.parametrize(
+ "soft_fail, expected_exception",
+ [
+ pytest.param(False, AirflowException, id="not-soft-fail"),
+ pytest.param(True, AirflowSkipException, id="soft-fail"),
+ ],
+ )
+ @pytest.mark.parametrize("state", SENSOR.FAILURE_STATES)
+ @mock.patch.object(KinesisAnalyticsV2Hook, "conn")
+ def test_poke_failure_states(self, mock_conn, state, soft_fail, expected_exception):
+ mock_conn.describe_application.return_value = {
+ "ApplicationDetail": {"ApplicationARN": self.APPLICATION_ARN,
"ApplicationStatus": state}
+ }
+
+ sensor = self.SENSOR(**self.default_op_kwargs, aws_conn_id=None, soft_fail=soft_fail)
+
+ with pytest.raises(
+ expected_exception, match="AWS Managed Service for Apache Flink
application stop failed"
+ ):
+ sensor.poke({})
diff --git a/tests/providers/amazon/aws/triggers/test_kinesis_analytics.py b/tests/providers/amazon/aws/triggers/test_kinesis_analytics.py
new file mode 100644
index 0000000000..3692905f22
--- /dev/null
+++ b/tests/providers/amazon/aws/triggers/test_kinesis_analytics.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+from unittest.mock import AsyncMock
+
+import pytest
+
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.triggers.kinesis_analytics import (
+ KinesisAnalyticsV2ApplicationOperationCompleteTrigger,
+)
+from airflow.triggers.base import TriggerEvent
+from tests.providers.amazon.aws.utils.test_waiter import assert_expected_waiter_type
+
+BASE_TRIGGER_CLASSPATH = "airflow.providers.amazon.aws.triggers.kinesis_analytics."
+
+
+class TestKinesisAnalyticsV2ApplicationOperationCompleteTrigger:
+ APPLICATION_NAME = "demo"
+
+ def test_serialization(self):
+ """Assert that arguments and classpath are correctly serialized."""
+ trigger = KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+ application_name=self.APPLICATION_NAME, waiter_name="application_start_complete"
+ )
+ classpath, kwargs = trigger.serialize()
+ assert classpath == BASE_TRIGGER_CLASSPATH + "KinesisAnalyticsV2ApplicationOperationCompleteTrigger"
+ assert kwargs.get("application_name") == self.APPLICATION_NAME
+
+ @pytest.mark.asyncio
+ @mock.patch.object(KinesisAnalyticsV2Hook, "get_waiter")
+ @mock.patch.object(KinesisAnalyticsV2Hook, "async_conn")
+ async def test_run_success_with_application_start_complete_waiter(self, mock_async_conn, mock_get_waiter):
+ mock_async_conn.__aenter__.return_value = mock.MagicMock()
+ mock_get_waiter().wait = AsyncMock()
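+ # With the waiter's wait stubbed as an AsyncMock, run() completes on its first poll and yields a success event.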
+ trigger = KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+ application_name=self.APPLICATION_NAME, waiter_name="application_start_complete"
+ )
+
+ generator = trigger.run()
+ response = await generator.asend(None)
+
+ assert response == TriggerEvent({"status": "success",
"application_name": self.APPLICATION_NAME})
+ assert_expected_waiter_type(mock_get_waiter, "application_start_complete")
+ mock_get_waiter().wait.assert_called_once()
+
+ @pytest.mark.asyncio
+ @mock.patch.object(KinesisAnalyticsV2Hook, "get_waiter")
+ @mock.patch.object(KinesisAnalyticsV2Hook, "async_conn")
+ async def test_run_success_with_application_stop_complete_waiter(self, mock_async_conn, mock_get_waiter):
+ mock_async_conn.__aenter__.return_value = mock.MagicMock()
+ mock_get_waiter().wait = AsyncMock()
+ trigger = KinesisAnalyticsV2ApplicationOperationCompleteTrigger(
+ application_name=self.APPLICATION_NAME, waiter_name="application_stop_complete"
+ )
+
+ generator = trigger.run()
+ response = await generator.asend(None)
+
+ assert response == TriggerEvent({"status": "success",
"application_name": self.APPLICATION_NAME})
+ assert_expected_waiter_type(mock_get_waiter, "application_stop_waiter")
+ mock_get_waiter().wait.assert_called_once()
diff --git a/tests/providers/amazon/aws/waiters/test_kinesis_analytics.py b/tests/providers/amazon/aws/waiters/test_kinesis_analytics.py
new file mode 100644
index 0000000000..31d8253833
--- /dev/null
+++ b/tests/providers/amazon/aws/waiters/test_kinesis_analytics.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import boto3
+import botocore
+import pytest
+
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
+ KinesisAnalyticsV2StartApplicationCompletedSensor,
+ KinesisAnalyticsV2StopApplicationCompletedSensor,
+)
+
+
+class TestKinesisAnalyticsV2CustomWaiters:
+ def test_service_waiters(self):
+ assert "application_start_complete" in
KinesisAnalyticsV2Hook().list_waiters()
+ assert "application_stop_complete" in
KinesisAnalyticsV2Hook().list_waiters()
+
+
+class TestKinesisAnalyticsV2CustomWaitersBase:
+ @pytest.fixture(autouse=True)
+ def mock_conn(self, monkeypatch):
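+ # Pin the hook's conn to a concrete boto3 client so each test can patch describe_application on it.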
+ self.client = boto3.client("kinesisanalyticsv2")
+ monkeypatch.setattr(KinesisAnalyticsV2Hook, "conn", self.client)
+
+
+class TestKinesisAnalyticsV2ApplicationStartWaiter(TestKinesisAnalyticsV2CustomWaitersBase):
+ APPLICATION_NAME = "demo"
+ WAITER_NAME = "application_start_complete"
+
+ @pytest.fixture
+ def mock_describe_application(self):
+ with mock.patch.object(self.client, "describe_application") as
mock_getter:
+ yield mock_getter
+
+ @pytest.mark.parametrize("state",
KinesisAnalyticsV2StartApplicationCompletedSensor.SUCCESS_STATES)
+ def test_start_application_complete(self, state,
mock_describe_application):
+ mock_describe_application.return_value = {"ApplicationDetail":
{"ApplicationStatus": state}}
+
+ KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(ApplicationName=self.APPLICATION_NAME)
+
+ @pytest.mark.parametrize("state",
KinesisAnalyticsV2StartApplicationCompletedSensor.FAILURE_STATES)
+ def test_start_application_complete_failed(self, state,
mock_describe_application):
+ mock_describe_application.return_value = {"ApplicationDetail":
{"ApplicationStatus": state}}
+ with pytest.raises(botocore.exceptions.WaiterError):
+ KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(ApplicationName=self.APPLICATION_NAME)
+
+ def test_start_application_complete_wait(self, mock_describe_application):
+ wait = {"ApplicationDetail": {"ApplicationStatus": "STARTING"}}
+ success = {"ApplicationDetail": {"ApplicationStatus": "RUNNING"}}
+
+ mock_describe_application.side_effect = [wait, wait, success]
+
+ KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(
+ ApplicationName=self.APPLICATION_NAME, WaiterConfig={"Delay":
0.01, "MaxAttempts": 3}
+ )
+
+
+class TestKinesisAnalyticsV2ApplicationStopWaiter(TestKinesisAnalyticsV2CustomWaitersBase):
+ APPLICATION_NAME = "demo"
+ WAITER_NAME = "application_stop_complete"
+
+ @pytest.fixture
+ def mock_describe_application(self):
+ with mock.patch.object(self.client, "describe_application") as
mock_getter:
+ yield mock_getter
+
+ @pytest.mark.parametrize("state",
KinesisAnalyticsV2StopApplicationCompletedSensor.SUCCESS_STATES)
+ def test_stop_application_complete(self, state, mock_describe_application):
+ mock_describe_application.return_value = {"ApplicationDetail":
{"ApplicationStatus": state}}
+
+ KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(ApplicationName=self.APPLICATION_NAME)
+
+ @pytest.mark.parametrize("state",
KinesisAnalyticsV2StopApplicationCompletedSensor.FAILURE_STATES)
+ def test_stop_application_complete_failed(self, state,
mock_describe_application):
+ mock_describe_application.return_value = {"ApplicationDetail":
{"ApplicationStatus": state}}
+ with pytest.raises(botocore.exceptions.WaiterError):
+ KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(ApplicationName=self.APPLICATION_NAME)
+
+ def test_stop_application_complete_wait(self, mock_describe_application):
+ wait = {"ApplicationDetail": {"ApplicationStatus": "STOPPING"}}
+ success = {"ApplicationDetail": {"ApplicationStatus": "READY"}}
+
+ mock_describe_application.side_effect = [wait, wait, success]
+
+ KinesisAnalyticsV2Hook().get_waiter(self.WAITER_NAME).wait(
+ ApplicationName=self.APPLICATION_NAME, WaiterConfig={"Delay":
0.01, "MaxAttempts": 3}
+ )
diff --git a/tests/system/providers/amazon/aws/example_kinesis_analytics.py b/tests/system/providers/amazon/aws/example_kinesis_analytics.py
new file mode 100644
index 0000000000..0d93f1e336
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_kinesis_analytics.py
@@ -0,0 +1,272 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime as dt
+import json
+import random
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG, settings
+from airflow.decorators import task, task_group
+from airflow.models import Connection
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook
+from airflow.providers.amazon.aws.operators.kinesis_analytics import (
+ KinesisAnalyticsV2CreateApplicationOperator,
+ KinesisAnalyticsV2StartApplicationOperator,
+ KinesisAnalyticsV2StopApplicationOperator,
+)
+from airflow.providers.amazon.aws.operators.s3 import (
+ S3CreateBucketOperator,
+ S3DeleteBucketOperator,
+)
+from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
+ KinesisAnalyticsV2StartApplicationCompletedSensor,
+ KinesisAnalyticsV2StopApplicationCompletedSensor,
+)
+from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
+
+ROLE_ARN_KEY = "ROLE_ARN"
+sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()
+
+DAG_ID = "example_kinesis_analytics"
+
+
+@task_group
+def kinesis_analytics_v2_workflow():
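+ # The application config below mirrors the "KDS to S3" Flink blueprint (see BlueprintName); copy_jar_to_s3 stages the jar under code/ beforehand.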
+ # [START howto_operator_create_application]
+ create_application = KinesisAnalyticsV2CreateApplicationOperator(
+ task_id="create_application",
+ application_name=application_name,
+ runtime_environment="FLINK-1_18",
+ service_execution_role=test_context[ROLE_ARN_KEY],
+ create_application_kwargs={
+ "ApplicationConfiguration": {
+ "FlinkApplicationConfiguration": {
+ "ParallelismConfiguration": {
+ "ConfigurationType": "CUSTOM",
+ "Parallelism": 2,
+ "ParallelismPerKPU": 1,
+ "AutoScalingEnabled": False,
+ }
+ },
+ "EnvironmentProperties": {
+ "PropertyGroups": [
+ {
+ "PropertyGroupId": "BlueprintMetadata",
+ "PropertyMap": {
+ "AWSRegion": region_name,
+ "BlueprintName":
"KDS_FLINK-DATASTREAM-JAVA_S3",
+ "BucketName": f"s3://{bucket_name}/",
+ "PartitionFormat": "yyyy-MM-dd-HH",
+ "StreamInitialPosition": "TRIM_HORIZON",
+ "StreamName": stream_name,
+ },
+ },
+ ]
+ },
+ "ApplicationCodeConfiguration": {
+ "CodeContent": {
+ "S3ContentLocation": {
+ "BucketARN": f"arn:aws:s3:::{bucket_name}",
+ "FileKey":
"code/kds-to-s3-datastream-java-1.0.1.jar",
+ },
+ },
+ "CodeContentType": "ZIPFILE",
+ },
+ }
+ },
+ )
+ # [END howto_operator_create_application]
+
+ # [START howto_operator_start_application]
+ start_application = KinesisAnalyticsV2StartApplicationOperator(
+ task_id="start_application",
+ application_name=application_name,
+ )
+ # [END howto_operator_start_application]
+ start_application.wait_for_completion = False
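+ # Waiting is handled by the sensor below rather than by the operator itself.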
+
+ # [START howto_sensor_start_application]
+ await_start_application = KinesisAnalyticsV2StartApplicationCompletedSensor(
+ task_id="await_start_application",
+ application_name=application_name,
+ )
+ # [END howto_sensor_start_application]
+
+ # [START howto_operator_stop_application]
+ stop_application = KinesisAnalyticsV2StopApplicationOperator(
+ task_id="stop_application",
+ application_name=application_name,
+ )
+ # [END howto_operator_stop_application]
+ stop_application.wait_for_completion = False
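+ # As above, the stop sensor below does the waiting.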
+
+ # [START howto_sensor_stop_application]
+ await_stop_application = KinesisAnalyticsV2StopApplicationCompletedSensor(
+ task_id="await_stop_application",
+ application_name=application_name,
+ )
+ # [END howto_sensor_stop_application]
+
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def delete_application(app_name: str):
+ kinesis_analytics_v2_hook = KinesisAnalyticsV2Hook()
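+ # delete_application requires the CreateTimestamp from describe_application, so fetch the detail first.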
+ response = kinesis_analytics_v2_hook.conn.describe_application(ApplicationName=app_name)
+ kinesis_analytics_v2_hook.conn.delete_application(
+ ApplicationName=app_name, CreateTimestamp=response["ApplicationDetail"]["CreateTimestamp"]
+ )
+
+ chain(
+ create_application,
+ start_application,
+ await_start_application,
+ stop_application,
+ await_stop_application,
+ delete_application(application_name),
+ )
+
+
+@task_group
+def copy_jar_to_s3(bucket: str):
+ """
+
+ Copy application code to S3 using HttpToS3Operator.
+
+ :param bucket: Name of the Amazon S3 bucket.
+ """
+
+ @task
+ def create_connection(conn_id):
+ conn = Connection(
+ conn_id=conn_id,
+ conn_type="http",
+ host="https://github.com/",
+ )
+ session = settings.Session()
+ session.add(conn)
+ session.commit()
+
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def delete_connection(conn_id: str):
+ session = settings.Session()
+ conn_to_details = session.query(Connection).filter(Connection.conn_id == conn_id).first()
+ session.delete(conn_to_details)
+ session.commit()
+
+ copy_jar_file = HttpToS3Operator(
+ task_id="copy_jar_file",
+ http_conn_id=http_conn_id,
+ endpoint="awslabs/managed-service-for-apache-flink-blueprints/releases/download/v2.0.1/kds-to-s3-datastream-java-1.0.1.jar",
+ s3_bucket=bucket,
+ s3_key="code/kds-to-s3-datastream-java-1.0.1.jar",
+ )
+
+ chain(create_connection(http_conn_id), copy_jar_file, delete_connection(http_conn_id))
+
+
+@task
+def create_kinesis_stream(stream: str, region: str):
+ """
+ Create a Kinesis stream and put some sample data on it.
+
+ :param stream: Name of the kinesis stream.
+ :param region: Name of the AWS region.
+ """
+ client = boto3.client("kinesis", region_name=region)
+ client.create_stream(StreamName=stream, ShardCount=1, StreamModeDetails={"StreamMode": "PROVISIONED"})
+ account_id = boto3.client("sts").get_caller_identity()["Account"]
+ waiter = client.get_waiter("stream_exists")
+ waiter.wait(StreamName=stream, WaiterConfig={"Delay": 60, "MaxAttempts":
4})
+
+ def get_data():
+ return {
+ "event_time": dt.datetime.now().isoformat(),
+ "ticker": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
+ "price": round(random.random() * 100, 2),
+ }
+
+ for _ in range(2):
+ data = get_data()
+ client.put_record(
+ StreamARN=f"arn:aws:kinesis:{region}:{account_id}:stream/{stream}",
+ Data=json.dumps(data),
+ PartitionKey=data["ticker"],
+ )
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_kinesis_stream(stream: str, region: str):
+ client = boto3.client("kinesis", region_name=region)
+ client.delete_stream(StreamName=stream, EnforceConsumerDeletion=True)
+
+
+with DAG(
+ dag_id=DAG_ID,
+ schedule="@once",
+ start_date=datetime(2021, 1, 1),
+ tags=["example"],
+ catchup=False,
+) as dag:
+ test_context = sys_test_context_task()
+ env_id = test_context["ENV_ID"]
+ bucket_name = f"{env_id}-kinesis-analytics"
+ application_name = f"{env_id}-test-app"
+ http_conn_id = f"{env_id}-git"
+ region_name = boto3.session.Session().region_name
+ stream_name = f"{env_id}-test-stream"
+
+ create_bucket = S3CreateBucketOperator(
+ task_id="create_bucket",
+ bucket_name=bucket_name,
+ )
+
+ delete_bucket = S3DeleteBucketOperator(
+ task_id="delete_bucket",
+ trigger_rule=TriggerRule.ALL_DONE,
+ bucket_name=bucket_name,
+ force_delete=True,
+ )
+
+ chain(
+ # TEST SETUP
+ test_context,
+ create_bucket,
+ create_kinesis_stream(stream=stream_name, region=region_name),
+ copy_jar_to_s3(bucket=bucket_name),
+ # TEST BODY
+ kinesis_analytics_v2_workflow(),
+ # TEST TEARDOWN
+ delete_kinesis_stream(stream=stream_name, region=region_name),
+ delete_bucket,
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)