This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new bd2f156bc8 D205 Support - Providers: Amazon/AWS (#32224) bd2f156bc8 is described below commit bd2f156bc842f201c736709dae65d04f08573bae Author: D. Ferruzzi <ferru...@amazon.com> AuthorDate: Wed Jun 28 03:02:07 2023 -0700 D205 Support - Providers: Amazon/AWS (#32224) * D205 Support - Providers: Amazon/AWS * CI Fixes --- airflow/providers/amazon/aws/hooks/appflow.py | 4 +- airflow/providers/amazon/aws/hooks/batch_client.py | 17 ++++-- .../providers/amazon/aws/hooks/batch_waiters.py | 17 +++--- .../providers/amazon/aws/hooks/cloud_formation.py | 1 + airflow/providers/amazon/aws/hooks/datasync.py | 6 +- airflow/providers/amazon/aws/hooks/dms.py | 4 +- airflow/providers/amazon/aws/hooks/dynamodb.py | 1 + airflow/providers/amazon/aws/hooks/ec2.py | 1 + airflow/providers/amazon/aws/hooks/ecr.py | 1 + airflow/providers/amazon/aws/hooks/ecs.py | 6 +- airflow/providers/amazon/aws/hooks/eks.py | 1 + .../aws/hooks/elasticache_replication_group.py | 1 + airflow/providers/amazon/aws/hooks/emr.py | 14 +++-- airflow/providers/amazon/aws/hooks/glue.py | 10 ++-- airflow/providers/amazon/aws/hooks/glue_catalog.py | 1 + airflow/providers/amazon/aws/hooks/glue_crawler.py | 5 +- airflow/providers/amazon/aws/hooks/kinesis.py | 1 + .../providers/amazon/aws/hooks/lambda_function.py | 1 + airflow/providers/amazon/aws/hooks/logs.py | 8 +-- airflow/providers/amazon/aws/hooks/quicksight.py | 1 + airflow/providers/amazon/aws/hooks/rds.py | 19 +++---- .../providers/amazon/aws/hooks/redshift_data.py | 1 + airflow/providers/amazon/aws/hooks/s3.py | 25 +++------ airflow/providers/amazon/aws/hooks/ses.py | 1 + airflow/providers/amazon/aws/hooks/sns.py | 1 + airflow/providers/amazon/aws/hooks/sqs.py | 1 + airflow/providers/amazon/aws/hooks/ssm.py | 4 +- .../providers/amazon/aws/hooks/step_function.py | 1 + airflow/providers/amazon/aws/hooks/sts.py | 1 + airflow/providers/amazon/aws/links/emr.py | 5 +- .../providers/amazon/aws/log/s3_task_handler.py | 13 ++--- airflow/providers/amazon/aws/operators/eks.py | 2 +- airflow/providers/amazon/aws/operators/emr.py | 9 +-- .../providers/amazon/aws/operators/glue_crawler.py | 8 ++- .../amazon/aws/operators/lambda_function.py | 5 +- .../providers/amazon/aws/operators/quicksight.py | 3 +- airflow/providers/amazon/aws/operators/rds.py | 1 + airflow/providers/amazon/aws/operators/s3.py | 11 ++-- .../providers/amazon/aws/operators/sagemaker.py | 65 +++++++++++++--------- airflow/providers/amazon/aws/sensors/athena.py | 3 +- airflow/providers/amazon/aws/sensors/batch.py | 13 ++--- airflow/providers/amazon/aws/sensors/ec2.py | 3 +- airflow/providers/amazon/aws/sensors/ecs.py | 9 +-- airflow/providers/amazon/aws/sensors/eks.py | 1 + airflow/providers/amazon/aws/sensors/emr.py | 20 ++----- .../amazon/aws/sensors/lambda_function.py | 3 +- airflow/providers/amazon/aws/sensors/s3.py | 17 +++--- airflow/providers/amazon/aws/sensors/sagemaker.py | 18 ++---- airflow/providers/amazon/aws/sensors/sqs.py | 1 + .../providers/amazon/aws/sensors/step_function.py | 4 +- .../amazon/aws/transfers/dynamodb_to_s3.py | 13 ++--- .../providers/amazon/aws/transfers/ftp_to_s3.py | 3 +- .../amazon/aws/transfers/hive_to_dynamodb.py | 7 ++- .../providers/amazon/aws/transfers/sftp_to_s3.py | 3 +- .../providers/amazon/aws/transfers/sql_to_s3.py | 1 + airflow/providers/amazon/aws/triggers/batch.py | 11 ++-- airflow/providers/amazon/aws/triggers/ec2.py | 3 +- airflow/providers/amazon/aws/triggers/ecs.py | 1 + 
airflow/providers/amazon/aws/triggers/eks.py | 6 +- airflow/providers/amazon/aws/triggers/emr.py | 13 ++--- airflow/providers/amazon/aws/utils/tags.py | 3 + .../providers/amazon/aws/utils/task_log_fetcher.py | 9 ++- airflow/providers/amazon/aws/utils/waiter.py | 3 +- .../amazon/aws/utils/waiter_with_logging.py | 25 +++++---- 64 files changed, 230 insertions(+), 240 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/appflow.py b/airflow/providers/amazon/aws/hooks/appflow.py index cc35688021..f976f40692 100644 --- a/airflow/providers/amazon/aws/hooks/appflow.py +++ b/airflow/providers/amazon/aws/hooks/appflow.py @@ -29,6 +29,7 @@ if TYPE_CHECKING: class AppflowHook(AwsBaseHook): """ Interact with Amazon Appflow. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("appflow") <Appflow.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and @@ -80,8 +81,7 @@ class AppflowHook(AwsBaseHook): self, flow_name: str, filter_tasks: list[TaskTypeDef], set_trigger_ondemand: bool = False ) -> None: """ - Update the flow task filter. - All filters will be removed if an empty array is passed to filter_tasks. + Update the flow task filter; all filters will be removed if an empty array is passed to filter_tasks. :param flow_name: The flow name :param filter_tasks: List flow tasks to be added diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index be24aadb67..e585297d3a 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -44,6 +44,7 @@ from airflow.typing_compat import Protocol, runtime_checkable class BatchProtocol(Protocol): """ A structured Protocol for ``boto3.client('batch') -> botocore.client.Batch``. + This is used for type hints on :py:meth:`.BatchClient.client`; it covers only the subset of client methods required. @@ -140,6 +141,7 @@ class BatchProtocol(Protocol): class BatchClientHook(AwsBaseHook): """ Interact with AWS Batch. + Provide thick wrapper around :external+boto3:py:class:`boto3.client("batch") <Batch.Client>`. :param max_retries: exponential back-off retries, 4200 = 48 hours; @@ -289,8 +291,9 @@ class BatchClientHook(AwsBaseHook): def poll_for_job_running(self, job_id: str, delay: int | float | None = None) -> None: """ - Poll for job running. The status that indicates a job is running or - already complete are: 'RUNNING'|'SUCCEEDED'|'FAILED'. + Poll for job running. + + The status that indicates a job is running or already complete are: 'RUNNING'|'SUCCEEDED'|'FAILED'. So the status options that this will wait for are the transitions from: 'SUBMITTED'>'PENDING'>'RUNNABLE'>'STARTING'>'RUNNING'|'SUCCEEDED'|'FAILED' @@ -311,8 +314,9 @@ class BatchClientHook(AwsBaseHook): def poll_for_job_complete(self, job_id: str, delay: int | float | None = None) -> None: """ - Poll for job completion. The status that indicates job completion - are: 'SUCCEEDED'|'FAILED'. + Poll for job completion. + + The status that indicates job completion are: 'SUCCEEDED'|'FAILED'. So the status options that this will wait for are the transitions from: 'SUBMITTED'>'PENDING'>'RUNNABLE'>'STARTING'>'RUNNING'>'SUCCEEDED'|'FAILED' @@ -555,8 +559,9 @@ class BatchClientHook(AwsBaseHook): @staticmethod def exponential_delay(tries: int) -> float: """ - An exponential back-off delay, with random jitter. There is a maximum - interval of 10 minutes (with random jitter between 3 and 10 minutes). + An exponential back-off delay, with random jitter. 
+ + There is a maximum interval of 10 minutes (with random jitter between 3 and 10 minutes). This is used in the :py:meth:`.poll_for_job_status` method. :param tries: Number of tries diff --git a/airflow/providers/amazon/aws/hooks/batch_waiters.py b/airflow/providers/amazon/aws/hooks/batch_waiters.py index ec01fbadd9..c93a00b053 100644 --- a/airflow/providers/amazon/aws/hooks/batch_waiters.py +++ b/airflow/providers/amazon/aws/hooks/batch_waiters.py @@ -122,10 +122,11 @@ class BatchWaitersHook(BatchClientHook): @property def waiter_config(self) -> dict: """ - An immutable waiter configuration for this instance; a ``deepcopy`` is returned by this - property. During the init for BatchWaiters, the waiter_config is used to build a - waiter_model and this only occurs during the class init, to avoid any accidental - mutations of waiter_config leaking into the waiter_model. + An immutable waiter configuration for this instance; a ``deepcopy`` is returned by this property. + + During the init for BatchWaiters, the waiter_config is used to build a + waiter_model and this only occurs during the class init, to avoid any + accidental mutations of waiter_config leaking into the waiter_model. :return: a waiter configuration for AWS Batch services """ @@ -193,9 +194,11 @@ class BatchWaitersHook(BatchClientHook): get_batch_log_fetcher: Callable[[str], AwsTaskLogFetcher | None] | None = None, ) -> None: """ - Wait for Batch job to complete. This assumes that the ``.waiter_model`` is configured - using some variation of the ``.default_config`` so that it can generate waiters with the - following names: "JobExists", "JobRunning" and "JobComplete". + Wait for Batch job to complete. + + This assumes that the ``.waiter_model`` is configured using some + variation of the ``.default_config`` so that it can generate waiters + with the following names: "JobExists", "JobRunning" and "JobComplete". :param job_id: a Batch job ID diff --git a/airflow/providers/amazon/aws/hooks/cloud_formation.py b/airflow/providers/amazon/aws/hooks/cloud_formation.py index b39ee7be56..1772e6501c 100644 --- a/airflow/providers/amazon/aws/hooks/cloud_formation.py +++ b/airflow/providers/amazon/aws/hooks/cloud_formation.py @@ -27,6 +27,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook class CloudFormationHook(AwsBaseHook): """ Interact with AWS CloudFormation. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("cloudformation") <CloudFormation.Client>`. diff --git a/airflow/providers/amazon/aws/hooks/datasync.py b/airflow/providers/amazon/aws/hooks/datasync.py index 0c017a9312..1fd06eeefe 100644 --- a/airflow/providers/amazon/aws/hooks/datasync.py +++ b/airflow/providers/amazon/aws/hooks/datasync.py @@ -27,6 +27,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook class DataSyncHook(AwsBaseHook): """ Interact with AWS DataSync. + Provide thick wrapper around :external+boto3:py:class:`boto3.client("datasync") <DataSync.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and @@ -198,8 +199,7 @@ class DataSyncHook(AwsBaseHook): destination_location_arns: list, ) -> list: """ - Return list of TaskArns for which use any one of the specified - source LocationArns and any one of the specified destination LocationArns. + Return list of TaskArns which use both a specified source and destination LocationArns. :param source_location_arns: List of source LocationArns. :param destination_location_arns: List of destination LocationArns. 
@@ -224,6 +224,7 @@ class DataSyncHook(AwsBaseHook): def start_task_execution(self, task_arn: str, **kwargs) -> str: """ Start a TaskExecution for the specified task_arn. + Each task can have at most one TaskExecution. Additional keyword arguments send to ``start_task_execution`` boto3 method. @@ -300,6 +301,7 @@ class DataSyncHook(AwsBaseHook): def wait_for_task_execution(self, task_execution_arn: str, max_iterations: int = 60) -> bool: """ Wait for Task Execution status to be complete (SUCCESS/ERROR). + The ``task_execution_arn`` must exist, or a boto3 ClientError will be raised. :param task_execution_arn: TaskExecutionArn diff --git a/airflow/providers/amazon/aws/hooks/dms.py b/airflow/providers/amazon/aws/hooks/dms.py index 2d9b75bbf4..6950edadc8 100644 --- a/airflow/providers/amazon/aws/hooks/dms.py +++ b/airflow/providers/amazon/aws/hooks/dms.py @@ -35,6 +35,7 @@ class DmsTaskWaiterStatus(str, Enum): class DmsHook(AwsBaseHook): """ Interact with AWS Database Migration Service (DMS). + Provide thin wrapper around :external+boto3:py:class:`boto3.client("dms") <DatabaseMigrationService.Client>`. @@ -197,8 +198,7 @@ class DmsHook(AwsBaseHook): def wait_for_task_status(self, replication_task_arn: str, status: DmsTaskWaiterStatus): """ - Waits for replication task to reach status. - Supported statuses: deleted, ready, running, stopped. + Waits for replication task to reach status; supported statuses: deleted, ready, running, stopped. :param status: Status to wait for :param replication_task_arn: Replication task ARN diff --git a/airflow/providers/amazon/aws/hooks/dynamodb.py b/airflow/providers/amazon/aws/hooks/dynamodb.py index 8710cff317..db38b401af 100644 --- a/airflow/providers/amazon/aws/hooks/dynamodb.py +++ b/airflow/providers/amazon/aws/hooks/dynamodb.py @@ -27,6 +27,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook class DynamoDBHook(AwsBaseHook): """ Interact with Amazon DynamoDB. + Provide thick wrapper around :external+boto3:py:class:`boto3.resource("dynamodb") <DynamoDB.ServiceResource>`. diff --git a/airflow/providers/amazon/aws/hooks/ec2.py b/airflow/providers/amazon/aws/hooks/ec2.py index 13bcb60196..c22f258dfd 100644 --- a/airflow/providers/amazon/aws/hooks/ec2.py +++ b/airflow/providers/amazon/aws/hooks/ec2.py @@ -50,6 +50,7 @@ def only_client_type(func): class EC2Hook(AwsBaseHook): """ Interact with Amazon Elastic Compute Cloud (EC2). + Provide thick wrapper around :external+boto3:py:class:`boto3.client("ec2") <EC2.Client>` or :external+boto3:py:class:`boto3.resource("ec2") <EC2.ServiceResource>`. diff --git a/airflow/providers/amazon/aws/hooks/ecr.py b/airflow/providers/amazon/aws/hooks/ecr.py index 8f41f33635..f60d4d070d 100644 --- a/airflow/providers/amazon/aws/hooks/ecr.py +++ b/airflow/providers/amazon/aws/hooks/ecr.py @@ -51,6 +51,7 @@ class EcrCredentials: class EcrHook(AwsBaseHook): """ Interact with Amazon Elastic Container Registry (ECR). + Provide thin wrapper around :external+boto3:py:class:`boto3.client("ecr") <ECR.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/hooks/ecs.py b/airflow/providers/amazon/aws/hooks/ecs.py index ad45da5d0a..6d6f8cd821 100644 --- a/airflow/providers/amazon/aws/hooks/ecs.py +++ b/airflow/providers/amazon/aws/hooks/ecs.py @@ -81,6 +81,7 @@ class EcsTaskStates(_StringCompareEnum): class EcsHook(AwsGenericHook): """ Interact with Amazon Elastic Container Service (ECS). 
+ Provide thin wrapper around :external+boto3:py:class:`boto3.client("ecs") <ECS.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and @@ -137,8 +138,9 @@ class EcsHook(AwsGenericHook): @runtime_checkable class EcsProtocol(Protocol): """ - A structured Protocol for ``boto3.client('ecs')``. This is used for type hints on - :py:meth:`.EcsOperator.client`. + A structured Protocol for ``boto3.client('ecs')``. + + This is used for type hints on :py:meth:`.EcsOperator.client`. .. seealso:: diff --git a/airflow/providers/amazon/aws/hooks/eks.py b/airflow/providers/amazon/aws/hooks/eks.py index 331020f47d..41cc49c465 100644 --- a/airflow/providers/amazon/aws/hooks/eks.py +++ b/airflow/providers/amazon/aws/hooks/eks.py @@ -78,6 +78,7 @@ class NodegroupStates(Enum): class EksHook(AwsBaseHook): """ Interact with Amazon Elastic Kubernetes Service (EKS). + Provide thin wrapper around :external+boto3:py:class:`boto3.client("eks") <EKS.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py index 44b20bcdce..ae66e95352 100644 --- a/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py +++ b/airflow/providers/amazon/aws/hooks/elasticache_replication_group.py @@ -26,6 +26,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook class ElastiCacheReplicationGroupHook(AwsBaseHook): """ Interact with Amazon ElastiCache. + Provide thick wrapper around :external+boto3:py:class:`boto3.client("elasticache") <ElastiCache.Client>`. :param max_retries: Max retries for checking availability of and deleting replication group diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py index 9685678ec1..2fb35a34a6 100644 --- a/airflow/providers/amazon/aws/hooks/emr.py +++ b/airflow/providers/amazon/aws/hooks/emr.py @@ -32,6 +32,7 @@ from airflow.utils.helpers import prune_dict class EmrHook(AwsBaseHook): """ Interact with Amazon Elastic MapReduce Service (EMR). + Provide thick wrapper around :external+boto3:py:class:`boto3.client("emr") <EMR.Client>`. :param emr_conn_id: :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`. @@ -57,8 +58,7 @@ class EmrHook(AwsBaseHook): def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str]) -> str | None: """ - Fetch id of EMR cluster with given name and (optional) states. - Will return only if single id is found. + Fetch id of EMR cluster with given name and (optional) states; returns only if single id is found. .. seealso:: - :external+boto3:py:meth:`EMR.Client.list_clusters` @@ -233,6 +233,7 @@ class EmrHook(AwsBaseHook): class EmrServerlessHook(AwsBaseHook): """ Interact with Amazon EMR Serverless. + Provide thin wrapper around :py:class:`boto3.client("emr-serverless") <EMRServerless.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and @@ -257,8 +258,8 @@ class EmrServerlessHook(AwsBaseHook): def cancel_running_jobs(self, application_id: str, waiter_config: dict = {}): """ - List all jobs in an intermediate state and cancel them. - Then wait for those jobs to reach a terminal state. + List all jobs in an intermediate state, cancel them, then wait for those jobs to reach terminal state. + Note: if new jobs are triggered while this operation is ongoing, it's going to time out and return an error. 
""" @@ -295,6 +296,7 @@ class EmrServerlessHook(AwsBaseHook): class EmrContainerHook(AwsBaseHook): """ Interact with Amazon EMR Containers (Amazon EMR on EKS). + Provide thick wrapper around :py:class:`boto3.client("emr-containers") <EMRContainers.Client>`. :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster @@ -366,6 +368,7 @@ class EmrContainerHook(AwsBaseHook): ) -> str: """ Submit a job to the EMR Containers API and return the job ID. + A job run is a unit of work, such as a Spark jar, PySpark script, or SparkSQL query, that you submit to Amazon EMR on EKS. @@ -463,8 +466,7 @@ class EmrContainerHook(AwsBaseHook): max_polling_attempts: int | None = None, ) -> str | None: """ - Poll the status of submitted job run until query state reaches final state. - Returns one of the final states. + Poll the status of submitted job run until query state reaches final state; returns the final state. :param job_id: The ID of the job run request. :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py index 421d11c09e..0e8ccd6907 100644 --- a/airflow/providers/amazon/aws/hooks/glue.py +++ b/airflow/providers/amazon/aws/hooks/glue.py @@ -33,6 +33,7 @@ ERROR_LOG_SUFFIX = "error" class GlueJobHook(AwsBaseHook): """ Interact with AWS Glue. + Provide thick wrapper around :external+boto3:py:class:`boto3.client("glue") <Glue.Client>`. :param s3_bucket: S3 bucket where logs and local etl script will be uploaded @@ -182,8 +183,7 @@ class GlueJobHook(AwsBaseHook): def get_job_state(self, job_name: str, run_id: str) -> str: """ - Get state of the Glue job. - The job state can be running, finished, failed, stopped or timeout. + Get state of the Glue job; the job state can be running, finished, failed, stopped or timeout. .. seealso:: - :external+boto3:py:meth:`Glue.Client.get_job_run` @@ -263,8 +263,7 @@ class GlueJobHook(AwsBaseHook): def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]: """ - Waits until Glue job with job_name completes or fails and return final state if finished. - Raises AirflowException when the job failed. + Wait until Glue job with job_name finishes; return final state if finished or raises AirflowException. :param job_name: unique job name per AWS account :param run_id: The job-run ID of the predecessor job run @@ -282,8 +281,7 @@ class GlueJobHook(AwsBaseHook): async def async_job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]: """ - Waits until Glue job with job_name completes or fails and return final state if finished. - Raises AirflowException when the job failed. + Wait until Glue job with job_name finishes; return final state if finished or raises AirflowException. :param job_name: unique job name per AWS account :param run_id: The job-run ID of the predecessor job run diff --git a/airflow/providers/amazon/aws/hooks/glue_catalog.py b/airflow/providers/amazon/aws/hooks/glue_catalog.py index 81c452cc0e..a5d434879e 100644 --- a/airflow/providers/amazon/aws/hooks/glue_catalog.py +++ b/airflow/providers/amazon/aws/hooks/glue_catalog.py @@ -27,6 +27,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook class GlueCatalogHook(AwsBaseHook): """ Interact with AWS Glue Data Catalog. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("glue") <Glue.Client>`. 
Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/hooks/glue_crawler.py b/airflow/providers/amazon/aws/hooks/glue_crawler.py index fcd16dadeb..82bd3f45fd 100644 --- a/airflow/providers/amazon/aws/hooks/glue_crawler.py +++ b/airflow/providers/amazon/aws/hooks/glue_crawler.py @@ -26,6 +26,7 @@ from airflow.providers.amazon.aws.hooks.sts import StsHook class GlueCrawlerHook(AwsBaseHook): """ Interacts with AWS Glue Crawler. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("glue") <Glue.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and @@ -169,9 +170,7 @@ class GlueCrawlerHook(AwsBaseHook): def wait_for_crawler_completion(self, crawler_name: str, poll_interval: int = 5) -> str: """ - Waits until Glue crawler completes and - returns the status of the latest crawl run. - Raises AirflowException if the crawler fails or is cancelled. + Wait until Glue crawler completes; returns the status of the latest crawl or raises AirflowException. :param crawler_name: unique crawler name per AWS account :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status diff --git a/airflow/providers/amazon/aws/hooks/kinesis.py b/airflow/providers/amazon/aws/hooks/kinesis.py index e527aa6aac..0d2908bd8b 100644 --- a/airflow/providers/amazon/aws/hooks/kinesis.py +++ b/airflow/providers/amazon/aws/hooks/kinesis.py @@ -26,6 +26,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook class FirehoseHook(AwsBaseHook): """ Interact with Amazon Kinesis Firehose. + Provide thick wrapper around :external+boto3:py:class:`boto3.client("firehose") <Firehose.Client>`. :param delivery_stream: Name of the delivery stream diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index a3e82688ae..2d61f0751f 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -27,6 +27,7 @@ from airflow.providers.amazon.aws.utils import trim_none_values class LambdaHook(AwsBaseHook): """ Interact with AWS Lambda. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("lambda") <Lambda.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/hooks/logs.py b/airflow/providers/amazon/aws/hooks/logs.py index 6d006a95a3..ba9ef09112 100644 --- a/airflow/providers/amazon/aws/hooks/logs.py +++ b/airflow/providers/amazon/aws/hooks/logs.py @@ -15,10 +15,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -""" -This module contains a hook (AwsLogsHook) with some very basic -functionality for interacting with AWS CloudWatch. -""" from __future__ import annotations import warnings @@ -39,6 +35,7 @@ NUM_CONSECUTIVE_EMPTY_RESPONSE_EXIT_THRESHOLD = 3 class AwsLogsHook(AwsBaseHook): """ Interact with Amazon CloudWatch Logs. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("logs") <CloudWatchLogs.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and @@ -68,8 +65,7 @@ class AwsLogsHook(AwsBaseHook): continuation_token: ContinuationToken | None = None, ) -> Generator: """ - A generator for log items in a single stream. This will yield all the - items that are available at the current moment. 
+ A generator for log items in a single stream; yields all items available at the current moment. .. seealso:: - :external+boto3:py:meth:`CloudWatchLogs.Client.get_log_events` diff --git a/airflow/providers/amazon/aws/hooks/quicksight.py b/airflow/providers/amazon/aws/hooks/quicksight.py index 2c61252b74..8cdf62db00 100644 --- a/airflow/providers/amazon/aws/hooks/quicksight.py +++ b/airflow/providers/amazon/aws/hooks/quicksight.py @@ -30,6 +30,7 @@ from airflow.providers.amazon.aws.hooks.sts import StsHook class QuickSightHook(AwsBaseHook): """ Interact with Amazon QuickSight. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("quicksight") <QuickSight.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/hooks/rds.py b/airflow/providers/amazon/aws/hooks/rds.py index 16c72c1b26..df0f7af0e5 100644 --- a/airflow/providers/amazon/aws/hooks/rds.py +++ b/airflow/providers/amazon/aws/hooks/rds.py @@ -31,6 +31,7 @@ if TYPE_CHECKING: class RdsHook(AwsGenericHook["RDSClient"]): """ Interact with Amazon Relational Database Service (RDS). + Provide thin wrapper around :external+boto3:py:class:`boto3.client("rds") <RDS.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and @@ -69,8 +70,7 @@ class RdsHook(AwsGenericHook["RDSClient"]): self, snapshot_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40 ) -> None: """ - Polls DB Snapshots until the target state is reached. - An error is raised after a max number of attempts. + Poll DB Snapshots until target_state is reached; raise AirflowException after max_attempts. .. seealso:: - :external+boto3:py:meth:`RDS.Client.describe_db_snapshots` @@ -118,8 +118,7 @@ class RdsHook(AwsGenericHook["RDSClient"]): self, snapshot_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40 ) -> None: """ - Polls DB Cluster Snapshots until the target state is reached. - An error is raised after a max number of attempts. + Poll DB Cluster Snapshots until target_state is reached; raise AirflowException after a max_attempts. .. seealso:: - :external+boto3:py:meth:`RDS.Client.describe_db_cluster_snapshots` @@ -167,8 +166,7 @@ class RdsHook(AwsGenericHook["RDSClient"]): self, export_task_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40 ) -> None: """ - Polls export tasks until the target state is reached. - An error is raised after a max number of attempts. + Poll export tasks until target_state is reached; raise AirflowException after max_attempts. .. seealso:: - :external+boto3:py:meth:`RDS.Client.describe_export_tasks` @@ -209,8 +207,7 @@ class RdsHook(AwsGenericHook["RDSClient"]): self, subscription_name: str, target_state: str, check_interval: int = 30, max_attempts: int = 40 ) -> None: """ - Polls Even Subscriptions until the target state is reached. - An error is raised after a max number of attempts. + Poll Event Subscriptions until target_state is reached; raise AirflowException after max_attempts. .. seealso:: - :external+boto3:py:meth:`RDS.Client.describe_event_subscriptions` @@ -251,8 +248,7 @@ class RdsHook(AwsGenericHook["RDSClient"]): self, db_instance_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40 ) -> None: """ - Polls DB Instances until the target state is reached. - An error is raised after a max number of attempts. + Poll DB Instances until target_state is reached; raise AirflowException after max_attempts. .. 
seealso:: - :external+boto3:py:meth:`RDS.Client.describe_db_instances` @@ -300,8 +296,7 @@ class RdsHook(AwsGenericHook["RDSClient"]): self, db_cluster_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40 ) -> None: """ - Polls DB Clusters until the target state is reached. - An error is raised after a max number of attempts. + Poll DB Clusters until target_state is reached; raise AirflowException after max_attempts. .. seealso:: - :external+boto3:py:meth:`RDS.Client.describe_db_clusters` diff --git a/airflow/providers/amazon/aws/hooks/redshift_data.py b/airflow/providers/amazon/aws/hooks/redshift_data.py index b38fc8962b..fddd42bd61 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_data.py +++ b/airflow/providers/amazon/aws/hooks/redshift_data.py @@ -31,6 +31,7 @@ if TYPE_CHECKING: class RedshiftDataHook(AwsGenericHook["RedshiftDataAPIServiceClient"]): """ Interact with Amazon Redshift Data API. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("redshift-data") <RedshiftDataAPIService.Client>`. diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py index 2a832e015d..f4b493757d 100644 --- a/airflow/providers/amazon/aws/hooks/s3.py +++ b/airflow/providers/amazon/aws/hooks/s3.py @@ -64,10 +64,7 @@ logger = logging.getLogger(__name__) def provide_bucket_name(func: T) -> T: - """ - Function decorator that provides a bucket name taken from the connection - in case no bucket name has been passed to the function. - """ + """Provide a bucket name taken from the connection if no bucket name has been passed to the function.""" if hasattr(func, "_unify_bucket_name_and_key_wrapped"): logger.warning("`unify_bucket_name_and_key` should wrap `provide_bucket_name`.") function_signature = signature(func) @@ -97,10 +94,7 @@ def provide_bucket_name(func: T) -> T: def provide_bucket_name_async(func: T) -> T: - """ - Function decorator that provides a bucket name taken from the connection - in case no bucket name has been passed to the function. - """ + """Provide a bucket name taken from the connection if no bucket name has been passed to the function.""" function_signature = signature(func) @wraps(func) @@ -120,10 +114,7 @@ def provide_bucket_name_async(func: T) -> T: def unify_bucket_name_and_key(func: T) -> T: - """ - Function decorator that unifies bucket name and key taken from the key - in case no bucket name and at least a key has been passed to the function. - """ + """Unify bucket name and key in case no bucket name and at least a key has been passed to the function.""" function_signature = signature(func) @wraps(func) @@ -156,6 +147,7 @@ def unify_bucket_name_and_key(func: T) -> T: class S3Hook(AwsBaseHook): """ Interact with Amazon Simple Storage Service (S3). + Provide thick wrapper around :external+boto3:py:class:`boto3.client("s3") <S3.Client>` and :external+boto3:py:class:`boto3.resource("s3") <S3.ServiceResource>`. @@ -495,8 +487,10 @@ class S3Hook(AwsBaseHook): key: str, ) -> bool: """ - Function to check if wildcard_match is True get list of files that a key matching a wildcard - expression exists in a bucket asynchronously and return the boolean value. If wildcard_match + Get a list of files that a key matching a wildcard expression or get the head object. + + If wildcard_match is True get list of files that a key matching a wildcard + expression exists in a bucket asynchronously and return the boolean value. If wildcard_match is False get the head object from the bucket and return the boolean value. 
:param client: aiobotocore client @@ -1359,8 +1353,7 @@ class S3Hook(AwsBaseHook): bucket_name: str | None = None, ) -> None: """ - Overwrites the existing TagSet with provided tags. - Must provide a TagSet, a key/value pair, or both. + Overwrites the existing TagSet with provided tags; must provide a TagSet, a key/value pair, or both. .. seealso:: - :external+boto3:py:meth:`S3.Client.put_bucket_tagging` diff --git a/airflow/providers/amazon/aws/hooks/ses.py b/airflow/providers/amazon/aws/hooks/ses.py index 2f5a6a9164..120d3235f9 100644 --- a/airflow/providers/amazon/aws/hooks/ses.py +++ b/airflow/providers/amazon/aws/hooks/ses.py @@ -26,6 +26,7 @@ from airflow.utils.email import build_mime_message class SesHook(AwsBaseHook): """ Interact with Amazon Simple Email Service. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("ses") <SES.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/hooks/sns.py b/airflow/providers/amazon/aws/hooks/sns.py index 199376c7ff..b2d344ded4 100644 --- a/airflow/providers/amazon/aws/hooks/sns.py +++ b/airflow/providers/amazon/aws/hooks/sns.py @@ -40,6 +40,7 @@ def _get_message_attribute(o): class SnsHook(AwsBaseHook): """ Interact with Amazon Simple Notification Service. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("sns") <SNS.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/hooks/sqs.py b/airflow/providers/amazon/aws/hooks/sqs.py index c59beba922..d1d8ed99d0 100644 --- a/airflow/providers/amazon/aws/hooks/sqs.py +++ b/airflow/providers/amazon/aws/hooks/sqs.py @@ -24,6 +24,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook class SqsHook(AwsBaseHook): """ Interact with Amazon Simple Queue Service. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("sqs") <SQS.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/hooks/ssm.py b/airflow/providers/amazon/aws/hooks/ssm.py index 3078502266..4cc8c4f9af 100644 --- a/airflow/providers/amazon/aws/hooks/ssm.py +++ b/airflow/providers/amazon/aws/hooks/ssm.py @@ -25,6 +25,7 @@ from airflow.utils.types import NOTSET, ArgNotSet class SsmHook(AwsBaseHook): """ Interact with Amazon Systems Manager (SSM). + Provide thin wrapper around :external+boto3:py:class:`boto3.client("ssm") <SSM.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and @@ -40,8 +41,7 @@ class SsmHook(AwsBaseHook): def get_parameter_value(self, parameter: str, default: str | ArgNotSet = NOTSET) -> str: """ - Returns the value of the provided Parameter or an optional default. - If value exists, and it is encrypted, then decrypt and mask them for loggers. + Return the provided Parameter or an optional default; if it is encrypted, then decrypt and mask. .. seealso:: - :external+boto3:py:meth:`SSM.Client.get_parameter` diff --git a/airflow/providers/amazon/aws/hooks/step_function.py b/airflow/providers/amazon/aws/hooks/step_function.py index 8b5df2bdba..21bc51d8dc 100644 --- a/airflow/providers/amazon/aws/hooks/step_function.py +++ b/airflow/providers/amazon/aws/hooks/step_function.py @@ -24,6 +24,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook class StepFunctionHook(AwsBaseHook): """ Interact with an AWS Step Functions State Machine. + Provide thin wrapper around :external+boto3:py:class:`boto3.client("stepfunctions") <SFN.Client>`. 
Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/hooks/sts.py b/airflow/providers/amazon/aws/hooks/sts.py index ca481f4b30..9a4d389396 100644 --- a/airflow/providers/amazon/aws/hooks/sts.py +++ b/airflow/providers/amazon/aws/hooks/sts.py @@ -22,6 +22,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook class StsHook(AwsBaseHook): """ Interact with AWS Security Token Service (STS). + Provide thin wrapper around :external+boto3:py:class:`boto3.client("sts") <STS.Client>`. Additional arguments (such as ``aws_conn_id``) may be specified and diff --git a/airflow/providers/amazon/aws/links/emr.py b/airflow/providers/amazon/aws/links/emr.py index 358b8b302f..f5def55f87 100644 --- a/airflow/providers/amazon/aws/links/emr.py +++ b/airflow/providers/amazon/aws/links/emr.py @@ -51,8 +51,9 @@ def get_log_uri( *, cluster: dict[str, Any] | None = None, emr_client: boto3.client = None, job_flow_id: str | None = None ) -> str | None: """ - Retrieves the S3 URI to the EMR Job logs. Requires either the output of a - describe_cluster call or both an EMR Client and a job_flow_id to look it up. + Retrieves the S3 URI to the EMR Job logs. + + Requires either the output of a describe_cluster call or both an EMR Client and a job_flow_id.. """ if not exactly_one(bool(cluster), emr_client and job_flow_id): raise AirflowException( diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index 5ec02b44d3..a353d9b07e 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -44,9 +44,9 @@ def get_default_delete_local_copy(): class S3TaskHandler(FileTaskHandler, LoggingMixin): """ - S3TaskHandler is a python log handler that handles and reads - task instance logs. It extends airflow FileTaskHandler and - uploads to and reads from S3 remote storage. + S3TaskHandler is a python log handler that handles and reads task instance logs. + + It extends airflow FileTaskHandler and uploads to and reads from S3 remote storage. """ trigger_should_wrap = True @@ -133,6 +133,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): def _read(self, ti, try_number, metadata=None): """ Read logs of given task instance and try_number from S3 remote storage. + If failed, read the log from task instance host machine. todo: when min airflow version >= 2.6 then remove this method (``_read``) @@ -168,8 +169,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): def s3_read(self, remote_log_location: str, return_error: bool = False) -> str: """ - Returns the log found at the remote_log_location. Returns '' if no - logs are found or there is an error. + Returns the log found at the remote_log_location. Return '' if no logs are found or there is an error. :param remote_log_location: the log's location in remote storage :param return_error: if True, returns a string error message if an @@ -188,8 +188,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1) -> bool: """ - Writes the log to the remote_log_location and return `True` when done. Fails silently - and return `False` if no log was created. + Write the log to the remote_log_location; return `True` or fails silently and return `False`. 
:param log: the log to write to the remote_log_location :param remote_log_location: the log's location in remote storage diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index eb6ebb5b0f..9c27e88350 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -388,7 +388,7 @@ class EksCreateNodegroupOperator(BaseOperator): nodegroup_subnets_list: list[str] = [] if self.nodegroup_subnets != "": try: - nodegroup_subnets_list = cast(List, literal_eval(self.nodegroup_subnets)) + nodegroup_subnets_list = cast(list, literal_eval(self.nodegroup_subnets)) except ValueError: self.log.warning( "The nodegroup_subnets should be List or string representing " diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 2c99d0e07f..f9eacdb79d 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -635,8 +635,8 @@ class EmrContainerOperator(BaseOperator): class EmrCreateJobFlowOperator(BaseOperator): """ Creates an EMR JobFlow, reading the config from the EMR connection. - A dictionary of JobFlow overrides can be passed that override - the config from the connection. + + A dictionary of JobFlow overrides can be passed that override the config from the connection. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -797,10 +797,7 @@ class EmrCreateJobFlowOperator(BaseOperator): return event["job_flow_id"] def on_kill(self) -> None: - """ - Terminate the EMR cluster (job flow). If TerminationProtected=True on the cluster, - termination will be unsuccessful. - """ + """Terminate the EMR cluster (job flow) unless TerminationProtected is enabled on the cluster.""" if self._job_flow_id: self.log.info("Terminating job flow %s", self._job_flow_id) self._emr_hook.conn.terminate_job_flows(JobFlowIds=[self._job_flow_id]) diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py b/airflow/providers/amazon/aws/operators/glue_crawler.py index bf0f75c450..a7efb9f5c0 100644 --- a/airflow/providers/amazon/aws/operators/glue_crawler.py +++ b/airflow/providers/amazon/aws/operators/glue_crawler.py @@ -32,9 +32,11 @@ from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook class GlueCrawlerOperator(BaseOperator): """ - Creates, updates and triggers an AWS Glue Crawler. AWS Glue Crawler is a serverless - service that manages a catalog of metadata tables that contain the inferred - schema, format and data types of data stores within the AWS cloud. + Creates, updates and triggers an AWS Glue Crawler. + + AWS Glue Crawler is a serverless service that manages a catalog of + metadata tables that contain the inferred schema, format and data + types of data stores within the AWS cloud. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/lambda_function.py b/airflow/providers/amazon/aws/operators/lambda_function.py index d996513436..93907634c1 100644 --- a/airflow/providers/amazon/aws/operators/lambda_function.py +++ b/airflow/providers/amazon/aws/operators/lambda_function.py @@ -120,8 +120,9 @@ class LambdaCreateFunctionOperator(BaseOperator): class LambdaInvokeFunctionOperator(BaseOperator): """ - Invokes an AWS Lambda function. You can invoke a function synchronously (and wait for the response), - or asynchronously. + Invokes an AWS Lambda function. 
+ + You can invoke a function synchronously (and wait for the response), or asynchronously. To invoke a function asynchronously, set `invocation_type` to `Event`. For more details, review the boto3 Lambda invoke docs. diff --git a/airflow/providers/amazon/aws/operators/quicksight.py b/airflow/providers/amazon/aws/operators/quicksight.py index 85514af806..4268374117 100644 --- a/airflow/providers/amazon/aws/operators/quicksight.py +++ b/airflow/providers/amazon/aws/operators/quicksight.py @@ -29,8 +29,7 @@ DEFAULT_CONN_ID = "aws_default" class QuickSightCreateIngestionOperator(BaseOperator): """ - Creates and starts a new SPICE ingestion for a dataset. - Also, helps to Refresh existing SPICE datasets. + Creates and starts a new SPICE ingestion for a dataset; also helps to Refresh existing SPICE datasets. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index 4d9b088978..5f7d423d4c 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -56,6 +56,7 @@ class RdsBaseOperator(BaseOperator): class RdsCreateDbSnapshotOperator(RdsBaseOperator): """ Creates a snapshot of a DB instance or DB cluster. + The source DB instance or cluster must be in the available or storage-optimization state. .. seealso:: diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py index d1b77ba90a..c4bcaa61da 100644 --- a/airflow/providers/amazon/aws/operators/s3.py +++ b/airflow/providers/amazon/aws/operators/s3.py @@ -412,8 +412,7 @@ class S3CreateObjectOperator(BaseOperator): class S3DeleteObjectsOperator(BaseOperator): """ - To enable users to delete single object or multiple objects from - a bucket using a single HTTP request. + To enable users to delete single object or multiple objects from a bucket using a single HTTP request. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -482,10 +481,10 @@ class S3DeleteObjectsOperator(BaseOperator): class S3FileTransformOperator(BaseOperator): """ - Copies data from a source S3 location to a temporary location on the - local filesystem. Runs a transformation on this file as specified by - the transformation script and uploads the output to a destination S3 - location. + Copies data from a source S3 location to a temporary location on the local filesystem. + + Runs a transformation on this file as specified by the transformation + script and uploads the output to a destination S3 location. The locations of the source and the destination files in the local filesystem is provided as a first and second arguments to the diff --git a/airflow/providers/amazon/aws/operators/sagemaker.py b/airflow/providers/amazon/aws/operators/sagemaker.py index 8467459188..4dac7df007 100644 --- a/airflow/providers/amazon/aws/operators/sagemaker.py +++ b/airflow/providers/amazon/aws/operators/sagemaker.py @@ -106,6 +106,7 @@ class SageMakerBaseOperator(BaseOperator): def _create_integer_fields(self) -> None: """ Set fields which should be cast to integers. + Child classes should override this method if they need integer fields parsed. """ self.integer_fields = [] @@ -156,10 +157,11 @@ class SageMakerBaseOperator(BaseOperator): class SageMakerProcessingOperator(SageMakerBaseOperator): """ - Use Amazon SageMaker Processing to analyze data and evaluate machine learning - models on Amazon SageMake. 
With Processing, you can use a simplified, managed - experience on SageMaker to run your data processing workloads, such as feature - engineering, data validation, model evaluation, and model interpretation. + Use Amazon SageMaker Processing to analyze data and evaluate machine learning models on Amazon SageMaker. + + With Processing, you can use a simplified, managed experience on SageMaker + to run your data processing workloads, such as feature engineering, data + validation, model evaluation, and model interpretation. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -288,9 +290,9 @@ class SageMakerProcessingOperator(SageMakerBaseOperator): class SageMakerEndpointConfigOperator(SageMakerBaseOperator): """ - Creates an endpoint configuration that Amazon SageMaker hosting - services uses to deploy models. In the configuration, you identify - one or more models, created using the CreateModel API, to deploy and + Creates an endpoint configuration that Amazon SageMaker hosting services uses to deploy models. + + In the configuration, you identify one or more models, created using the CreateModel API, to deploy and the resources that you want Amazon SageMaker to provision. .. seealso:: @@ -333,10 +335,11 @@ class SageMakerEndpointConfigOperator(SageMakerBaseOperator): class SageMakerEndpointOperator(SageMakerBaseOperator): """ - When you create a serverless endpoint, SageMaker provisions and manages - the compute resources for you. Then, you can make inference requests to - the endpoint and receive model predictions in response. SageMaker scales - the compute resources up and down as needed to handle your request traffic. + When you create a serverless endpoint, SageMaker provisions and manages the compute resources for you. + + Then, you can make inference requests to the endpoint and receive model predictions + in response. SageMaker scales the compute resources up and down as needed to handle + your request traffic. Requires an Endpoint Config. @@ -492,8 +495,10 @@ class SageMakerEndpointOperator(SageMakerBaseOperator): class SageMakerTransformOperator(SageMakerBaseOperator): """ - Starts a transform job. A transform job uses a trained model to get inferences - on a dataset and saves these results to an Amazon S3 location that you specify. + Starts a transform job. + + A transform job uses a trained model to get inferences on a dataset + and saves these results to an Amazon S3 location that you specify. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -660,11 +665,13 @@ class SageMakerTransformOperator(SageMakerBaseOperator): class SageMakerTuningOperator(SageMakerBaseOperator): """ - Starts a hyperparameter tuning job. A hyperparameter tuning job finds the - best version of a model by running many training jobs on your dataset using - the algorithm you choose and values for hyperparameters within ranges that - you specify. It then chooses the hyperparameter values that result in a model - that performs the best, as measured by an objective metric that you choose. + Starts a hyperparameter tuning job. + + A hyperparameter tuning job finds the best version of a model by running + many training jobs on your dataset using the algorithm you choose and + values for hyperparameters within ranges that you specify. It then chooses + the hyperparameter values that result in a model that performs the best, + as measured by an objective metric that you choose. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -771,10 +778,12 @@ class SageMakerTuningOperator(SageMakerBaseOperator): class SageMakerModelOperator(SageMakerBaseOperator): """ - Creates a model in Amazon SageMaker. In the request, you name the model and - describe a primary container. For the primary container, you specify the Docker - image that contains inference code, artifacts (from prior training), and a custom - environment map that the inference code uses when you deploy the model for predictions. + Creates a model in Amazon SageMaker. + + In the request, you name the model and describe a primary container. For the + primary container, you specify the Docker image that contains inference code, + artifacts (from prior training), and a custom environment map that the inference + code uses when you deploy the model for predictions. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -808,8 +817,10 @@ class SageMakerModelOperator(SageMakerBaseOperator): class SageMakerTrainingOperator(SageMakerBaseOperator): """ - Starts a model training job. After training completes, Amazon SageMaker saves - the resulting model artifacts to an Amazon S3 location that you specify. + Starts a model training job. + + After training completes, Amazon SageMaker saves the resulting + model artifacts to an Amazon S3 location that you specify. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -1090,8 +1101,9 @@ class SageMakerStopPipelineOperator(SageMakerBaseOperator): class SageMakerRegisterModelVersionOperator(SageMakerBaseOperator): """ - Registers an Amazon SageMaker model by creating a model version that specifies the model group to which it - belongs. Will create the model group if it does not exist already. + Register a SageMaker model by creating a model version that specifies the model group to which it belongs. + + Will create the model group if it does not exist already. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -1180,6 +1192,7 @@ class SageMakerRegisterModelVersionOperator(SageMakerBaseOperator): class SageMakerAutoMLOperator(SageMakerBaseOperator): """ Creates an auto ML job, learning to predict the given column from the data provided through S3. + The learning output is written to the specified S3 location. .. seealso:: diff --git a/airflow/providers/amazon/aws/sensors/athena.py b/airflow/providers/amazon/aws/sensors/athena.py index 599341092e..70d7875629 100644 --- a/airflow/providers/amazon/aws/sensors/athena.py +++ b/airflow/providers/amazon/aws/sensors/athena.py @@ -30,8 +30,7 @@ from airflow.sensors.base import BaseSensorOperator class AthenaSensor(BaseSensorOperator): """ - Asks for the state of the Query until it reaches a failure state or success state. - If the query fails, the task will fail. + Poll the state of the Query until it reaches a terminal state; fails if the query fails. .. seealso:: For more information on how to use this sensor, take a look at the guide: diff --git a/airflow/providers/amazon/aws/sensors/batch.py b/airflow/providers/amazon/aws/sensors/batch.py index 475b0ecb71..2033d1e86b 100644 --- a/airflow/providers/amazon/aws/sensors/batch.py +++ b/airflow/providers/amazon/aws/sensors/batch.py @@ -33,8 +33,7 @@ if TYPE_CHECKING: class BatchSensor(BaseSensorOperator): """ - Asks for the state of the Batch Job execution until it reaches a failure state or success state. 
- If the job fails, the task will fail. + Poll the state of the Batch Job until it reaches a terminal state; fails if the job fails. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -110,8 +109,8 @@ class BatchSensor(BaseSensorOperator): def execute_complete(self, context: Context, event: dict[str, Any]) -> None: """ Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. + + Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ if "status" in event and event["status"] == "failure": raise AirflowException(event["message"]) @@ -132,8 +131,7 @@ class BatchSensor(BaseSensorOperator): class BatchComputeEnvironmentSensor(BaseSensorOperator): """ - Asks for the state of the Batch compute environment until it reaches a failure state or success state. - If the environment fails, the task will fail. + Poll the state of the Batch environment until it reaches a terminal state; fails if the environment fails. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -193,8 +191,7 @@ class BatchComputeEnvironmentSensor(BaseSensorOperator): class BatchJobQueueSensor(BaseSensorOperator): """ - Asks for the state of the Batch job queue until it reaches a failure state or success state. - If the queue fails, the task will fail. + Poll the state of the Batch job queue until it reaches a terminal state; fails if the queue fails. .. seealso:: For more information on how to use this sensor, take a look at the guide: diff --git a/airflow/providers/amazon/aws/sensors/ec2.py b/airflow/providers/amazon/aws/sensors/ec2.py index 08e8e59fb7..c5d7761031 100644 --- a/airflow/providers/amazon/aws/sensors/ec2.py +++ b/airflow/providers/amazon/aws/sensors/ec2.py @@ -31,8 +31,7 @@ if TYPE_CHECKING: class EC2InstanceStateSensor(BaseSensorOperator): """ - Check the state of the AWS EC2 instance until - state of the instance become equal to the target state. + Poll the state of the AWS EC2 instance until the instance reaches the target state. .. seealso:: For more information on how to use this sensor, take a look at the guide: diff --git a/airflow/providers/amazon/aws/sensors/ecs.py b/airflow/providers/amazon/aws/sensors/ecs.py index a150a8e87a..8580d0c479 100644 --- a/airflow/providers/amazon/aws/sensors/ecs.py +++ b/airflow/providers/amazon/aws/sensors/ecs.py @@ -64,8 +64,7 @@ class EcsBaseSensor(BaseSensorOperator): class EcsClusterStateSensor(EcsBaseSensor): """ - Polls the cluster state until it reaches a terminal state. Raises an - AirflowException with the failure reason if a failed state is reached. + Poll the cluster state until it reaches a terminal state; raises AirflowException with the failure reason. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -103,8 +102,7 @@ class EcsClusterStateSensor(EcsBaseSensor): class EcsTaskDefinitionStateSensor(EcsBaseSensor): """ - Polls the task definition state until it reaches a terminal state. Raises an - AirflowException with the failure reason if a failed state is reached. + Poll task definition until it reaches a terminal state; raise AirflowException with the failure reason. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -149,8 +147,7 @@ class EcsTaskDefinitionStateSensor(EcsBaseSensor): class EcsTaskStateSensor(EcsBaseSensor): """ - Polls the task state until it reaches a terminal state. Raises an - AirflowException with the failure reason if a failed state is reached. + Poll the task state until it reaches a terminal state; raises AirflowException with the failure reason. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/sensors/eks.py b/airflow/providers/amazon/aws/sensors/eks.py index e5be3340f7..4acf41fa03 100644 --- a/airflow/providers/amazon/aws/sensors/eks.py +++ b/airflow/providers/amazon/aws/sensors/eks.py @@ -61,6 +61,7 @@ UNEXPECTED_TERMINAL_STATE_MSG = ( class EksBaseSensor(BaseSensorOperator): """ Base class to check various EKS states. + Subclasses need to implement get_state and get_terminal_states methods. :param cluster_name: The name of the Cluster diff --git a/airflow/providers/amazon/aws/sensors/emr.py b/airflow/providers/amazon/aws/sensors/emr.py index f21d1aacd8..2f44caab06 100644 --- a/airflow/providers/amazon/aws/sensors/emr.py +++ b/airflow/providers/amazon/aws/sensors/emr.py @@ -116,8 +116,7 @@ class EmrBaseSensor(BaseSensorOperator): class EmrServerlessJobSensor(BaseSensorOperator): """ - Asks for the state of the job run until it reaches a failure state or success state. - If the job run fails, the task will fail. + Poll the state of the job run until it reaches a terminal state; fails if the job run fails. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -178,8 +177,7 @@ class EmrServerlessJobSensor(BaseSensorOperator): class EmrServerlessApplicationSensor(BaseSensorOperator): """ - Asks for the state of the application until it reaches a failure state or success state. - If the application fails, the task will fail. + Poll the state of the application until it reaches a terminal state; fails if the application fails. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -234,8 +232,7 @@ class EmrServerlessApplicationSensor(BaseSensorOperator): class EmrContainerSensor(BaseSensorOperator): """ - Asks for the state of the job run until it reaches a failure state or success state. - If the job run fails, the task will fail. + Poll the state of the job run until it reaches a terminal state; fail if the job run fails. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -332,9 +329,7 @@ class EmrContainerSensor(BaseSensorOperator): class EmrNotebookExecutionSensor(EmrBaseSensor): """ - Polls the state of the EMR notebook execution until it reaches - any of the target states. - If a failure state is reached, the sensor throws an error, and fails the task. + Poll the EMR notebook until it reaches any of the target states; raise AirflowException on failure. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -396,9 +391,7 @@ class EmrNotebookExecutionSensor(EmrBaseSensor): class EmrJobFlowSensor(EmrBaseSensor): """ - Asks for the state of the EMR JobFlow (Cluster) until it reaches - any of the target states. - If it fails the sensor errors, failing the task. + Poll the EMR JobFlow Cluster until it reaches any of the target states; raise AirflowException on failure. With the default target states, sensor waits cluster to be terminated. 
When target_states is set to ['RUNNING', 'WAITING'] sensor waits @@ -522,8 +515,7 @@ class EmrJobFlowSensor(EmrBaseSensor): class EmrStepSensor(EmrBaseSensor): """ - Asks for the state of the step until it reaches any of the target states. - If it fails the sensor errors, failing the task. + Poll the state of the step until it reaches any of the target states; raise AirflowException on failure. With the default target states, sensor waits step to be completed. diff --git a/airflow/providers/amazon/aws/sensors/lambda_function.py b/airflow/providers/amazon/aws/sensors/lambda_function.py index 2febaba7a6..772ed0689a 100644 --- a/airflow/providers/amazon/aws/sensors/lambda_function.py +++ b/airflow/providers/amazon/aws/sensors/lambda_function.py @@ -32,8 +32,7 @@ from airflow.sensors.base import BaseSensorOperator class LambdaFunctionStateSensor(BaseSensorOperator): """ - Asks for the state of the Lambda until it reaches a target state. - If the query fails, the task will fail. + Poll the state of the Lambda until it reaches a target state; fails if the query fails. .. seealso:: For more information on how to use this sensor, take a look at the guide: diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py index 9eb1ab75d5..23b344fa51 100644 --- a/airflow/providers/amazon/aws/sensors/s3.py +++ b/airflow/providers/amazon/aws/sensors/s3.py @@ -38,6 +38,7 @@ from airflow.sensors.base import BaseSensorOperator, poke_mode_only class S3KeySensor(BaseSensorOperator): """ Waits for one or multiple keys (a file-like instance on S3) to be present in a S3 bucket. + The path is just a key/value pointer to a resource for the given S3 path. Note: S3 does not support folders directly, and only provides key/value pairs. @@ -162,8 +163,8 @@ class S3KeySensor(BaseSensorOperator): def execute_complete(self, context: Context, event: dict[str, Any]) -> bool | None: """ Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. + + Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ if event["status"] == "running": found_keys = self.check_fn(event["files"]) # type: ignore[misc] @@ -189,11 +190,10 @@ class S3KeySensor(BaseSensorOperator): @poke_mode_only class S3KeysUnchangedSensor(BaseSensorOperator): """ - Checks for changes in the number of objects at prefix in AWS S3 - bucket and returns True if the inactivity period has passed with no - increase in the number of objects. Note, this sensor will not behave correctly - in reschedule mode, as the state of the listed objects in the S3 bucket will - be lost between rescheduled invocations. + Return True if inactivity_period has passed with no increase in the number of objects matching prefix. + + Note, this sensor will not behave correctly in reschedule mode, as the state of the listed + objects in the S3 bucket will be lost between rescheduled invocations. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -262,8 +262,7 @@ class S3KeysUnchangedSensor(BaseSensorOperator): def is_keys_unchanged(self, current_objects: set[str]) -> bool: """ - Checks whether new objects have been uploaded and the inactivity_period - has passed and updates the state of the sensor accordingly. + Check for new objects after the inactivity_period and update the sensor state accordingly. :param current_objects: set of object ids in bucket during last poke. 
""" diff --git a/airflow/providers/amazon/aws/sensors/sagemaker.py b/airflow/providers/amazon/aws/sensors/sagemaker.py index 7e8340ed90..6ede44bc43 100644 --- a/airflow/providers/amazon/aws/sensors/sagemaker.py +++ b/airflow/providers/amazon/aws/sensors/sagemaker.py @@ -93,8 +93,7 @@ class SageMakerBaseSensor(BaseSensorOperator): class SageMakerEndpointSensor(SageMakerBaseSensor): """ - Polls the endpoint state until it reaches a terminal state. Raises an - AirflowException with the failure reason if a failed state is reached. + Poll the endpoint state until it reaches a terminal state; raise AirflowException with the failure reason. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -129,8 +128,7 @@ class SageMakerEndpointSensor(SageMakerBaseSensor): class SageMakerTransformSensor(SageMakerBaseSensor): """ - Polls the transform job until it reaches a terminal state. Raises an - AirflowException with the failure reason if a failed state is reached. + Poll the transform job until it reaches a terminal state; raise AirflowException with the failure reason. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -165,8 +163,7 @@ class SageMakerTransformSensor(SageMakerBaseSensor): class SageMakerTuningSensor(SageMakerBaseSensor): """ - Asks for the state of the tuning state until it reaches a terminal state. - Raises an AirflowException with the failure reason if a failed state is reached. + Poll the tuning state until it reaches a terminal state; raise AirflowException with the failure reason. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -201,8 +198,7 @@ class SageMakerTuningSensor(SageMakerBaseSensor): class SageMakerTrainingSensor(SageMakerBaseSensor): """ - Polls the training job until it reaches a terminal state. Raises an - AirflowException with the failure reason if a failed state is reached. + Poll the training job until it reaches a terminal state; raise AirflowException with the failure reason. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -280,8 +276,7 @@ class SageMakerTrainingSensor(SageMakerBaseSensor): class SageMakerPipelineSensor(SageMakerBaseSensor): """ - Polls the pipeline until it reaches a terminal state. Raises an - AirflowException with the failure reason if a failed state is reached. + Poll the pipeline until it reaches a terminal state; raise AirflowException with the failure reason. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -315,8 +310,7 @@ class SageMakerPipelineSensor(SageMakerBaseSensor): class SageMakerAutoMLSensor(SageMakerBaseSensor): """ - Polls the auto ML job until it reaches a terminal state. - Raises an AirflowException with the failure reason if a failed state is reached. + Poll the auto ML job until it reaches a terminal state; raise AirflowException with the failure reason. .. seealso:: For more information on how to use this sensor, take a look at the guide: diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index f5bd0b385c..da7f7e513e 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -37,6 +37,7 @@ if TYPE_CHECKING: class SqsSensor(BaseSensorOperator): """ Get messages from an Amazon SQS queue and then delete the messages from the queue. + If deletion of messages fails, an AirflowException is thrown. 
Otherwise, the messages are pushed through XCom with the key ``messages``. diff --git a/airflow/providers/amazon/aws/sensors/step_function.py b/airflow/providers/amazon/aws/sensors/step_function.py index ec6ee2b422..b6a48c0033 100644 --- a/airflow/providers/amazon/aws/sensors/step_function.py +++ b/airflow/providers/amazon/aws/sensors/step_function.py @@ -32,9 +32,7 @@ if TYPE_CHECKING: class StepFunctionExecutionSensor(BaseSensorOperator): """ - Asks for the state of the AWS Step Function State Machine Execution until it - reaches a failure state or success state. - If it fails, then fail the task. + Poll the Step Function State Machine Execution until it reaches a terminal state; fails if the execution fails. On successful completion of the Execution the Sensor will do an XCom Push of the State Machine's output to `output` diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index d2967235c1..e2d2c6ed7e 100644 --- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -15,10 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -""" -This module contains operators to replicate records from -DynamoDB table to S3. -""" + from __future__ import annotations import json @@ -72,6 +69,7 @@ def _upload_file_to_s3( class DynamoDBToS3Operator(AwsToAwsBaseOperator): """ Replicates records from a DynamoDB table to S3. + It scans a DynamoDB table and writes the received records to a file on the local filesystem. It flushes the file to S3 once the file size exceeds the file size limit specified by the user. @@ -88,7 +86,7 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): :param file_size: Flush file to s3 if file size >= file_size :param dynamodb_scan_kwargs: kwargs pass to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan> :param s3_key_prefix: Prefix of s3 object key - :param process_func: How we transforms a dynamodb item to bytes. By default we dump the json + :param process_func: How we transform a dynamodb item to bytes. By default, we dump the json :param export_time: Time in the past from which to export table data, counted in seconds from the start of the Unix epoch. The table export will be a snapshot of the table's state at this point in time. :param export_format: The format for the exported data. Valid values for ExportFormat are DYNAMODB_JSON @@ -147,8 +145,9 @@ class DynamoDBToS3Operator(AwsToAwsBaseOperator): def _export_table_to_point_in_time(self): """ - Export data from start of epoc till `export_time`. Table export will be a snapshot of the table's - state at this point in time. + Export data from the start of the epoch till `export_time`. + + Table export will be a snapshot of the table's state at this point in time. """ if self.export_time and self.export_time > datetime.now(self.export_time.tzinfo): raise ValueError("The export_time parameter cannot be a future time.") diff --git a/airflow/providers/amazon/aws/transfers/ftp_to_s3.py b/airflow/providers/amazon/aws/transfers/ftp_to_s3.py index 45811a82e4..e13c98605d 100644 --- a/airflow/providers/amazon/aws/transfers/ftp_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/ftp_to_s3.py @@ -30,8 +30,7 @@ if TYPE_CHECKING: class FTPToS3Operator(BaseOperator): """ - This operator enables the transfer of files from a FTP server to S3.
It can be used to - transfer one or multiple files. + Transfer one or more files from an FTP server to S3. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/transfers/hive_to_dynamodb.py b/airflow/providers/amazon/aws/transfers/hive_to_dynamodb.py index 167665d627..318f573250 100644 --- a/airflow/providers/amazon/aws/transfers/hive_to_dynamodb.py +++ b/airflow/providers/amazon/aws/transfers/hive_to_dynamodb.py @@ -31,9 +31,10 @@ if TYPE_CHECKING: class HiveToDynamoDBOperator(BaseOperator): """ - Moves data from Hive to DynamoDB, note that for now the data is loaded - into memory before being pushed to DynamoDB, so this operator should - be used for smallish amount of data. + Moves data from Hive to DynamoDB. + + Note that for now the data is loaded into memory before being pushed + to DynamoDB, so this operator should be used for a smallish amount of data. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/transfers/sftp_to_s3.py b/airflow/providers/amazon/aws/transfers/sftp_to_s3.py index 00663f93b6..2d040fd04a 100644 --- a/airflow/providers/amazon/aws/transfers/sftp_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/sftp_to_s3.py @@ -31,8 +31,7 @@ if TYPE_CHECKING: class SFTPToS3Operator(BaseOperator): """ - This operator enables the transferring of files from a SFTP server to - Amazon S3. + Transfer files from an SFTP server to Amazon S3. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/amazon/aws/transfers/sql_to_s3.py b/airflow/providers/amazon/aws/transfers/sql_to_s3.py index aa8382541c..0324406820 100644 --- a/airflow/providers/amazon/aws/transfers/sql_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/sql_to_s3.py @@ -133,6 +133,7 @@ class SqlToS3Operator(BaseOperator): def _fix_dtypes(df: DataFrame, file_format: FILE_FORMAT) -> None: """ Mutate DataFrame to set dtypes for float columns containing NaN values. + Set dtype of object to str to allow for downstream transformations. """ try: diff --git a/airflow/providers/amazon/aws/triggers/batch.py b/airflow/providers/amazon/aws/triggers/batch.py index b0bdbc0d45..f7d335280d 100644 --- a/airflow/providers/amazon/aws/triggers/batch.py +++ b/airflow/providers/amazon/aws/triggers/batch.py @@ -29,9 +29,7 @@ from airflow.triggers.base import BaseTrigger, TriggerEvent class BatchOperatorTrigger(BaseTrigger): """ - Trigger for BatchOperator. - The trigger will asynchronously poll the boto3 API and wait for the - Batch job to be in the `SUCCEEDED` state. + Asynchronously poll the boto3 API and wait for the Batch job to be in the `SUCCEEDED` state. :param job_id: A unique identifier for the cluster. :param max_retries: The maximum number of attempts to be made. @@ -111,6 +109,7 @@ class BatchOperatorTrigger(BaseTrigger): class BatchSensorTrigger(BaseTrigger): """ Checks for the status of a submitted job_id to AWS Batch until it reaches a failure or a success state. + BatchSensorTrigger is fired as deferred class with params to poll the job state in Triggerer. :param job_id: the job ID, to poll for job completion or not @@ -152,8 +151,7 @@ class BatchSensorTrigger(BaseTrigger): async def run(self): """ - Make async connection using aiobotocore library to AWS Batch, - periodically poll for the Batch job status. + Make an async connection to AWS Batch using the aiobotocore library and periodically poll for the job status.
The status that indicates job completion are: 'SUCCEEDED'|'FAILED'. """ @@ -193,8 +191,7 @@ class BatchSensorTrigger(BaseTrigger): class BatchCreateComputeEnvironmentTrigger(BaseTrigger): """ - Trigger for BatchCreateComputeEnvironmentOperator. - The trigger will asynchronously poll the boto3 API and wait for the compute environment to be ready. + Asynchronously poll the boto3 API and wait for the compute environment to be ready. :param job_id: A unique identifier for the cluster. :param max_retries: The maximum number of attempts to be made. diff --git a/airflow/providers/amazon/aws/triggers/ec2.py b/airflow/providers/amazon/aws/triggers/ec2.py index 6372f2854d..8b70b7315b 100644 --- a/airflow/providers/amazon/aws/triggers/ec2.py +++ b/airflow/providers/amazon/aws/triggers/ec2.py @@ -26,8 +26,7 @@ from airflow.triggers.base import BaseTrigger, TriggerEvent class EC2StateSensorTrigger(BaseTrigger): """ - Trigger for EC2StateSensor. The Trigger polls the EC2 instance, and yields a TriggerEvent once - the state of the instance matches the `target_state`. + Poll the EC2 instance and yield a TriggerEvent once the state of the instance matches the target_state. :param instance_id: id of the AWS EC2 instance :param target_state: target state of instance diff --git a/airflow/providers/amazon/aws/triggers/ecs.py b/airflow/providers/amazon/aws/triggers/ecs.py index 8ba8350588..c8977d33e9 100644 --- a/airflow/providers/amazon/aws/triggers/ecs.py +++ b/airflow/providers/amazon/aws/triggers/ecs.py @@ -164,6 +164,7 @@ class TaskDoneTrigger(BaseTrigger): async def _forward_logs(self, logs_client, next_token: str | None = None) -> str | None: """ Reads logs from the cloudwatch stream and prints them to the task logs. + :return: the token to pass to the next iteration to resume where we started. """ while True: diff --git a/airflow/providers/amazon/aws/triggers/eks.py b/airflow/providers/amazon/aws/triggers/eks.py index be5d50ab4c..d01c88dc88 100644 --- a/airflow/providers/amazon/aws/triggers/eks.py +++ b/airflow/providers/amazon/aws/triggers/eks.py @@ -29,8 +29,7 @@ from airflow.triggers.base import BaseTrigger, TriggerEvent class EksCreateFargateProfileTrigger(BaseTrigger): """ - Trigger for EksCreateFargateProfileOperator. - The trigger will asynchronously wait for the fargate profile to be created. + Asynchronously wait for the fargate profile to be created. :param cluster_name: The name of the EKS cluster :param fargate_profile_name: The name of the fargate profile @@ -99,8 +98,7 @@ class EksCreateFargateProfileTrigger(BaseTrigger): class EksDeleteFargateProfileTrigger(BaseTrigger): """ - Trigger for EksDeleteFargateProfileOperator. - The trigger will asynchronously wait for the fargate profile to be deleted. + Asynchronously wait for the fargate profile to be deleted. :param cluster_name: The name of the EKS cluster :param fargate_profile_name: The name of the fargate profile diff --git a/airflow/providers/amazon/aws/triggers/emr.py b/airflow/providers/amazon/aws/triggers/emr.py index 632144c850..dbf620e9cb 100644 --- a/airflow/providers/amazon/aws/triggers/emr.py +++ b/airflow/providers/amazon/aws/triggers/emr.py @@ -31,9 +31,8 @@ from airflow.utils.helpers import prune_dict class EmrAddStepsTrigger(BaseTrigger): """ - AWS Emr Add Steps Trigger - The trigger will asynchronously poll the boto3 API and wait for the - steps to finish executing. + Asynchronously poll the boto3 API and wait for the steps to finish executing. + :param job_flow_id: The id of the job flow. 
:param step_ids: The id of the steps being waited upon. :param poll_interval: The amount of time in seconds to wait between attempts. @@ -105,9 +104,7 @@ class EmrAddStepsTrigger(BaseTrigger): class EmrCreateJobFlowTrigger(BaseTrigger): """ - Trigger for EmrCreateJobFlowOperator. - The trigger will asynchronously poll the boto3 API and wait for the - JobFlow to finish executing. + Asynchronously poll the boto3 API and wait for the JobFlow to finish executing. :param job_flow_id: The id of the job flow to wait for. :param poll_interval: The amount of time in seconds to wait between attempts. @@ -179,9 +176,7 @@ class EmrCreateJobFlowTrigger(BaseTrigger): class EmrTerminateJobFlowTrigger(BaseTrigger): """ - Trigger that terminates a running EMR Job Flow. - The trigger will asynchronously poll the boto3 API and wait for the - JobFlow to finish terminating. + Asynchronously poll the boto3 API and wait for the JobFlow to finish terminating. :param job_flow_id: ID of the EMR Job Flow to terminate :param poll_interval: The amount of time in seconds to wait between attempts. diff --git a/airflow/providers/amazon/aws/utils/tags.py b/airflow/providers/amazon/aws/utils/tags.py index f84f275735..5b8eb736bb 100644 --- a/airflow/providers/amazon/aws/utils/tags.py +++ b/airflow/providers/amazon/aws/utils/tags.py @@ -21,8 +21,11 @@ from typing import Any def format_tags(source: Any, *, key_label: str = "Key", value_label: str = "Value"): """ + Format tags for boto calls that expect a given format. + If given a dictionary, formats it as an array of objects with a key and a value field to be passed to boto calls that expect this format. + Else, assumes that it's already in the right format and returns it as is. We do not validate the format here since it's done by boto anyway, and the error would not be clearer if thrown from here. diff --git a/airflow/providers/amazon/aws/utils/task_log_fetcher.py b/airflow/providers/amazon/aws/utils/task_log_fetcher.py index fc33219d72..f184df6234 100644 --- a/airflow/providers/amazon/aws/utils/task_log_fetcher.py +++ b/airflow/providers/amazon/aws/utils/task_log_fetcher.py @@ -29,10 +29,7 @@ from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook class AwsTaskLogFetcher(Thread): - """ - Fetches Cloudwatch log events with specific interval as a thread - and sends the log events to the info channel of the provided logger. - """ + """Fetch Cloudwatch log events at a specific interval and send them to logger.info.""" def __init__( self, @@ -95,7 +92,9 @@ class AwsTaskLogFetcher(Thread): def get_last_log_messages(self, number_messages) -> list: """ - Gets the last logs messages in one single request, so restrictions apply: + Get the last log messages in a single request. + + NOTE: some restrictions apply: - if logs are too old, the response will be empty - the max number of messages we can retrieve is constrained by cloudwatch limits (10,000). """ diff --git a/airflow/providers/amazon/aws/utils/waiter.py b/airflow/providers/amazon/aws/utils/waiter.py index 6817e31881..b096c30203 100644 --- a/airflow/providers/amazon/aws/utils/waiter.py +++ b/airflow/providers/amazon/aws/utils/waiter.py @@ -38,8 +38,7 @@ def waiter( check_interval_seconds: int = 60, ) -> None: """ - Will call get_state_callable until it reaches the desired_state or the failure_states. - It will also time out if it waits longer than countdown seconds. + Call get_state_callable until it reaches the desired_state or the failure_states.
PLEASE NOTE: While not yet deprecated, we are moving away from this method and encourage using the custom boto waiters as explained in diff --git a/airflow/providers/amazon/aws/utils/waiter_with_logging.py b/airflow/providers/amazon/aws/utils/waiter_with_logging.py index 1689e66370..5f66b01cd2 100644 --- a/airflow/providers/amazon/aws/utils/waiter_with_logging.py +++ b/airflow/providers/amazon/aws/utils/waiter_with_logging.py @@ -39,9 +39,11 @@ def wait( status_args: list[str], ) -> None: """ - Use a boto waiter to poll an AWS service for the specified state. Although this function - uses boto waiters to poll the state of the service, it logs the response of the service - after every attempt, which is not currently supported by boto waiters. + Use a boto waiter to poll an AWS service for the specified state. + + Although this function uses boto waiters to poll the state of the + service, it logs the response of the service after every attempt, + which is not currently supported by boto waiters. :param waiter: The boto waiter to use. :param waiter_delay: The amount of time in seconds to wait between attempts. @@ -88,9 +90,11 @@ async def async_wait( status_args: list[str], ): """ - Use an async boto waiter to poll an AWS service for the specified state. Although this function - uses boto waiters to poll the state of the service, it logs the response of the service - after every attempt, which is not currently supported by boto waiters. + Use an async boto waiter to poll an AWS service for the specified state. + + Although this function uses boto waiters to poll the state of the + service, it logs the response of the service after every attempt, + which is not currently supported by boto waiters. :param waiter: The boto waiter to use. :param waiter_delay: The amount of time in seconds to wait between attempts. @@ -129,8 +133,8 @@ async def async_wait( class _LazyStatusFormatter: """ - a wrapper containing the info necessary to extract the status from a response, - that'll only compute the value when necessary. + Contains the info necessary to extract the status from a response; only computes the value when necessary. + Used to avoid computations if the logs are disabled at the given level. """ @@ -139,10 +143,7 @@ class _LazyStatusFormatter: self.response = response def __str__(self): - """ - Loops through the supplied args list and generates a string - which contains values from the waiter response. - """ + """Loop through the args list and generate a string containing values from the waiter response.""" values = [] for query in self.jmespath_queries: value = jmespath.search(query, self.response)
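The wait()/async_wait() docstrings above describe running a boto waiter one attempt at a time so that the service response can be logged between attempts. A minimal synchronous sketch of that pattern, assuming boto3/botocore and jmespath are available; the helper name, its signature, and the "terminal failure" string check are illustrative assumptions, not the provider's actual code:

    import logging
    import time

    import jmespath
    from botocore.exceptions import WaiterError

    log = logging.getLogger(__name__)


    def wait_and_log(waiter, delay, max_attempts, waiter_args, status_queries):
        """Run a boto waiter one attempt at a time, logging selected response fields between attempts."""
        for _ in range(max_attempts):
            try:
                # MaxAttempts=1 makes each call a single poll; a non-matching state surfaces
                # as WaiterError carrying the last API response.
                waiter.wait(**waiter_args, WaiterConfig={"Delay": delay, "MaxAttempts": 1})
                return
            except WaiterError as error:
                if "terminal failure" in str(error):
                    raise RuntimeError(f"Waiter reached a failure state: {error}") from error
                # Extract and log the interesting status fields from the last response.
                status = [str(jmespath.search(query, error.last_response)) for query in status_queries]
                log.info("Still waiting; current status: %s", ", ".join(status))
            time.sleep(delay)
        raise RuntimeError("Waiter timed out before reaching the desired state")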
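The new format_tags docstring above boils down to a dictionary-to-tag-list conversion. A rough sketch of that behaviour, shown here only to illustrate the shape boto tag arguments expect rather than as a verbatim copy of the helper:

    def format_tags(source, *, key_label="Key", value_label="Value"):
        # A dict becomes the list-of-objects shape that boto tag parameters expect.
        if isinstance(source, dict):
            return [{key_label: k, value_label: v} for k, v in source.items()]
        # Anything else is assumed to already be in the right format and is returned as is.
        return source


    format_tags({"env": "dev", "team": "data"})
    # -> [{'Key': 'env', 'Value': 'dev'}, {'Key': 'team', 'Value': 'data'}]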
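Finally, the sensors whose docstrings were reworded in this patch are wired into a DAG like any other Airflow sensor. A minimal sketch, assuming Airflow 2.x with the Amazon provider installed; the bucket key and Batch job id below are placeholders, not values from this change:

    from __future__ import annotations

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.sensors.batch import BatchSensor
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    with DAG(
        dag_id="aws_sensor_sketch",
        start_date=datetime(2023, 1, 1),
        schedule=None,
        catchup=False,
    ):
        # Poll S3 until the key exists (S3KeySensor, reworded above).
        wait_for_key = S3KeySensor(
            task_id="wait_for_key",
            bucket_key="s3://my-bucket/incoming/data.csv",  # placeholder key
        )
        # Poll the AWS Batch job until it reaches a terminal state (BatchSensor, reworded above).
        wait_for_job = BatchSensor(
            task_id="wait_for_job",
            job_id="00000000-0000-0000-0000-000000000000",  # placeholder job id
        )
        wait_for_key >> wait_for_job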