This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b88ce95188 Update doc and sample dag for EMR Containers (#24087)
b88ce95188 is described below

commit b88ce951881914e51058ad71858874fdc00a3cbe
Author: Vincent <[email protected]>
AuthorDate: Tue Jun 7 07:17:47 2022 -0400

    Update doc and sample dag for EMR Containers (#24087)
---
 ...emr_job_flow_manual_steps.py => example_emr.py} | 40 ++++++++---
 .../{example_emr_eks_job.py => example_emr_eks.py} | 22 +++---
 .../example_emr_job_flow_automatic_steps.py        | 84 ----------------------
 airflow/providers/amazon/aws/operators/emr.py      | 34 +++++----
 airflow/providers/amazon/aws/sensors/athena.py     |  2 +-
 .../amazon/aws/sensors/cloud_formation.py          |  4 +-
 airflow/providers/amazon/aws/sensors/dms.py        |  2 +-
 airflow/providers/amazon/aws/sensors/eks.py        |  2 +-
 airflow/providers/amazon/aws/sensors/glacier.py    |  2 +-
 airflow/providers/amazon/aws/sensors/rds.py        |  4 +-
 .../operators/emr.rst                              | 38 +++++-----
 .../operators/emr_eks.rst                          | 43 ++++++-----
 tests/always/test_project_structure.py             |  3 -
 13 files changed, 118 insertions(+), 162 deletions(-)

diff --git 
a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
 b/airflow/providers/amazon/aws/example_dags/example_emr.py
similarity index 71%
rename from 
airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
rename to airflow/providers/amazon/aws/example_dags/example_emr.py
index d18237ecb0..786b4a0ad4 100644
--- 
a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
+++ b/airflow/providers/amazon/aws/example_dags/example_emr.py
@@ -23,13 +23,15 @@ from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.operators.emr import (
     EmrAddStepsOperator,
     EmrCreateJobFlowOperator,
+    EmrModifyClusterOperator,
     EmrTerminateJobFlowOperator,
 )
-from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
+from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor, EmrStepSensor
 
 JOB_FLOW_ROLE = os.getenv('EMR_JOB_FLOW_ROLE', 'EMR_EC2_DefaultRole')
 SERVICE_ROLE = os.getenv('EMR_SERVICE_ROLE', 'EMR_DefaultRole')
 
+# [START howto_operator_emr_steps_config]
 SPARK_STEPS = [
     {
         'Name': 'calculate_pi',
@@ -58,48 +60,66 @@ JOB_FLOW_OVERRIDES = {
         'KeepJobFlowAliveWhenNoSteps': False,
         'TerminationProtected': False,
     },
+    'Steps': SPARK_STEPS,
     'JobFlowRole': JOB_FLOW_ROLE,
     'ServiceRole': SERVICE_ROLE,
 }
-
+# [END howto_operator_emr_steps_config]
 
 with DAG(
-    dag_id='example_emr_job_flow_manual_steps',
+    dag_id='example_emr',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
-
-    cluster_creator = EmrCreateJobFlowOperator(
+    # [START howto_operator_emr_create_job_flow]
+    job_flow_creator = EmrCreateJobFlowOperator(
         task_id='create_job_flow',
         job_flow_overrides=JOB_FLOW_OVERRIDES,
     )
+    # [END howto_operator_emr_create_job_flow]
+
+    # [START howto_sensor_emr_job_flow]
+    job_sensor = EmrJobFlowSensor(
+        task_id='check_job_flow',
+        job_flow_id=job_flow_creator.output,
+    )
+    # [END howto_sensor_emr_job_flow]
+
+    # [START howto_operator_emr_modify_cluster]
+    cluster_modifier = EmrModifyClusterOperator(
+        task_id='modify_cluster', cluster_id=job_flow_creator.output, step_concurrency_level=1
+    )
+    # [END howto_operator_emr_modify_cluster]
 
     # [START howto_operator_emr_add_steps]
     step_adder = EmrAddStepsOperator(
         task_id='add_steps',
-        job_flow_id=cluster_creator.output,
+        job_flow_id=job_flow_creator.output,
         steps=SPARK_STEPS,
     )
     # [END howto_operator_emr_add_steps]
 
-    # [START howto_sensor_emr_step_sensor]
+    # [START howto_sensor_emr_step]
     step_checker = EmrStepSensor(
         task_id='watch_step',
-        job_flow_id=cluster_creator.output,
+        job_flow_id=job_flow_creator.output,
         step_id="{{ task_instance.xcom_pull(task_ids='add_steps', 
key='return_value')[0] }}",
     )
-    # [END howto_sensor_emr_step_sensor]
+    # [END howto_sensor_emr_step]
 
     # [START howto_operator_emr_terminate_job_flow]
     cluster_remover = EmrTerminateJobFlowOperator(
         task_id='remove_cluster',
-        job_flow_id=cluster_creator.output,
+        job_flow_id=job_flow_creator.output,
     )
     # [END howto_operator_emr_terminate_job_flow]
 
     chain(
+        job_flow_creator,
+        job_sensor,
+        cluster_modifier,
         step_adder,
         step_checker,
         cluster_remover,
diff --git a/airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py 
b/airflow/providers/amazon/aws/example_dags/example_emr_eks.py
similarity index 82%
rename from airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
rename to airflow/providers/amazon/aws/example_dags/example_emr_eks.py
index e8932630d9..467db6aadb 100644
--- a/airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
+++ b/airflow/providers/amazon/aws/example_dags/example_emr_eks.py
@@ -19,7 +19,9 @@ import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator
+from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor
 
 VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
 JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", 
"arn:aws:iam::012345678912:role/emr_eks_default_role")
@@ -51,18 +53,13 @@ CONFIGURATION_OVERRIDES_ARG = {
 # [END howto_operator_emr_eks_config]
 
 with DAG(
-    dag_id='example_emr_eks_job',
+    dag_id='example_emr_eks',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
-
-    # An example of how to get the cluster id and arn from an Airflow 
connection
-    # VIRTUAL_CLUSTER_ID = '{{ conn.emr_eks.extra_dejson["virtual_cluster_id"] 
}}'
-    # JOB_ROLE_ARN = '{{ conn.emr_eks.extra_dejson["job_role_arn"] }}'
-
-    # [START howto_operator_emr_eks_job]
+    # [START howto_operator_emr_container]
     job_starter = EmrContainerOperator(
         task_id="start_job",
         virtual_cluster_id=VIRTUAL_CLUSTER_ID,
@@ -71,5 +68,14 @@ with DAG(
         job_driver=JOB_DRIVER_ARG,
         configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
         name="pi.py",
+        wait_for_completion=False,
     )
-    # [END howto_operator_emr_eks_job]
+    # [END howto_operator_emr_container]
+
+    # [START howto_sensor_emr_container]
+    job_waiter = EmrContainerSensor(
+        task_id="job_waiter", virtual_cluster_id=VIRTUAL_CLUSTER_ID, job_id=str(job_starter.output)
+    )
+    # [END howto_sensor_emr_container]
+
+    chain(job_starter, job_waiter)
diff --git 
a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
 
b/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
deleted file mode 100644
index ab92a3cb21..0000000000
--- 
a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import os
-from datetime import datetime
-
-from airflow import DAG
-from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
-from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
-
-JOB_FLOW_ROLE = os.getenv('EMR_JOB_FLOW_ROLE', 'EMR_EC2_DefaultRole')
-SERVICE_ROLE = os.getenv('EMR_SERVICE_ROLE', 'EMR_DefaultRole')
-
-# [START howto_operator_emr_automatic_steps_config]
-SPARK_STEPS = [
-    {
-        'Name': 'calculate_pi',
-        'ActionOnFailure': 'CONTINUE',
-        'HadoopJarStep': {
-            'Jar': 'command-runner.jar',
-            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
-        },
-    }
-]
-
-JOB_FLOW_OVERRIDES = {
-    'Name': 'PiCalc',
-    'ReleaseLabel': 'emr-5.29.0',
-    'Applications': [{'Name': 'Spark'}],
-    'Instances': {
-        'InstanceGroups': [
-            {
-                'Name': 'Primary node',
-                'Market': 'ON_DEMAND',
-                'InstanceRole': 'MASTER',
-                'InstanceType': 'm5.xlarge',
-                'InstanceCount': 1,
-            },
-        ],
-        'KeepJobFlowAliveWhenNoSteps': False,
-        'TerminationProtected': False,
-    },
-    'Steps': SPARK_STEPS,
-    'JobFlowRole': JOB_FLOW_ROLE,
-    'ServiceRole': SERVICE_ROLE,
-}
-# [END howto_operator_emr_automatic_steps_config]
-
-
-with DAG(
-    dag_id='example_emr_job_flow_automatic_steps',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    tags=['example'],
-    catchup=False,
-) as dag:
-
-    # [START howto_operator_emr_create_job_flow]
-    job_flow_creator = EmrCreateJobFlowOperator(
-        task_id='create_job_flow',
-        job_flow_overrides=JOB_FLOW_OVERRIDES,
-    )
-    # [END howto_operator_emr_create_job_flow]
-
-    # [START howto_sensor_emr_job_flow_sensor]
-    job_sensor = EmrJobFlowSensor(
-        task_id='check_job_flow',
-        job_flow_id=job_flow_creator.output,
-    )
-    # [END howto_sensor_emr_job_flow_sensor]
diff --git a/airflow/providers/amazon/aws/operators/emr.py 
b/airflow/providers/amazon/aws/operators/emr.py
index a1f3fa753d..510c77184f 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -122,6 +122,10 @@ class EmrContainerOperator(BaseOperator):
     """
     An operator that submits jobs to EMR on EKS virtual clusters.
 
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EmrContainerOperator`
+
     :param name: The name of the job run.
     :param virtual_cluster_id: The EMR on EKS virtual cluster ID
     :param execution_role_arn: The IAM role ARN associated with the job run.
@@ -133,6 +137,7 @@ class EmrContainerOperator(BaseOperator):
         Use this if you want to specify a unique ID to prevent two jobs from 
getting started.
         If no token is provided, a UUIDv4 token will be generated for you.
     :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :param wait_for_completion: Whether or not to wait in the operator for the job to complete.
     :param poll_interval: Time (in seconds) to wait between two consecutive 
calls to check query status on EMR
     :param max_tries: Maximum number of times to wait for the job run to 
finish.
         Defaults to None, which will poll until the job is *not* in a pending, 
submitted, or running state.
@@ -160,6 +165,7 @@ class EmrContainerOperator(BaseOperator):
         configuration_overrides: Optional[dict] = None,
         client_request_token: Optional[str] = None,
         aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
         poll_interval: int = 30,
         max_tries: Optional[int] = None,
         tags: Optional[dict] = None,
@@ -174,6 +180,7 @@ class EmrContainerOperator(BaseOperator):
         self.configuration_overrides = configuration_overrides or {}
         self.aws_conn_id = aws_conn_id
         self.client_request_token = client_request_token or str(uuid4())
+        self.wait_for_completion = wait_for_completion
         self.poll_interval = poll_interval
         self.max_tries = max_tries
         self.tags = tags
@@ -198,19 +205,20 @@ class EmrContainerOperator(BaseOperator):
             self.client_request_token,
             self.tags,
         )
-        query_status = self.hook.poll_query_status(self.job_id, 
self.max_tries, self.poll_interval)
-
-        if query_status in EmrContainerHook.FAILURE_STATES:
-            error_message = self.hook.get_job_failure_reason(self.job_id)
-            raise AirflowException(
-                f"EMR Containers job failed. Final state is {query_status}. "
-                f"query_execution_id is {self.job_id}. Error: {error_message}"
-            )
-        elif not query_status or query_status in 
EmrContainerHook.INTERMEDIATE_STATES:
-            raise AirflowException(
-                f"Final state of EMR Containers job is {query_status}. "
-                f"Max tries of poll status exceeded, query_execution_id is 
{self.job_id}."
-            )
+        if self.wait_for_completion:
+            query_status = self.hook.poll_query_status(self.job_id, 
self.max_tries, self.poll_interval)
+
+            if query_status in EmrContainerHook.FAILURE_STATES:
+                error_message = self.hook.get_job_failure_reason(self.job_id)
+                raise AirflowException(
+                    f"EMR Containers job failed. Final state is 
{query_status}. "
+                    f"query_execution_id is {self.job_id}. Error: 
{error_message}"
+                )
+            elif not query_status or query_status in 
EmrContainerHook.INTERMEDIATE_STATES:
+                raise AirflowException(
+                    f"Final state of EMR Containers job is {query_status}. "
+                    f"Max tries of poll status exceeded, query_execution_id is 
{self.job_id}."
+                )
 
         return self.job_id
 
diff --git a/airflow/providers/amazon/aws/sensors/athena.py 
b/airflow/providers/amazon/aws/sensors/athena.py
index 927f512143..1186f8cdef 100644
--- a/airflow/providers/amazon/aws/sensors/athena.py
+++ b/airflow/providers/amazon/aws/sensors/athena.py
@@ -37,7 +37,7 @@ class AthenaSensor(BaseSensorOperator):
     If the query fails, the task will fail.
 
     .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
+        For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/sensor:AthenaSensor`
 
 
diff --git a/airflow/providers/amazon/aws/sensors/cloud_formation.py 
b/airflow/providers/amazon/aws/sensors/cloud_formation.py
index fb01bdc7f6..972dbace37 100644
--- a/airflow/providers/amazon/aws/sensors/cloud_formation.py
+++ b/airflow/providers/amazon/aws/sensors/cloud_formation.py
@@ -36,7 +36,7 @@ class CloudFormationCreateStackSensor(BaseSensorOperator):
     Waits for a stack to be created successfully on AWS CloudFormation.
 
     .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
+        For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/sensor:CloudFormationCreateStackSensor`
 
 
@@ -74,7 +74,7 @@ class CloudFormationDeleteStackSensor(BaseSensorOperator):
     Waits for a stack to be deleted successfully on AWS CloudFormation.
 
     .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
+        For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/sensor:CloudFormationDeleteStackSensor`
 
     :param stack_name: The name of the stack to wait for (templated)
diff --git a/airflow/providers/amazon/aws/sensors/dms.py 
b/airflow/providers/amazon/aws/sensors/dms.py
index 26e6b7148f..0437ee4d98 100644
--- a/airflow/providers/amazon/aws/sensors/dms.py
+++ b/airflow/providers/amazon/aws/sensors/dms.py
@@ -91,7 +91,7 @@ class DmsTaskCompletedSensor(DmsTaskBaseSensor):
     Pokes DMS task until it is completed.
 
     .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
+        For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/sensor:DmsTaskCompletedSensor`
 
     :param replication_task_arn: AWS DMS replication task ARN
diff --git a/airflow/providers/amazon/aws/sensors/eks.py 
b/airflow/providers/amazon/aws/sensors/eks.py
index 92ed55da4d..76b2703e89 100644
--- a/airflow/providers/amazon/aws/sensors/eks.py
+++ b/airflow/providers/amazon/aws/sensors/eks.py
@@ -121,7 +121,7 @@ class EksFargateProfileStateSensor(BaseSensorOperator):
     Check the state of an AWS Fargate profile until it reaches the target 
state or another terminal state.
 
     .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
+        For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/sensor:EksFargateProfileStateSensor`
 
     :param cluster_name: The name of the Cluster which the AWS Fargate profile 
is attached to. (templated)
diff --git a/airflow/providers/amazon/aws/sensors/glacier.py 
b/airflow/providers/amazon/aws/sensors/glacier.py
index e92f5a4326..8e8b74c58c 100644
--- a/airflow/providers/amazon/aws/sensors/glacier.py
+++ b/airflow/providers/amazon/aws/sensors/glacier.py
@@ -38,7 +38,7 @@ class GlacierJobOperationSensor(BaseSensorOperator):
     Glacier sensor for checking job state. This operator runs only in 
reschedule mode.
 
     .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
+        For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/sensor:GlacierJobOperationSensor`
 
     :param aws_conn_id: The reference to the AWS connection details
diff --git a/airflow/providers/amazon/aws/sensors/rds.py 
b/airflow/providers/amazon/aws/sensors/rds.py
index 3c24c82fbf..54ee50875e 100644
--- a/airflow/providers/amazon/aws/sensors/rds.py
+++ b/airflow/providers/amazon/aws/sensors/rds.py
@@ -70,7 +70,7 @@ class RdsSnapshotExistenceSensor(RdsBaseSensor):
     Waits for RDS snapshot with a specific status.
 
     .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
+        For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/sensor:RdsSnapshotExistenceSensor`
 
     :param db_type: Type of the DB - either "instance" or "cluster"
@@ -112,7 +112,7 @@ class RdsExportTaskExistenceSensor(RdsBaseSensor):
     Waits for RDS export task with a specific status.
 
     .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
+        For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/sensor:RdsExportTaskExistenceSensor`
 
     :param export_task_identifier: A unique identifier for the snapshot export 
task.
diff --git a/docs/apache-airflow-providers-amazon/operators/emr.rst 
b/docs/apache-airflow-providers-amazon/operators/emr.rst
index a62b503740..6bb16bd494 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr.rst
@@ -53,10 +53,10 @@ JobFlow configuration
 
 To create a job flow on EMR, you need to specify the configuration for the EMR 
cluster:
 
-.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
     :language: python
-    :start-after: [START howto_operator_emr_automatic_steps_config]
-    :end-before: [END howto_operator_emr_automatic_steps_config]
+    :start-after: [START howto_operator_emr_steps_config]
+    :end-before: [END howto_operator_emr_steps_config]
 
 Here we create an EMR single-node Cluster *PiCalc*. It only has a single step 
*calculate_pi* which
 calculates the value of ``Pi`` using Spark.  The config 
``'KeepJobFlowAliveWhenNoSteps': False``
@@ -76,7 +76,7 @@ Create the Job Flow
 
 In the following code we are creating a new job flow using the configuration 
as explained above.
 
-.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_emr_create_job_flow]
@@ -90,7 +90,7 @@ Add Steps to an EMR job flow
 To add steps to an existing EMR Job flow you can use
 :class:`~airflow.providers.amazon.aws.operators.emr.EmrAddStepsOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_emr_add_steps]
@@ -104,7 +104,7 @@ Terminate an EMR job flow
 To terminate an EMR Job Flow you can use
 
:class:`~airflow.providers.amazon.aws.operators.emr.EmrTerminateJobFlowOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_emr_terminate_job_flow]
@@ -118,17 +118,15 @@ Modify Amazon EMR container
 To modify an existing EMR container you can use
 :class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`.
 
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_emr_modify_cluster]
+    :end-before: [END howto_operator_emr_modify_cluster]
+
 Sensors
 -------
 
-.. _howto/sensor:EmrContainerSensor:
-
-Wait on an Amazon EMR container state
-=====================================
-
-To monitor the state of an EMR container you can use
-:class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`.
-
 .. _howto/sensor:EmrJobFlowSensor:
 
 Wait on an Amazon EMR job flow state
@@ -137,11 +135,11 @@ Wait on an Amazon EMR job flow state
 To monitor the state of an EMR job flow you can use
 :class:`~airflow.providers.amazon.aws.sensors.emr.EmrJobFlowSensor`.
 
-.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_sensor_emr_job_flow_sensor]
-    :end-before: [END howto_sensor_emr_job_flow_sensor]
+    :start-after: [START howto_sensor_emr_job_flow]
+    :end-before: [END howto_sensor_emr_job_flow]
 
 .. _howto/sensor:EmrStepSensor:
 
@@ -151,11 +149,11 @@ Wait on an Amazon EMR step state
 To monitor the state of a step running an existing EMR Job flow you can use
 :class:`~airflow.providers.amazon.aws.sensors.emr.EmrStepSensor`.
 
-.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_sensor_emr_step_sensor]
-    :end-before: [END howto_sensor_emr_step_sensor]
+    :start-after: [START howto_sensor_emr_step]
+    :end-before: [END howto_sensor_emr_step]
 
 Reference
 ---------
diff --git a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst 
b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
index b84fbf5ee5..3e9e58fbca 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
@@ -23,9 +23,6 @@ Amazon EMR on Amazon EKS
 provides a deployment option for Amazon EMR that allows you to run open-source 
big data frameworks on
 Amazon EKS.
 
-Airflow provides the 
:class:`~airflow.providers.amazon.aws.operators.emr.EmrContainerOperator`
-to submit Apache Spark jobs to your EMR on EKS virtual cluster.
-
 Prerequisite Tasks
 ------------------
 
@@ -34,18 +31,18 @@ Prerequisite Tasks
 Operators
 ---------
 
-.. _howto/operator:EMRContainersOperators:
+.. _howto/operator:EmrContainerOperator:
 
-Run a Spark job on EMR on EKS
-=============================
+Submit a job to an Amazon EMR virtual cluster
+=============================================
 
 .. note::
   This example assumes that you already have an EMR on EKS virtual cluster 
configured. See the
   `EMR on EKS Getting Started guide 
<https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/getting-started.html>`__
   for more information.
 
-The ``EMRContainerOperator`` will submit a new job to an Amazon EMR on Amazon 
EKS virtual cluster and wait for
-the job to complete. The example job below calculates the mathematical 
constant ``Pi``. In a
+The ``EmrContainerOperator`` will submit a new job to an Amazon EMR on Amazon EKS virtual cluster.
+The example job below calculates the mathematical constant ``Pi``. In a
 production job, you would usually refer to a Spark script on Amazon Simple 
Storage Service (S3).
 
 To create a job for Amazon EMR on Amazon EKS, you need to specify your virtual 
cluster ID, the release of Amazon EMR you
@@ -59,28 +56,42 @@ and ``monitoringConfiguration`` to send logs to the 
``/aws/emr-eks-spark`` log g
 Refer to the `EMR on EKS guide 
<https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/emr-eks-jobs-CLI.html#emr-eks-jobs-parameters>`__
 for more details on job configuration.
 
-.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
     :language: python
     :start-after: [START howto_operator_emr_eks_config]
     :end-before: [END howto_operator_emr_eks_config]
 
-
 We pass the ``virtual_cluster_id`` and ``execution_role_arn`` values as 
operator parameters, but you
 can store them in a connection or provide them in the DAG. Your AWS region 
should be defined either
 in the ``aws_default`` connection as ``{"region_name": "us-east-1"}`` or a 
custom connection name
-that gets passed to the operator with the ``aws_conn_id`` parameter.
+that gets passed to the operator with the ``aws_conn_id`` parameter. The operator returns the Job ID of the job run.
 
-.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
+.. exampleinclude:: 
/../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_emr_eks_job]
-    :end-before: [END howto_operator_emr_eks_job]
+    :start-after: [START howto_operator_emr_container]
+    :end-before: [END howto_operator_emr_container]
+
+Sensors
+-------
+
+.. _howto/sensor:EmrContainerSensor:
 
-With the ``EmrContainerOperator``, it will wait until the successful 
completion of the job or raise
-an ``AirflowException`` if there is an error. The operator returns the Job ID 
of the job run.
+Wait on an Amazon EMR virtual cluster job
+=========================================
+
+To wait on the status of an Amazon EMR virtual cluster job to reach a terminal state, you can use
+:class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_emr_container]
+    :end-before: [END howto_sensor_emr_container]
 
 Reference
 ---------
 
+* `AWS boto3 library documentation for EMR Containers 
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html>`__
 * `Amazon EMR on EKS Job runs 
<https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/job-runs.html>`__
 * `EMR on EKS Best Practices 
<https://aws.github.io/aws-emr-containers-best-practices/>`__
diff --git a/tests/always/test_project_structure.py 
b/tests/always/test_project_structure.py
index 1c1bedf7fc..d0b9e1c060 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -433,9 +433,6 @@ class 
TestAmazonProviderProjectStructure(ExampleCoverageTest):
     }
 
     MISSING_EXAMPLES_FOR_CLASSES = {
-        # EMR legitimately missing, needs development
-        'airflow.providers.amazon.aws.operators.emr.EmrModifyClusterOperator',
-        'airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor',
         # S3 Exasol transfer difficult to test, see: 
https://github.com/apache/airflow/issues/22632
         
'airflow.providers.amazon.aws.transfers.exasol_to_s3.ExasolToS3Operator',
         # Glue Catalog sensor difficult to test

Reply via email to