Taragolis commented on code in PR #26687:
URL: https://github.com/apache/airflow/pull/26687#discussion_r982116922
##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -77,22 +81,92 @@ def get_cluster_id_by_name(self, emr_cluster_name: str,
cluster_states: list[str
def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str,
Any]:
"""
- Creates a job flow using the config from the EMR connection.
- Keys of the json extra hash may have the arguments of the boto3
- run_job_flow method.
- Overrides for this config may be passed as the job_flow_overrides.
+ Create and start running a new cluster (job flow).
+
+ This method use ``EmrHook.emr_conn_id`` for receive initial Amazon EMR
cluster configuration.
+ If ``EmrHook.emr_conn_id`` is empty or connection not exists than
empty initial configuration is used.
+
+ :param job_flow_overrides: Uses for overwrite parameters in initial
Amazon EMR configuration cluster.
+ The resulting configuration will be used in the boto3 emr client
run_job_flow method.
+
+ .. seealso::
+ - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
+ - `API RunJobFlow
<https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
+ - `boto3 emr client run_job_flow method
<https://boto3.amazonaws.com/v1/documentation/\
+
api/latest/reference/services/emr.html#EMR.Client.run_job_flow>`_.
"""
- try:
- emr_conn = self.get_connection(self.emr_conn_id)
- config = emr_conn.extra_dejson.copy()
- except AirflowNotFoundException:
- config = {}
+ config = {}
+ if self.emr_conn_id:
+ try:
+ emr_conn = self.get_connection(self.emr_conn_id)
+ except AirflowNotFoundException:
+ warnings.warn(
+ f"Unable to find Amazon Elastic MapReduce Connection ID
{self.emr_conn_id!r}, "
+ "use empty initial configuration. If you want to get rid
of this warning "
+ "message please set `emr_conn_id` to None.",
+ UserWarning,
+ stacklevel=2,
+ )
+ else:
+ if emr_conn.conn_type and emr_conn.conn_type != "emr":
Review Comment:
**tl;dr**:
Right now in the PR I suggest that if user not provide any of conn_type,
that user mean `emr` however we could do differently if it not set (or empty)
than assume that might something went wrong: `if emr_conn.conn_type != "emr":`
**Long explanation**
First of all why we need use some magic: `conn_type` not a mandatory
attribute for class `airflow.models.connection.Connection` and could be an
empty string/None.
Actually it not easy to achieve this, for connection in the UI I thought it
almost not possible,
URI base should miss schema something like this
`://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@` or for
JSON it might be
```json
{
"login": "AKIAIOSFODNN7EXAMPLE",
"password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
}
```
Connection object do not provided any abilities for actual validate
conn_type. So we should do something like
1. `if emr_conn.conn_type and emr_conn.conn_type != "emr":`
2. `if (emr_conn.conn_type or "emr") != "emr":`
3. `if emr_conn.conn_type not in ("emr", None):`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]