This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8a191938b6 Remove @poke_mode_only from EmrStepSensor (#30774)
8a191938b6 is described below
commit 8a191938b62edcce8093d2522759ede960a00f5f
Author: Vincent <[email protected]>
AuthorDate: Thu Apr 20 17:21:52 2023 -0600
Remove @poke_mode_only from EmrStepSensor (#30774)
* Remove @poke_mode_only from EmrStepSensor
* Add EmrStepSensor to system test and documentation
* Fix test
---
airflow/providers/amazon/aws/sensors/emr.py | 3 +--
.../operators/emr/emr.rst | 9 +++++++++
tests/always/test_project_structure.py | 2 --
tests/system/providers/amazon/aws/example_emr.py | 16 +++++++++++++++-
4 files changed, 25 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/amazon/aws/sensors/emr.py
b/airflow/providers/amazon/aws/sensors/emr.py
index 811846ba45..140edec404 100644
--- a/airflow/providers/amazon/aws/sensors/emr.py
+++ b/airflow/providers/amazon/aws/sensors/emr.py
@@ -25,7 +25,7 @@ from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook,
EmrServerlessHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.links.emr import EmrLogsLink
-from airflow.sensors.base import BaseSensorOperator, poke_mode_only
+from airflow.sensors.base import BaseSensorOperator
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -451,7 +451,6 @@ class EmrJobFlowSensor(EmrBaseSensor):
return None
-@poke_mode_only
class EmrStepSensor(EmrBaseSensor):
"""
Asks for the state of the step until it reaches any of the target states.
diff --git a/docs/apache-airflow-providers-amazon/operators/emr/emr.rst
b/docs/apache-airflow-providers-amazon/operators/emr/emr.rst
index 4b517dab8f..2e3330922c 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr/emr.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr/emr.rst
@@ -188,6 +188,15 @@ To monitor the state of an EMR job flow you can use
Wait on an Amazon EMR step state
================================
+To monitor the state of an EMR job step you can use
+:class:`~airflow.providers.amazon.aws.sensors.emr.EmrStepSensor`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_emr_step]
+ :end-before: [END howto_sensor_emr_step]
+
Reference
---------
diff --git a/tests/always/test_project_structure.py
b/tests/always/test_project_structure.py
index ec26f526a8..9a9da7c850 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -405,8 +405,6 @@ class
TestAmazonProviderProjectStructure(ExampleCoverageTest):
"airflow.providers.amazon.aws.transfers.exasol_to_s3.ExasolToS3Operator",
# Glue Catalog sensor difficult to test
"airflow.providers.amazon.aws.sensors.glue_catalog_partition.GlueCatalogPartitionSensor",
- # EMR Step sensor difficult to test, see:
https://github.com/apache/airflow/pull/27286
- "airflow.providers.amazon.aws.sensors.emr.EmrStepSensor",
}
DEPRECATED_CLASSES = {
diff --git a/tests/system/providers/amazon/aws/example_emr.py
b/tests/system/providers/amazon/aws/example_emr.py
index 06d8b28f45..8ca3d5e97e 100644
--- a/tests/system/providers/amazon/aws/example_emr.py
+++ b/tests/system/providers/amazon/aws/example_emr.py
@@ -34,7 +34,7 @@ from airflow.providers.amazon.aws.operators.emr import (
EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator,
S3DeleteBucketOperator
-from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
+from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor,
EmrStepSensor
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY,
SystemTestContextBuilder
@@ -117,6 +117,11 @@ def delete_security_config(config_name: str):
)
+@task
+def get_step_id(step_ids: list):
+ return step_ids[0]
+
+
sys_test_context_task =
SystemTestContextBuilder().add_variable(EXECUTION_ROLE_ARN_KEY).build()
with DAG(
@@ -164,6 +169,14 @@ with DAG(
)
# [END howto_operator_emr_add_steps]
+ # [START howto_sensor_emr_step]
+ wait_for_step = EmrStepSensor(
+ task_id="wait_for_step",
+ job_flow_id=create_job_flow.output,
+ step_id=get_step_id(add_steps.output),
+ )
+ # [END howto_sensor_emr_step]
+
# [START howto_operator_emr_terminate_job_flow]
remove_cluster = EmrTerminateJobFlowOperator(
task_id="remove_cluster",
@@ -195,6 +208,7 @@ with DAG(
create_job_flow,
modify_cluster,
add_steps,
+ wait_for_step,
# TEST TEARDOWN
remove_cluster,
check_job_flow,