Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982935903


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,93 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
 
     def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a job flow using the config from the EMR connection.
-        Keys of the json extra hash may have the arguments of the boto3
-        run_job_flow method.
-        Overrides for this config may be passed as the job_flow_overrides.
+        Create and start running a new cluster (job flow).
+
+        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+        configuration is used.
+
+        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+            cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+        .. seealso::
+            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+            - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+               api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
         """
-        try:
-            emr_conn = self.get_connection(self.emr_conn_id)
-            config = emr_conn.extra_dejson.copy()
-        except AirflowNotFoundException:
-            config = {}
+        config = {}
+        if self.emr_conn_id:
+            try:
+                emr_conn = self.get_connection(self.emr_conn_id)
+            except AirflowNotFoundException:
+                warnings.warn(
+                    f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+                    "using an empty initial configuration. If you want to get rid of this warning "
+                    "message please provide a valid `emr_conn_id` or set it to None.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            else:
+                if emr_conn.conn_type and emr_conn.conn_type != "emr":
+                    warnings.warn(
+                        "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+                        f"This connection might not work correctly.",
+                        UserWarning,
+                        stacklevel=2,
+                    )
+                config = emr_conn.extra_dejson.copy()
         config.update(job_flow_overrides)
 
         response = self.get_conn().run_job_flow(**config)
 
         return response
 
+    def test_connection(self):
+        """
+        Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+        We need to overwrite this method because this hook is based on
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+        otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.
+        """
+        return False, f"{self.hook_name} Connection cannot be tested."

Review Comment:
   > why the connection cannot be tested in the response from the API/UI.
   
   Oh, actually this is by design of this connection. It is closer to an Airflow Variable than to an Airflow Connection.
   
   We just store key-value pairs for the RunJobFlow API: https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html
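To make the "key-value pairs" point concrete: per the diff, `create_job_flow` copies the connection's `extra_dejson` and applies `job_flow_overrides` with `dict.update`, and the result becomes the `run_job_flow` keyword arguments. Below is a minimal sketch of just that merge step, using a hypothetical helper name (`build_run_job_flow_kwargs`) and illustrative values. The merge is shallow, so a nested key such as `Instances` in the overrides replaces the whole nested dict from the connection.

```python
from __future__ import annotations

from typing import Any


def build_run_job_flow_kwargs(
    conn_extra: dict[str, Any] | None,
    job_flow_overrides: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper mirroring the merge step in ``create_job_flow``."""
    # Missing connection or empty extra -> empty initial configuration.
    config = dict(conn_extra) if conn_extra else {}
    # Shallow merge: each top-level key in the overrides replaces the
    # connection's value outright; nested dicts are NOT merged recursively.
    config.update(job_flow_overrides)
    return config


# Illustrative values only.
extra = {
    "Name": "default",
    "ReleaseLabel": "emr-6.8.0",
    "Instances": {"InstanceCount": 3, "KeepJobFlowAliveWhenNoSteps": False},
}
overrides = {"Name": "nightly", "Instances": {"InstanceCount": 5}}
kwargs = build_run_job_flow_kwargs(extra, overrides)
# kwargs["Instances"] == {"InstanceCount": 5}: KeepJobFlowAliveWhenNoSteps
# from the connection is dropped, not merged.
```

Because nothing here talks to AWS, there is no remote state to "test"; the stored extra only matters once it is merged and sent to `run_job_flow`.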
   
   So there is actually nothing to test. It is hardly even possible to validate which keys are valid, since this information is stored somewhere in `botocore` and depends on which version of the AWS API it supports.
   
   The main problem is in the logic of the providers manager, which is responsible for deciding whether a Connection is testable or not.
   
   
https://github.com/apache/airflow/blob/c94f978a66a7cfc31b6d461bbcbfd0f2ddb2962e/airflow/providers_manager.py#L793-L800
 
   
   In our case, `EmrHook` is based on `AwsGenericHook`, which has a `test_connection` method, so the connection under test is passed to the hook as `emr_conn_id`, which has no effect in `AwsGenericHook.test_connection`.
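Here is a simplified sketch of the situation described above. It assumes, as this comment implies, that the providers manager treats a hook as testable when its class has a `test_connection` attribute. The linked source is the authority on the actual rule. The class names are hypothetical stand-ins, not the real Airflow classes, and the base-class result is faked rather than calling AWS STS.

```python
from __future__ import annotations


class GenericAwsLikeHook:
    """Stand-in for AwsGenericHook: checks AWS credentials, not the EMR extra."""

    hook_name = "Generic AWS"

    def test_connection(self) -> tuple[bool, str]:
        # The real hook would call AWS STS with boto3 credentials; faked here.
        return True, "Connection successfully tested"


class EmrLikeHook(GenericAwsLikeHook):
    """Stand-in for EmrHook with the override proposed in this PR."""

    hook_name = "Amazon Elastic MapReduce"

    def test_connection(self) -> tuple[bool, str]:
        # Overridden so the UI reports there is nothing meaningful to test.
        return False, f"{self.hook_name} Connection cannot be tested."


def is_connection_testable(hook_class: type) -> bool:
    # Assumed rule: any class exposing test_connection counts as testable,
    # including one that merely inherits it from a base class.
    return hasattr(hook_class, "test_connection")
```

Under this assumed rule, the EMR hook is "testable" purely through inheritance. The override therefore cannot hide the Test button. It can only change what the test reports.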
   
   But I agree that the message in the current PR might not be very clear to end users.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.