Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982935903
##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,93 @@ def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str
def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
"""
- Creates a job flow using the config from the EMR connection.
- Keys of the json extra hash may have the arguments of the boto3
- run_job_flow method.
- Overrides for this config may be passed as the job_flow_overrides.
+ Create and start running a new cluster (job flow).
+
+ This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
+ If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
+ configuration is used.
+
+ :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
+ cluster. The resulting configuration will be used in the boto3 emr client run_job_flow method.
+
+ .. seealso::
+ - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+ - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+ - `boto3 emr client run_job_flow method <https://boto3.amazonaws.com/v1/documentation/\
+ api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
"""
- try:
- emr_conn = self.get_connection(self.emr_conn_id)
- config = emr_conn.extra_dejson.copy()
- except AirflowNotFoundException:
- config = {}
+ config = {}
+ if self.emr_conn_id:
+ try:
+ emr_conn = self.get_connection(self.emr_conn_id)
+ except AirflowNotFoundException:
+ warnings.warn(
+ f"Unable to find Amazon Elastic MapReduce Connection ID {self.emr_conn_id!r}, "
+ "using an empty initial configuration. If you want to get rid of this warning "
+ "message please provide a valid `emr_conn_id` or set it to None.",
+ UserWarning,
+ stacklevel=2,
+ )
+ else:
+ if emr_conn.conn_type and emr_conn.conn_type != "emr":
+ warnings.warn(
+ "Amazon Elastic MapReduce Connection expected connection type 'emr', "
+ f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
+ f"This connection might not work correctly.",
+ UserWarning,
+ stacklevel=2,
+ )
+ config = emr_conn.extra_dejson.copy()
config.update(job_flow_overrides)
response = self.get_conn().run_job_flow(**config)
return response
+ def test_connection(self):
+ """
+ Return failed state for test Amazon Elastic MapReduce Connection (untestable)
+
+ We need to overwrite this method because this hook is based on
+ :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
+ otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.
+ """
+ return False, f"{self.hook_name} Connection cannot be tested."
Review Comment:
> why the connection cannot be tested in the response from the API/UI.
Actually, this is by design. This connection is closer to an Airflow
Variable than to an Airflow Connection.
We just store key-value pairs for
https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html
So there is nothing to test. It is hardly even possible to validate which
keys are valid, since that information is stored somewhere in `botocore` and
depends on which version of the AWS API it supports.
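To illustrate why there is nothing to test, here is a minimal sketch of what the connection extras are used for. The function name `build_run_job_flow_config` is hypothetical and not part of the provider; it only mirrors the `config.update(job_flow_overrides)` step from the diff above.

```python
from __future__ import annotations

from typing import Any


def build_run_job_flow_config(
    conn_extra: dict[str, Any] | None,
    job_flow_overrides: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper: merge stored key-value pairs with overrides."""
    # The connection extras act as defaults; an absent connection means
    # an empty initial configuration.
    config = dict(conn_extra or {})
    # Overrides win key by key. This is a shallow merge, so a nested value
    # such as "Instances" is replaced as a whole, not merged recursively.
    config.update(job_flow_overrides)
    return config


stored = {"Name": "default-cluster", "ReleaseLabel": "emr-6.7.0"}
merged = build_run_job_flow_config(stored, {"Name": "nightly-cluster"})
```

Nothing here talks to AWS until the merged dict is passed to `run_job_flow`, so the stored values behave like defaults rather than like credentials.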
The main problem is in the logic of the providers manager, which is
responsible for deciding whether a Connection is testable:
https://github.com/apache/airflow/blob/c94f978a66a7cfc31b6d461bbcbfd0f2ddb2962e/airflow/providers_manager.py#L793-L800
In our case `EmrHook` is based on `AwsGenericHook`, which has a
`test_connection` method, so the connection is reported as testable. The
connection under test is then passed as `emr_conn_id`, which has no effect in
`AwsGenericHook.test_connection`.
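A rough sketch of the situation, with loud assumptions: `GenericHook`, `KeyValueHook` and `is_testable` are hypothetical stand-ins (not Airflow classes), and the attribute check is only my simplified reading of the linked providers manager code.

```python
class GenericHook:
    """Hypothetical stand-in for AwsGenericHook."""

    hook_name = "Generic"

    def test_connection(self):
        # Stand-in for the real check against AWS STS.
        return True, "Generic credentials look valid."


class KeyValueHook(GenericHook):
    """Hypothetical stand-in for EmrHook."""

    hook_name = "Key-Value"

    def test_connection(self):
        # Overridden so the inherited credential check is never run
        # for a connection that only stores key-value pairs.
        return False, f"{self.hook_name} Connection cannot be tested."


def is_testable(hook_class) -> bool:
    # Assumption: testability is decided by the mere presence of a
    # `test_connection` attribute, so inherited methods count too.
    return hasattr(hook_class, "test_connection")


status, message = KeyValueHook().test_connection()
```

Under that assumption the subclass is still reported as testable because the method is inherited, which is why the override returns an explicit failure instead of silently testing something unrelated.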
But I agree that the message in the current PR might not be clear enough for
end users.