This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 99d9833631 Fix: `emr_conn_id` should be optional in `EmrCreateJobFlowOperator` (#24306)
99d9833631 is described below

commit 99d98336312d188a078721579a3f71060bdde542
Author: Pankaj Singh <[email protected]>
AuthorDate: Fri Jun 10 18:55:12 2022 +0530

    Fix: `emr_conn_id` should be optional in `EmrCreateJobFlowOperator` (#24306)
    
    Closes: #24318
---
 airflow/providers/amazon/aws/hooks/emr.py     | 17 ++++++++---------
 airflow/providers/amazon/aws/operators/emr.py |  9 +++++++--
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py
index 143bdcdcc8..2141b38ed8 100644
--- a/airflow/providers/amazon/aws/hooks/emr.py
+++ b/airflow/providers/amazon/aws/hooks/emr.py
@@ -20,7 +20,7 @@ from typing import Any, Dict, List, Optional
 
 from botocore.exceptions import ClientError
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 
 
@@ -41,8 +41,8 @@ class EmrHook(AwsBaseHook):
     conn_type = 'emr'
     hook_name = 'Amazon Elastic MapReduce'
 
-    def __init__(self, emr_conn_id: Optional[str] = default_conn_name, *args, **kwargs) -> None:
-        self.emr_conn_id = emr_conn_id
+    def __init__(self, emr_conn_id: str = default_conn_name, *args, **kwargs) -> None:
+        self.emr_conn_id: str = emr_conn_id
         kwargs["client_type"] = "emr"
         super().__init__(*args, **kwargs)
 
@@ -78,12 +78,11 @@ class EmrHook(AwsBaseHook):
         run_job_flow method.
         Overrides for this config may be passed as the job_flow_overrides.
         """
-        if not self.emr_conn_id:
-            raise AirflowException('emr_conn_id must be present to use create_job_flow')
-
-        emr_conn = self.get_connection(self.emr_conn_id)
-
-        config = emr_conn.extra_dejson.copy()
+        try:
+            emr_conn = self.get_connection(self.emr_conn_id)
+            config = emr_conn.extra_dejson.copy()
+        except AirflowNotFoundException:
+            config = {}
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py
index 510c77184f..67ae54af50 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -285,8 +285,13 @@ class EmrCreateJobFlowOperator(BaseOperator):
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:EmrCreateJobFlowOperator`
 
-    :param aws_conn_id: aws connection to uses
-    :param emr_conn_id: emr connection to use
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node)
+    :param emr_conn_id: emr connection to use for run_job_flow request body.
+        This will be overridden by the job_flow_overrides param
     :param job_flow_overrides: boto3 style arguments or reference to an arguments file
         (must be '.json') to override emr_connection extra. (templated)
     :param region_name: Region named passed to EmrHook

Reply via email to