This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a191938b6 Remove @poke_mode_only from EmrStepSensor (#30774)
8a191938b6 is described below

commit 8a191938b62edcce8093d2522759ede960a00f5f
Author: Vincent <[email protected]>
AuthorDate: Thu Apr 20 17:21:52 2023 -0600

    Remove @poke_mode_only from EmrStepSensor (#30774)
    
    * Remove @poke_mode_only from EmrStepSensor
    
    * Add EmrStepSensor to system test and documentation
    
    * Fix test
---
 airflow/providers/amazon/aws/sensors/emr.py              |  3 +--
 .../operators/emr/emr.rst                                |  9 +++++++++
 tests/always/test_project_structure.py                   |  2 --
 tests/system/providers/amazon/aws/example_emr.py         | 16 +++++++++++++++-
 4 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/amazon/aws/sensors/emr.py b/airflow/providers/amazon/aws/sensors/emr.py
index 811846ba45..140edec404 100644
--- a/airflow/providers/amazon/aws/sensors/emr.py
+++ b/airflow/providers/amazon/aws/sensors/emr.py
@@ -25,7 +25,7 @@ from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.links.emr import EmrLogsLink
-from airflow.sensors.base import BaseSensorOperator, poke_mode_only
+from airflow.sensors.base import BaseSensorOperator
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -451,7 +451,6 @@ class EmrJobFlowSensor(EmrBaseSensor):
         return None
 
 
-@poke_mode_only
 class EmrStepSensor(EmrBaseSensor):
     """
     Asks for the state of the step until it reaches any of the target states.
diff --git a/docs/apache-airflow-providers-amazon/operators/emr/emr.rst b/docs/apache-airflow-providers-amazon/operators/emr/emr.rst
index 4b517dab8f..2e3330922c 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr/emr.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr/emr.rst
@@ -188,6 +188,15 @@ To monitor the state of an EMR job flow you can use
 Wait on an Amazon EMR step state
 ================================
 
+To monitor the state of an EMR job step you can use
+:class:`~airflow.providers.amazon.aws.sensors.emr.EmrStepSensor`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_emr_step]
+    :end-before: [END howto_sensor_emr_step]
+
 Reference
 ---------
 
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index ec26f526a8..9a9da7c850 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -405,8 +405,6 @@ class TestAmazonProviderProjectStructure(ExampleCoverageTest):
         "airflow.providers.amazon.aws.transfers.exasol_to_s3.ExasolToS3Operator",
         # Glue Catalog sensor difficult to test
         "airflow.providers.amazon.aws.sensors.glue_catalog_partition.GlueCatalogPartitionSensor",
-        # EMR Step sensor difficult to test, see: https://github.com/apache/airflow/pull/27286
-        "airflow.providers.amazon.aws.sensors.emr.EmrStepSensor",
     }
 
     DEPRECATED_CLASSES = {
diff --git a/tests/system/providers/amazon/aws/example_emr.py b/tests/system/providers/amazon/aws/example_emr.py
index 06d8b28f45..8ca3d5e97e 100644
--- a/tests/system/providers/amazon/aws/example_emr.py
+++ b/tests/system/providers/amazon/aws/example_emr.py
@@ -34,7 +34,7 @@ from airflow.providers.amazon.aws.operators.emr import (
     EmrTerminateJobFlowOperator,
 )
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
-from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
+from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor, EmrStepSensor
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 
@@ -117,6 +117,11 @@ def delete_security_config(config_name: str):
     )
 
 
+@task
+def get_step_id(step_ids: list):
+    return step_ids[0]
+
+
 sys_test_context_task = SystemTestContextBuilder().add_variable(EXECUTION_ROLE_ARN_KEY).build()
 
 with DAG(
@@ -164,6 +169,14 @@ with DAG(
     )
     # [END howto_operator_emr_add_steps]
 
+    # [START howto_sensor_emr_step]
+    wait_for_step = EmrStepSensor(
+        task_id="wait_for_step",
+        job_flow_id=create_job_flow.output,
+        step_id=get_step_id(add_steps.output),
+    )
+    # [END howto_sensor_emr_step]
+
     # [START howto_operator_emr_terminate_job_flow]
     remove_cluster = EmrTerminateJobFlowOperator(
         task_id="remove_cluster",
@@ -195,6 +208,7 @@ with DAG(
         create_job_flow,
         modify_cluster,
         add_steps,
+        wait_for_step,
         # TEST TEARDOWN
         remove_cluster,
         check_job_flow,

Reply via email to