This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b88ce95188 Update doc and sample dag for EMR Containers (#24087)
b88ce95188 is described below
commit b88ce951881914e51058ad71858874fdc00a3cbe
Author: Vincent <[email protected]>
AuthorDate: Tue Jun 7 07:17:47 2022 -0400
Update doc and sample dag for EMR Containers (#24087)
---
...emr_job_flow_manual_steps.py => example_emr.py} | 40 ++++++++---
.../{example_emr_eks_job.py => example_emr_eks.py} | 22 +++---
.../example_emr_job_flow_automatic_steps.py | 84 ----------------------
airflow/providers/amazon/aws/operators/emr.py | 34 +++++----
airflow/providers/amazon/aws/sensors/athena.py | 2 +-
.../amazon/aws/sensors/cloud_formation.py | 4 +-
airflow/providers/amazon/aws/sensors/dms.py | 2 +-
airflow/providers/amazon/aws/sensors/eks.py | 2 +-
airflow/providers/amazon/aws/sensors/glacier.py | 2 +-
airflow/providers/amazon/aws/sensors/rds.py | 4 +-
.../operators/emr.rst | 38 +++++-----
.../operators/emr_eks.rst | 43 ++++++-----
tests/always/test_project_structure.py | 3 -
13 files changed, 118 insertions(+), 162 deletions(-)
diff --git
a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
b/airflow/providers/amazon/aws/example_dags/example_emr.py
similarity index 71%
rename from
airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
rename to airflow/providers/amazon/aws/example_dags/example_emr.py
index d18237ecb0..786b4a0ad4 100644
---
a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
+++ b/airflow/providers/amazon/aws/example_dags/example_emr.py
@@ -23,13 +23,15 @@ from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.emr import (
EmrAddStepsOperator,
EmrCreateJobFlowOperator,
+ EmrModifyClusterOperator,
EmrTerminateJobFlowOperator,
)
-from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
+from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor,
EmrStepSensor
JOB_FLOW_ROLE = os.getenv('EMR_JOB_FLOW_ROLE', 'EMR_EC2_DefaultRole')
SERVICE_ROLE = os.getenv('EMR_SERVICE_ROLE', 'EMR_DefaultRole')
+# [START howto_operator_emr_steps_config]
SPARK_STEPS = [
{
'Name': 'calculate_pi',
@@ -58,48 +60,66 @@ JOB_FLOW_OVERRIDES = {
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False,
},
+ 'Steps': SPARK_STEPS,
'JobFlowRole': JOB_FLOW_ROLE,
'ServiceRole': SERVICE_ROLE,
}
-
+# [END howto_operator_emr_steps_config]
with DAG(
- dag_id='example_emr_job_flow_manual_steps',
+ dag_id='example_emr',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:
-
- cluster_creator = EmrCreateJobFlowOperator(
+ # [START howto_operator_emr_create_job_flow]
+ job_flow_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
)
+ # [END howto_operator_emr_create_job_flow]
+
+ # [START howto_sensor_emr_job_flow]
+ job_sensor = EmrJobFlowSensor(
+ task_id='check_job_flow',
+ job_flow_id=job_flow_creator.output,
+ )
+ # [END howto_sensor_emr_job_flow]
+
+ # [START howto_operator_emr_modify_cluster]
+ cluster_modifier = EmrModifyClusterOperator(
+ task_id='modify_cluster', cluster_id=job_flow_creator.output,
step_concurrency_level=1
+ )
+ # [END howto_operator_emr_modify_cluster]
# [START howto_operator_emr_add_steps]
step_adder = EmrAddStepsOperator(
task_id='add_steps',
- job_flow_id=cluster_creator.output,
+ job_flow_id=job_flow_creator.output,
steps=SPARK_STEPS,
)
# [END howto_operator_emr_add_steps]
- # [START howto_sensor_emr_step_sensor]
+ # [START howto_sensor_emr_step]
step_checker = EmrStepSensor(
task_id='watch_step',
- job_flow_id=cluster_creator.output,
+ job_flow_id=job_flow_creator.output,
step_id="{{ task_instance.xcom_pull(task_ids='add_steps',
key='return_value')[0] }}",
)
- # [END howto_sensor_emr_step_sensor]
+ # [END howto_sensor_emr_step]
# [START howto_operator_emr_terminate_job_flow]
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
- job_flow_id=cluster_creator.output,
+ job_flow_id=job_flow_creator.output,
)
# [END howto_operator_emr_terminate_job_flow]
chain(
+ job_flow_creator,
+ job_sensor,
+ cluster_modifier,
step_adder,
step_checker,
cluster_remover,
diff --git a/airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
b/airflow/providers/amazon/aws/example_dags/example_emr_eks.py
similarity index 82%
rename from airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
rename to airflow/providers/amazon/aws/example_dags/example_emr_eks.py
index e8932630d9..467db6aadb 100644
--- a/airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
+++ b/airflow/providers/amazon/aws/example_dags/example_emr_eks.py
@@ -19,7 +19,9 @@ import os
from datetime import datetime
from airflow import DAG
+from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator
+from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor
VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster")
JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN",
"arn:aws:iam::012345678912:role/emr_eks_default_role")
@@ -51,18 +53,13 @@ CONFIGURATION_OVERRIDES_ARG = {
# [END howto_operator_emr_eks_config]
with DAG(
- dag_id='example_emr_eks_job',
+ dag_id='example_emr_eks',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:
-
- # An example of how to get the cluster id and arn from an Airflow
connection
- # VIRTUAL_CLUSTER_ID = '{{ conn.emr_eks.extra_dejson["virtual_cluster_id"]
}}'
- # JOB_ROLE_ARN = '{{ conn.emr_eks.extra_dejson["job_role_arn"] }}'
-
- # [START howto_operator_emr_eks_job]
+ # [START howto_operator_emr_container]
job_starter = EmrContainerOperator(
task_id="start_job",
virtual_cluster_id=VIRTUAL_CLUSTER_ID,
@@ -71,5 +68,14 @@ with DAG(
job_driver=JOB_DRIVER_ARG,
configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
name="pi.py",
+ wait_for_completion=False,
)
- # [END howto_operator_emr_eks_job]
+ # [END howto_operator_emr_container]
+
+ # [START howto_sensor_emr_container]
+ job_waiter = EmrContainerSensor(
+ task_id="job_waiter", virtual_cluster_id=VIRTUAL_CLUSTER_ID,
job_id=str(job_starter.output)
+ )
+ # [END howto_sensor_emr_container]
+
+ chain(job_starter, job_waiter)
diff --git
a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
b/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
deleted file mode 100644
index ab92a3cb21..0000000000
---
a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import os
-from datetime import datetime
-
-from airflow import DAG
-from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
-from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
-
-JOB_FLOW_ROLE = os.getenv('EMR_JOB_FLOW_ROLE', 'EMR_EC2_DefaultRole')
-SERVICE_ROLE = os.getenv('EMR_SERVICE_ROLE', 'EMR_DefaultRole')
-
-# [START howto_operator_emr_automatic_steps_config]
-SPARK_STEPS = [
- {
- 'Name': 'calculate_pi',
- 'ActionOnFailure': 'CONTINUE',
- 'HadoopJarStep': {
- 'Jar': 'command-runner.jar',
- 'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
- },
- }
-]
-
-JOB_FLOW_OVERRIDES = {
- 'Name': 'PiCalc',
- 'ReleaseLabel': 'emr-5.29.0',
- 'Applications': [{'Name': 'Spark'}],
- 'Instances': {
- 'InstanceGroups': [
- {
- 'Name': 'Primary node',
- 'Market': 'ON_DEMAND',
- 'InstanceRole': 'MASTER',
- 'InstanceType': 'm5.xlarge',
- 'InstanceCount': 1,
- },
- ],
- 'KeepJobFlowAliveWhenNoSteps': False,
- 'TerminationProtected': False,
- },
- 'Steps': SPARK_STEPS,
- 'JobFlowRole': JOB_FLOW_ROLE,
- 'ServiceRole': SERVICE_ROLE,
-}
-# [END howto_operator_emr_automatic_steps_config]
-
-
-with DAG(
- dag_id='example_emr_job_flow_automatic_steps',
- schedule_interval=None,
- start_date=datetime(2021, 1, 1),
- tags=['example'],
- catchup=False,
-) as dag:
-
- # [START howto_operator_emr_create_job_flow]
- job_flow_creator = EmrCreateJobFlowOperator(
- task_id='create_job_flow',
- job_flow_overrides=JOB_FLOW_OVERRIDES,
- )
- # [END howto_operator_emr_create_job_flow]
-
- # [START howto_sensor_emr_job_flow_sensor]
- job_sensor = EmrJobFlowSensor(
- task_id='check_job_flow',
- job_flow_id=job_flow_creator.output,
- )
- # [END howto_sensor_emr_job_flow_sensor]
diff --git a/airflow/providers/amazon/aws/operators/emr.py
b/airflow/providers/amazon/aws/operators/emr.py
index a1f3fa753d..510c77184f 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -122,6 +122,10 @@ class EmrContainerOperator(BaseOperator):
"""
An operator that submits jobs to EMR on EKS virtual clusters.
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:EmrContainerOperator`
+
:param name: The name of the job run.
:param virtual_cluster_id: The EMR on EKS virtual cluster ID
:param execution_role_arn: The IAM role ARN associated with the job run.
@@ -133,6 +137,7 @@ class EmrContainerOperator(BaseOperator):
Use this if you want to specify a unique ID to prevent two jobs from
getting started.
If no token is provided, a UUIDv4 token will be generated for you.
:param aws_conn_id: The Airflow connection used for AWS credentials.
+ :param wait_for_completion: Whether or not to wait in the operator for the
job to complete.
:param poll_interval: Time (in seconds) to wait between two consecutive
calls to check query status on EMR
:param max_tries: Maximum number of times to wait for the job run to
finish.
Defaults to None, which will poll until the job is *not* in a pending,
submitted, or running state.
@@ -160,6 +165,7 @@ class EmrContainerOperator(BaseOperator):
configuration_overrides: Optional[dict] = None,
client_request_token: Optional[str] = None,
aws_conn_id: str = "aws_default",
+ wait_for_completion: bool = True,
poll_interval: int = 30,
max_tries: Optional[int] = None,
tags: Optional[dict] = None,
@@ -174,6 +180,7 @@ class EmrContainerOperator(BaseOperator):
self.configuration_overrides = configuration_overrides or {}
self.aws_conn_id = aws_conn_id
self.client_request_token = client_request_token or str(uuid4())
+ self.wait_for_completion = wait_for_completion
self.poll_interval = poll_interval
self.max_tries = max_tries
self.tags = tags
@@ -198,19 +205,20 @@ class EmrContainerOperator(BaseOperator):
self.client_request_token,
self.tags,
)
- query_status = self.hook.poll_query_status(self.job_id,
self.max_tries, self.poll_interval)
-
- if query_status in EmrContainerHook.FAILURE_STATES:
- error_message = self.hook.get_job_failure_reason(self.job_id)
- raise AirflowException(
- f"EMR Containers job failed. Final state is {query_status}. "
- f"query_execution_id is {self.job_id}. Error: {error_message}"
- )
- elif not query_status or query_status in
EmrContainerHook.INTERMEDIATE_STATES:
- raise AirflowException(
- f"Final state of EMR Containers job is {query_status}. "
- f"Max tries of poll status exceeded, query_execution_id is
{self.job_id}."
- )
+ if self.wait_for_completion:
+ query_status = self.hook.poll_query_status(self.job_id,
self.max_tries, self.poll_interval)
+
+ if query_status in EmrContainerHook.FAILURE_STATES:
+ error_message = self.hook.get_job_failure_reason(self.job_id)
+ raise AirflowException(
+ f"EMR Containers job failed. Final state is
{query_status}. "
+ f"query_execution_id is {self.job_id}. Error:
{error_message}"
+ )
+ elif not query_status or query_status in
EmrContainerHook.INTERMEDIATE_STATES:
+ raise AirflowException(
+ f"Final state of EMR Containers job is {query_status}. "
+ f"Max tries of poll status exceeded, query_execution_id is
{self.job_id}."
+ )
return self.job_id
diff --git a/airflow/providers/amazon/aws/sensors/athena.py
b/airflow/providers/amazon/aws/sensors/athena.py
index 927f512143..1186f8cdef 100644
--- a/airflow/providers/amazon/aws/sensors/athena.py
+++ b/airflow/providers/amazon/aws/sensors/athena.py
@@ -37,7 +37,7 @@ class AthenaSensor(BaseSensorOperator):
If the query fails, the task will fail.
.. seealso::
- For more information on how to use this operator, take a look at the
guide:
+ For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/sensor:AthenaSensor`
diff --git a/airflow/providers/amazon/aws/sensors/cloud_formation.py
b/airflow/providers/amazon/aws/sensors/cloud_formation.py
index fb01bdc7f6..972dbace37 100644
--- a/airflow/providers/amazon/aws/sensors/cloud_formation.py
+++ b/airflow/providers/amazon/aws/sensors/cloud_formation.py
@@ -36,7 +36,7 @@ class CloudFormationCreateStackSensor(BaseSensorOperator):
Waits for a stack to be created successfully on AWS CloudFormation.
.. seealso::
- For more information on how to use this operator, take a look at the
guide:
+ For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/sensor:CloudFormationCreateStackSensor`
@@ -74,7 +74,7 @@ class CloudFormationDeleteStackSensor(BaseSensorOperator):
Waits for a stack to be deleted successfully on AWS CloudFormation.
.. seealso::
- For more information on how to use this operator, take a look at the
guide:
+ For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/sensor:CloudFormationDeleteStackSensor`
:param stack_name: The name of the stack to wait for (templated)
diff --git a/airflow/providers/amazon/aws/sensors/dms.py
b/airflow/providers/amazon/aws/sensors/dms.py
index 26e6b7148f..0437ee4d98 100644
--- a/airflow/providers/amazon/aws/sensors/dms.py
+++ b/airflow/providers/amazon/aws/sensors/dms.py
@@ -91,7 +91,7 @@ class DmsTaskCompletedSensor(DmsTaskBaseSensor):
Pokes DMS task until it is completed.
.. seealso::
- For more information on how to use this operator, take a look at the
guide:
+ For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/sensor:DmsTaskCompletedSensor`
:param replication_task_arn: AWS DMS replication task ARN
diff --git a/airflow/providers/amazon/aws/sensors/eks.py
b/airflow/providers/amazon/aws/sensors/eks.py
index 92ed55da4d..76b2703e89 100644
--- a/airflow/providers/amazon/aws/sensors/eks.py
+++ b/airflow/providers/amazon/aws/sensors/eks.py
@@ -121,7 +121,7 @@ class EksFargateProfileStateSensor(BaseSensorOperator):
Check the state of an AWS Fargate profile until it reaches the target
state or another terminal state.
.. seealso::
- For more information on how to use this operator, take a look at the
guide:
+ For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/sensor:EksFargateProfileStateSensor`
:param cluster_name: The name of the Cluster which the AWS Fargate profile
is attached to. (templated)
diff --git a/airflow/providers/amazon/aws/sensors/glacier.py
b/airflow/providers/amazon/aws/sensors/glacier.py
index e92f5a4326..8e8b74c58c 100644
--- a/airflow/providers/amazon/aws/sensors/glacier.py
+++ b/airflow/providers/amazon/aws/sensors/glacier.py
@@ -38,7 +38,7 @@ class GlacierJobOperationSensor(BaseSensorOperator):
Glacier sensor for checking job state. This operator runs only in
reschedule mode.
.. seealso::
- For more information on how to use this operator, take a look at the
guide:
+ For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/sensor:GlacierJobOperationSensor`
:param aws_conn_id: The reference to the AWS connection details
diff --git a/airflow/providers/amazon/aws/sensors/rds.py
b/airflow/providers/amazon/aws/sensors/rds.py
index 3c24c82fbf..54ee50875e 100644
--- a/airflow/providers/amazon/aws/sensors/rds.py
+++ b/airflow/providers/amazon/aws/sensors/rds.py
@@ -70,7 +70,7 @@ class RdsSnapshotExistenceSensor(RdsBaseSensor):
Waits for RDS snapshot with a specific status.
.. seealso::
- For more information on how to use this operator, take a look at the
guide:
+ For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/sensor:RdsSnapshotExistenceSensor`
:param db_type: Type of the DB - either "instance" or "cluster"
@@ -112,7 +112,7 @@ class RdsExportTaskExistenceSensor(RdsBaseSensor):
Waits for RDS export task with a specific status.
.. seealso::
- For more information on how to use this operator, take a look at the
guide:
+ For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/sensor:RdsExportTaskExistenceSensor`
:param export_task_identifier: A unique identifier for the snapshot export
task.
diff --git a/docs/apache-airflow-providers-amazon/operators/emr.rst
b/docs/apache-airflow-providers-amazon/operators/emr.rst
index a62b503740..6bb16bd494 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr.rst
@@ -53,10 +53,10 @@ JobFlow configuration
To create a job flow on EMR, you need to specify the configuration for the EMR
cluster:
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
:language: python
- :start-after: [START howto_operator_emr_automatic_steps_config]
- :end-before: [END howto_operator_emr_automatic_steps_config]
+ :start-after: [START howto_operator_emr_steps_config]
+ :end-before: [END howto_operator_emr_steps_config]
Here we create an EMR single-node Cluster *PiCalc*. It only has a single step
*calculate_pi* which
calculates the value of ``Pi`` using Spark. The config
``'KeepJobFlowAliveWhenNoSteps': False``
@@ -76,7 +76,7 @@ Create the Job Flow
In the following code we are creating a new job flow using the configuration
as explained above.
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_create_job_flow]
@@ -90,7 +90,7 @@ Add Steps to an EMR job flow
To add steps to an existing EMR Job flow you can use
:class:`~airflow.providers.amazon.aws.operators.emr.EmrAddStepsOperator`.
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_add_steps]
@@ -104,7 +104,7 @@ Terminate an EMR job flow
To terminate an EMR Job Flow you can use
:class:`~airflow.providers.amazon.aws.operators.emr.EmrTerminateJobFlowOperator`.
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_terminate_job_flow]
@@ -118,17 +118,15 @@ Modify Amazon EMR container
To modify an existing EMR cluster you can use
:class:`~airflow.providers.amazon.aws.operators.emr.EmrModifyClusterOperator`.
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_emr_modify_cluster]
+ :end-before: [END howto_operator_emr_modify_cluster]
+
Sensors
-------
-.. _howto/sensor:EmrContainerSensor:
-
-Wait on an Amazon EMR container state
-=====================================
-
-To monitor the state of an EMR container you can use
-:class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`.
-
.. _howto/sensor:EmrJobFlowSensor:
Wait on an Amazon EMR job flow state
@@ -137,11 +135,11 @@ Wait on an Amazon EMR job flow state
To monitor the state of an EMR job flow you can use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrJobFlowSensor`.
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
:language: python
:dedent: 4
- :start-after: [START howto_sensor_emr_job_flow_sensor]
- :end-before: [END howto_sensor_emr_job_flow_sensor]
+ :start-after: [START howto_sensor_emr_job_flow]
+ :end-before: [END howto_sensor_emr_job_flow]
.. _howto/sensor:EmrStepSensor:
@@ -151,11 +149,11 @@ Wait on an Amazon EMR step state
To monitor the state of a step running an existing EMR Job flow you can use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrStepSensor`.
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr.py
:language: python
:dedent: 4
- :start-after: [START howto_sensor_emr_step_sensor]
- :end-before: [END howto_sensor_emr_step_sensor]
+ :start-after: [START howto_sensor_emr_step]
+ :end-before: [END howto_sensor_emr_step]
Reference
---------
diff --git a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
index b84fbf5ee5..3e9e58fbca 100644
--- a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
+++ b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst
@@ -23,9 +23,6 @@ Amazon EMR on Amazon EKS
provides a deployment option for Amazon EMR that allows you to run open-source
big data frameworks on
Amazon EKS.
-Airflow provides the
:class:`~airflow.providers.amazon.aws.operators.emr.EmrContainerOperator`
-to submit Apache Spark jobs to your EMR on EKS virtual cluster.
-
Prerequisite Tasks
------------------
@@ -34,18 +31,18 @@ Prerequisite Tasks
Operators
---------
-.. _howto/operator:EMRContainersOperators:
+.. _howto/operator:EmrContainerOperator:
-Run a Spark job on EMR on EKS
-=============================
+Submit a job to an Amazon EMR virtual cluster
+=============================================
.. note::
This example assumes that you already have an EMR on EKS virtual cluster
configured. See the
`EMR on EKS Getting Started guide
<https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/getting-started.html>`__
for more information.
-The ``EMRContainerOperator`` will submit a new job to an Amazon EMR on Amazon
EKS virtual cluster and wait for
-the job to complete. The example job below calculates the mathematical
constant ``Pi``. In a
+The ``EmrContainerOperator`` will submit a new job to an Amazon EMR on Amazon
EKS virtual cluster.
+The example job below calculates the mathematical constant ``Pi``. In a
production job, you would usually refer to a Spark script on Amazon Simple
Storage Service (S3).
To create a job for Amazon EMR on Amazon EKS, you need to specify your virtual
cluster ID, the release of Amazon EMR you
@@ -59,28 +56,42 @@ and ``monitoringConfiguration`` to send logs to the
``/aws/emr-eks-spark`` log g
Refer to the `EMR on EKS guide
<https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/emr-eks-jobs-CLI.html#emr-eks-jobs-parameters>`__
for more details on job configuration.
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
:language: python
:start-after: [START howto_operator_emr_eks_config]
:end-before: [END howto_operator_emr_eks_config]
-
We pass the ``virtual_cluster_id`` and ``execution_role_arn`` values as
operator parameters, but you
can store them in a connection or provide them in the DAG. Your AWS region
should be defined either
in the ``aws_default`` connection as ``{"region_name": "us-east-1"}`` or a
custom connection name
-that gets passed to the operator with the ``aws_conn_id`` parameter.
+that gets passed to the operator with the ``aws_conn_id`` parameter. The
operator returns the Job ID of the job run.
-.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_emr_eks_job]
- :end-before: [END howto_operator_emr_eks_job]
+ :start-after: [START howto_operator_emr_container]
+ :end-before: [END howto_operator_emr_container]
+
+Sensors
+-------
+
+.. _howto/sensor:EmrContainerSensor:
-With the ``EmrContainerOperator``, it will wait until the successful
completion of the job or raise
-an ``AirflowException`` if there is an error. The operator returns the Job ID
of the job run.
+Wait on an Amazon EMR virtual cluster job
+=========================================
+
+To wait on the status of an Amazon EMR virtual cluster job to reach a terminal
state, you can use
+:class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`.
+
+.. exampleinclude::
/../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_emr_container]
+ :end-before: [END howto_sensor_emr_container]
Reference
---------
+* `AWS boto3 library documentation for EMR Containers
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr-containers.html>`__
* `Amazon EMR on EKS Job runs
<https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/job-runs.html>`__
* `EMR on EKS Best Practices
<https://aws.github.io/aws-emr-containers-best-practices/>`__
diff --git a/tests/always/test_project_structure.py
b/tests/always/test_project_structure.py
index 1c1bedf7fc..d0b9e1c060 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -433,9 +433,6 @@ class
TestAmazonProviderProjectStructure(ExampleCoverageTest):
}
MISSING_EXAMPLES_FOR_CLASSES = {
- # EMR legitimately missing, needs development
- 'airflow.providers.amazon.aws.operators.emr.EmrModifyClusterOperator',
- 'airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor',
# S3 Exasol transfer difficult to test, see:
https://github.com/apache/airflow/issues/22632
'airflow.providers.amazon.aws.transfers.exasol_to_s3.ExasolToS3Operator',
# Glue Catalog sensor difficult to test