This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2f96127b9e Minor name change for the util wait method. (#32152)
2f96127b9e is described below
commit 2f96127b9eb84c045632c44fdc6871e04d3b0634
Author: Syed Hussaain <[email protected]>
AuthorDate: Mon Jun 26 13:02:28 2023 -0700
Minor name change for the util wait method. (#32152)
---
airflow/providers/amazon/aws/hooks/athena.py | 2 +-
airflow/providers/amazon/aws/operators/eks.py | 4 ++--
airflow/providers/amazon/aws/operators/emr.py | 12 ++++++------
.../providers/amazon/aws/utils/waiter_with_logging.py | 12 ++++++------
.../amazon/aws/utils/test_waiter_with_logging.py | 16 ++++++++--------
5 files changed, 23 insertions(+), 23 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/athena.py
b/airflow/providers/amazon/aws/hooks/athena.py
index b0d1878507..7341ebcf70 100644
--- a/airflow/providers/amazon/aws/hooks/athena.py
+++ b/airflow/providers/amazon/aws/hooks/athena.py
@@ -254,7 +254,7 @@ class AthenaHook(AwsBaseHook):
wait(
waiter=self.get_waiter("query_complete"),
waiter_delay=sleep_time or self.sleep_time,
- max_attempts=max_polling_attempts or 120,
+ waiter_max_attempts=max_polling_attempts or 120,
args={"QueryExecutionId": query_execution_id},
failure_message=f"Error while waiting for query
{query_execution_id} to complete",
status_message=f"Query execution id: {query_execution_id}, "
diff --git a/airflow/providers/amazon/aws/operators/eks.py
b/airflow/providers/amazon/aws/operators/eks.py
index e280da5e5a..75fb48fdfe 100644
--- a/airflow/providers/amazon/aws/operators/eks.py
+++ b/airflow/providers/amazon/aws/operators/eks.py
@@ -98,7 +98,7 @@ def _create_compute(
wait(
waiter=eks_hook.conn.get_waiter("nodegroup_active"),
waiter_delay=waiter_delay,
- max_attempts=waiter_max_attempts,
+ waiter_max_attempts=waiter_max_attempts,
args={"clusterName": cluster_name, "nodegroupName":
nodegroup_name},
failure_message="Nodegroup creation failed",
status_message="Nodegroup status is",
@@ -122,7 +122,7 @@ def _create_compute(
wait(
waiter=eks_hook.conn.get_waiter("fargate_profile_active"),
waiter_delay=waiter_delay,
- max_attempts=waiter_max_attempts,
+ waiter_max_attempts=waiter_max_attempts,
args={"clusterName": cluster_name, "fargateProfileName":
fargate_profile_name},
failure_message="Fargate profile creation failed",
status_message="Fargate profile status is",
diff --git a/airflow/providers/amazon/aws/operators/emr.py
b/airflow/providers/amazon/aws/operators/emr.py
index 9fdad3b918..61b53b8bcb 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -1025,7 +1025,7 @@ class
EmrServerlessCreateApplicationOperator(BaseOperator):
wait(
waiter=waiter,
waiter_delay=self.waiter_delay,
- max_attempts=self.waiter_max_attempts,
+ waiter_max_attempts=self.waiter_max_attempts,
args={"applicationId": application_id},
failure_message="Serverless Application creation failed",
status_message="Serverless Application status is",
@@ -1038,7 +1038,7 @@ class
EmrServerlessCreateApplicationOperator(BaseOperator):
waiter = self.hook.get_waiter("serverless_app_started")
wait(
waiter=waiter,
- max_attempts=self.waiter_max_attempts,
+ waiter_max_attempts=self.waiter_max_attempts,
waiter_delay=self.waiter_delay,
args={"applicationId": application_id},
failure_message="Serverless Application failed to start",
@@ -1158,7 +1158,7 @@ class EmrServerlessStartJobOperator(BaseOperator):
wait(
waiter=waiter,
- max_attempts=self.waiter_max_attempts,
+ waiter_max_attempts=self.waiter_max_attempts,
waiter_delay=self.waiter_delay,
args={"applicationId": self.application_id},
failure_message="Serverless Application failed to start",
@@ -1185,7 +1185,7 @@ class EmrServerlessStartJobOperator(BaseOperator):
waiter = self.hook.get_waiter("serverless_job_completed")
wait(
waiter=waiter,
- max_attempts=self.waiter_max_attempts,
+ waiter_max_attempts=self.waiter_max_attempts,
waiter_delay=self.waiter_delay,
args={"applicationId": self.application_id, "jobRunId":
self.job_id},
failure_message="Serverless Job failed",
@@ -1314,7 +1314,7 @@ class EmrServerlessStopApplicationOperator(BaseOperator):
waiter = self.hook.get_waiter("serverless_app_stopped")
wait(
waiter=waiter,
- max_attempts=self.waiter_max_attempts,
+ waiter_max_attempts=self.waiter_max_attempts,
waiter_delay=self.waiter_delay,
args={"applicationId": self.application_id},
failure_message="Error stopping application",
@@ -1409,7 +1409,7 @@ class
EmrServerlessDeleteApplicationOperator(EmrServerlessStopApplicationOperato
wait(
waiter=waiter,
- max_attempts=self.waiter_max_attempts,
+ waiter_max_attempts=self.waiter_max_attempts,
waiter_delay=self.waiter_delay,
args={"applicationId": self.application_id},
failure_message="Error terminating application",
diff --git a/airflow/providers/amazon/aws/utils/waiter_with_logging.py
b/airflow/providers/amazon/aws/utils/waiter_with_logging.py
index b883e36bdb..1689e66370 100644
--- a/airflow/providers/amazon/aws/utils/waiter_with_logging.py
+++ b/airflow/providers/amazon/aws/utils/waiter_with_logging.py
@@ -32,7 +32,7 @@ from airflow.exceptions import AirflowException
def wait(
waiter: Waiter,
waiter_delay: int,
- max_attempts: int,
+ waiter_max_attempts: int,
args: dict[str, Any],
failure_message: str,
status_message: str,
@@ -45,7 +45,7 @@ def wait(
:param waiter: The boto waiter to use.
:param waiter_delay: The amount of time in seconds to wait between
attempts.
- :param max_attempts: The maximum number of attempts to be made.
+ :param waiter_max_attempts: The maximum number of attempts to be made.
:param args: The arguments to pass to the waiter.
:param failure_message: The message to log if a failure state is reached.
:param status_message: The message logged when printing the status of the
service.
@@ -72,7 +72,7 @@ def wait(
raise AirflowException(f"{failure_message}: {error}")
log.info("%s: %s", status_message,
_LazyStatusFormatter(status_args, error.last_response))
- if attempt >= max_attempts:
+ if attempt >= waiter_max_attempts:
raise AirflowException("Waiter error: max attempts reached")
time.sleep(waiter_delay)
@@ -81,7 +81,7 @@ def wait(
async def async_wait(
waiter: Waiter,
waiter_delay: int,
- max_attempts: int,
+ waiter_max_attempts: int,
args: dict[str, Any],
failure_message: str,
status_message: str,
@@ -94,7 +94,7 @@ async def async_wait(
:param waiter: The boto waiter to use.
:param waiter_delay: The amount of time in seconds to wait between
attempts.
- :param max_attempts: The maximum number of attempts to be made.
+ :param waiter_max_attempts: The maximum number of attempts to be made.
:param args: The arguments to pass to the waiter.
:param failure_message: The message to log if a failure state is reached.
:param status_message: The message logged when printing the status of the
service.
@@ -121,7 +121,7 @@ async def async_wait(
raise AirflowException(f"{failure_message}: {error}")
log.info("%s: %s", status_message,
_LazyStatusFormatter(status_args, error.last_response))
- if attempt >= max_attempts:
+ if attempt >= waiter_max_attempts:
raise AirflowException("Waiter error: max attempts reached")
await asyncio.sleep(waiter_delay)
diff --git a/tests/providers/amazon/aws/utils/test_waiter_with_logging.py
b/tests/providers/amazon/aws/utils/test_waiter_with_logging.py
index 4580c21054..03427c2ac5 100644
--- a/tests/providers/amazon/aws/utils/test_waiter_with_logging.py
+++ b/tests/providers/amazon/aws/utils/test_waiter_with_logging.py
@@ -51,7 +51,7 @@ class TestWaiter:
wait(
waiter=mock_waiter,
waiter_delay=123,
- max_attempts=456,
+ waiter_max_attempts=456,
args={"test_arg": "test_value"},
failure_message="test failure message",
status_message="test status message",
@@ -92,7 +92,7 @@ class TestWaiter:
await async_wait(
waiter=mock_waiter,
waiter_delay=0,
- max_attempts=456,
+ waiter_max_attempts=456,
args={"test_arg": "test_value"},
failure_message="test failure message",
status_message="test status message",
@@ -122,7 +122,7 @@ class TestWaiter:
wait(
waiter=mock_waiter,
waiter_delay=123,
- max_attempts=2,
+ waiter_max_attempts=2,
args={"test_arg": "test_value"},
failure_message="test failure message",
status_message="test status message",
@@ -169,7 +169,7 @@ class TestWaiter:
wait(
waiter=mock_waiter,
waiter_delay=123,
- max_attempts=10,
+ waiter_max_attempts=10,
args={"test_arg": "test_value"},
failure_message="test failure message",
status_message="test status message",
@@ -217,7 +217,7 @@ class TestWaiter:
wait(
waiter=mock_waiter,
waiter_delay=123,
- max_attempts=456,
+ waiter_max_attempts=456,
args={"test_arg": "test_value"},
failure_message="test failure message",
status_message="test status message",
@@ -266,7 +266,7 @@ class TestWaiter:
wait(
waiter=mock_waiter,
waiter_delay=123,
- max_attempts=456,
+ waiter_max_attempts=456,
args={"test_arg": "test_value"},
failure_message="test failure message",
status_message="test status message",
@@ -314,7 +314,7 @@ class TestWaiter:
wait(
waiter=mock_waiter,
waiter_delay=123,
- max_attempts=456,
+ waiter_max_attempts=456,
args={"test_arg": "test_value"},
failure_message="test failure message",
status_message="test status message",
@@ -348,7 +348,7 @@ class TestWaiter:
wait(
waiter=mock_waiter,
waiter_delay=0,
- max_attempts=456,
+ waiter_max_attempts=456,
args={"test_arg": "test_value"},
failure_message="test failure message",
status_message="test status message",