This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 99d9833631 Fix: `emr_conn_id` should be optional in `EmrCreateJobFlowOperator` (#24306)
99d9833631 is described below
commit 99d98336312d188a078721579a3f71060bdde542
Author: Pankaj Singh <[email protected]>
AuthorDate: Fri Jun 10 18:55:12 2022 +0530
Fix: `emr_conn_id` should be optional in `EmrCreateJobFlowOperator` (#24306)
Closes: #24318
---
airflow/providers/amazon/aws/hooks/emr.py | 17 ++++++++---------
airflow/providers/amazon/aws/operators/emr.py | 9 +++++++--
2 files changed, 15 insertions(+), 11 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py
index 143bdcdcc8..2141b38ed8 100644
--- a/airflow/providers/amazon/aws/hooks/emr.py
+++ b/airflow/providers/amazon/aws/hooks/emr.py
@@ -20,7 +20,7 @@ from typing import Any, Dict, List, Optional
from botocore.exceptions import ClientError
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -41,8 +41,8 @@ class EmrHook(AwsBaseHook):
conn_type = 'emr'
hook_name = 'Amazon Elastic MapReduce'
- def __init__(self, emr_conn_id: Optional[str] = default_conn_name, *args,
**kwargs) -> None:
- self.emr_conn_id = emr_conn_id
+ def __init__(self, emr_conn_id: str = default_conn_name, *args, **kwargs)
-> None:
+ self.emr_conn_id: str = emr_conn_id
kwargs["client_type"] = "emr"
super().__init__(*args, **kwargs)
@@ -78,12 +78,11 @@ class EmrHook(AwsBaseHook):
run_job_flow method.
Overrides for this config may be passed as the job_flow_overrides.
"""
- if not self.emr_conn_id:
- raise AirflowException('emr_conn_id must be present to use
create_job_flow')
-
- emr_conn = self.get_connection(self.emr_conn_id)
-
- config = emr_conn.extra_dejson.copy()
+ try:
+ emr_conn = self.get_connection(self.emr_conn_id)
+ config = emr_conn.extra_dejson.copy()
+ except AirflowNotFoundException:
+ config = {}
config.update(job_flow_overrides)
response = self.get_conn().run_job_flow(**config)
diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py
index 510c77184f..67ae54af50 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -285,8 +285,13 @@ class EmrCreateJobFlowOperator(BaseOperator):
For more information on how to use this operator, take a look at the
guide:
:ref:`howto/operator:EmrCreateJobFlowOperator`
- :param aws_conn_id: aws connection to uses
- :param emr_conn_id: emr connection to use
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is None or empty then the default boto3 behaviour is used. If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ maintained on each worker node)
+ :param emr_conn_id: emr connection to use for run_job_flow request body.
+ This will be overridden by the job_flow_overrides param
:param job_flow_overrides: boto3 style arguments or reference to an
arguments file
(must be '.json') to override emr_connection extra. (templated)
:param region_name: Region named passed to EmrHook