This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fa0aaf694e Resolving EMR notebook deprecated warnings (#39829)
fa0aaf694e is described below
commit fa0aaf694ecf71364c7e8032b803ecc5cef7ac3d
Author: Gopal Dirisala <[email protected]>
AuthorDate: Fri May 31 18:08:42 2024 +0530
Resolving EMR notebook deprecated warnings (#39829)
* Resolving EMR notebook deprecated warnings
* Resolving EMR notebook deprecated warnings
* Resolving EMR notebook deprecated warnings
---
airflow/providers/amazon/aws/operators/emr.py | 64 +++++++++++++++------------
tests/always/test_example_dags.py | 1 -
2 files changed, 36 insertions(+), 29 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/emr.py
b/airflow/providers/amazon/aws/operators/emr.py
index 664d4b6d84..de74951289 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -263,30 +263,34 @@ class EmrStartNotebookExecutionOperator(BaseOperator):
wait_for_completion: bool = False,
aws_conn_id: str | None = "aws_default",
# TODO: waiter_max_attempts and waiter_delay should default to None
when the other two are deprecated.
- waiter_max_attempts: int | None | ArgNotSet = NOTSET,
- waiter_delay: int | None | ArgNotSet = NOTSET,
- waiter_countdown: int = 25 * 60,
- waiter_check_interval_seconds: int = 60,
+ waiter_max_attempts: int | None = None,
+ waiter_delay: int | None = None,
+ waiter_countdown: int | None = None,
+ waiter_check_interval_seconds: int | None = None,
**kwargs: Any,
):
- if waiter_max_attempts is NOTSET:
+ if waiter_check_interval_seconds:
warnings.warn(
- "The parameter waiter_countdown has been deprecated to
standardize "
- "naming conventions. Please use waiter_max_attempts instead.
In the "
+ "The parameter `waiter_check_interval_seconds` has been
deprecated to "
+ "standardize naming conventions. Please `use waiter_delay
instead`. In the "
"future this will default to None and defer to the waiter's
default value.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
- waiter_max_attempts = waiter_countdown //
waiter_check_interval_seconds
- if waiter_delay is NOTSET:
+ else:
+ waiter_check_interval_seconds = 60
+ if waiter_countdown:
warnings.warn(
- "The parameter waiter_check_interval_seconds has been
deprecated to "
- "standardize naming conventions. Please use waiter_delay
instead. In the "
+ "The parameter waiter_countdown has been deprecated to
standardize "
+ "naming conventions. Please use waiter_max_attempts instead.
In the "
"future this will default to None and defer to the waiter's
default value.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
- waiter_delay = waiter_check_interval_seconds
+ # waiter_countdown defaults to never timing out, which is not
supported
+ # by boto waiters, so we will set it here to "a very long time"
for now.
+ waiter_max_attempts = (waiter_countdown or 999) //
waiter_check_interval_seconds
+
super().__init__(**kwargs)
self.editor_id = editor_id
self.relative_path = relative_path
@@ -298,8 +302,8 @@ class EmrStartNotebookExecutionOperator(BaseOperator):
self.wait_for_completion = wait_for_completion
self.cluster_id = cluster_id
self.aws_conn_id = aws_conn_id
- self.waiter_max_attempts = waiter_max_attempts
- self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts or 25
+ self.waiter_delay = waiter_delay or waiter_check_interval_seconds or 60
self.master_instance_security_group_id =
master_instance_security_group_id
def execute(self, context: Context):
@@ -387,36 +391,40 @@ class EmrStopNotebookExecutionOperator(BaseOperator):
wait_for_completion: bool = False,
aws_conn_id: str | None = "aws_default",
# TODO: waiter_max_attempts and waiter_delay should default to None
when the other two are deprecated.
- waiter_max_attempts: int | None | ArgNotSet = NOTSET,
- waiter_delay: int | None | ArgNotSet = NOTSET,
- waiter_countdown: int = 25 * 60,
- waiter_check_interval_seconds: int = 60,
+ waiter_max_attempts: int | None = None,
+ waiter_delay: int | None = None,
+ waiter_countdown: int | None = None,
+ waiter_check_interval_seconds: int | None = None,
**kwargs: Any,
):
- if waiter_max_attempts is NOTSET:
+ if waiter_check_interval_seconds:
warnings.warn(
- "The parameter waiter_countdown has been deprecated to
standardize "
- "naming conventions. Please use waiter_max_attempts instead.
In the "
+ "The parameter `waiter_check_interval_seconds` has been
deprecated to "
+ "standardize naming conventions. Please `use waiter_delay
instead`. In the "
"future this will default to None and defer to the waiter's
default value.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
- waiter_max_attempts = waiter_countdown //
waiter_check_interval_seconds
- if waiter_delay is NOTSET:
+ else:
+ waiter_check_interval_seconds = 60
+ if waiter_countdown:
warnings.warn(
- "The parameter waiter_check_interval_seconds has been
deprecated to "
- "standardize naming conventions. Please use waiter_delay
instead. In the "
+ "The parameter waiter_countdown has been deprecated to
standardize "
+ "naming conventions. Please use waiter_max_attempts instead.
In the "
"future this will default to None and defer to the waiter's
default value.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
- waiter_delay = waiter_check_interval_seconds
+ # waiter_countdown defaults to never timing out, which is not
supported
+ # by boto waiters, so we will set it here to "a very long time"
for now.
+ waiter_max_attempts = (waiter_countdown or 999) //
waiter_check_interval_seconds
+
super().__init__(**kwargs)
self.notebook_execution_id = notebook_execution_id
self.wait_for_completion = wait_for_completion
self.aws_conn_id = aws_conn_id
- self.waiter_max_attempts = waiter_max_attempts
- self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts or 25
+ self.waiter_delay = waiter_delay or waiter_check_interval_seconds or 60
def execute(self, context: Context) -> None:
emr_hook = EmrHook(aws_conn_id=self.aws_conn_id)
diff --git a/tests/always/test_example_dags.py
b/tests/always/test_example_dags.py
index 6affbe7aa5..7cd53f09e6 100644
--- a/tests/always/test_example_dags.py
+++ b/tests/always/test_example_dags.py
@@ -47,7 +47,6 @@ IGNORE_AIRFLOW_PROVIDER_DEPRECATION_WARNING: tuple[str, ...]
= (
# Generally, these should be resolved as soon as a parameter or operator
is deprecated.
# If the deprecation is postponed, the item should be added to this tuple,
# and a corresponding Issue should be created on GitHub.
- "tests/system/providers/amazon/aws/example_emr_notebook_execution.py",
"tests/system/providers/google/cloud/bigquery/example_bigquery_operations.py",
"tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py",
"tests/system/providers/google/cloud/gcs/example_gcs_sensor.py",