Lee-W opened a new issue, #31480:
URL: https://github.com/apache/airflow/issues/31480

   ### Apache Airflow version
   
   main (development)
   
   ### What happened
   
   The following error occurs when executing `EmrCreateJobFlowOperator`:
   
   ```
   Traceback (most recent call last):
     File "/usr/local/lib/python3.10/site-packages/airflow/providers/amazon/aws/operators/emr.py", line 695, in execute
       log_uri=get_log_uri(emr_client=self._emr_hook.conn, job_flow_id=self._job_flow_id),
     File "/usr/local/lib/python3.10/site-packages/airflow/providers/amazon/aws/links/emr.py", line 61, in get_log_uri
       log_uri = S3Hook.parse_s3_url(response["Cluster"]["LogUri"])
   KeyError: 'LogUri'
   ```
   
   According to [this document](https://docs.aws.amazon.com/cli/latest/reference/emr/describe-cluster.html), it seems `["Cluster"]["LogUri"]` is not always present in the `describe-cluster` response, and we have been encountering this error since the release in https://github.com/apache/airflow/issues/31322.
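
A minimal sketch of a lookup that tolerates a missing `LogUri`, assuming the response is the plain dictionary returned by `describe_cluster`. The helper name `get_log_uri_or_none` and the sample responses are hypothetical. It uses `urllib.parse.urlsplit` from the standard library in place of `S3Hook.parse_s3_url` so that it runs without Airflow installed, and it is not meant as the provider's actual implementation.

```python
from __future__ import annotations

from typing import Any
from urllib.parse import urlsplit


def get_log_uri_or_none(response: dict[str, Any]) -> str | None:
    """Hypothetical helper: return "bucket/key" for the cluster log location.

    Returns None instead of raising KeyError when the response
    carries no "LogUri" entry.
    """
    # .get() avoids the KeyError raised by response["Cluster"]["LogUri"].
    log_uri = response.get("Cluster", {}).get("LogUri")
    if not log_uri:
        return None

    # Split "s3://bucket/key" (or "s3n://...") into bucket and key.
    parts = urlsplit(log_uri)
    return "/".join([parts.netloc, parts.path.lstrip("/")])


# Sample responses (made up for illustration).
without_log_uri = {"Cluster": {"Id": "j-EXAMPLE"}}
with_log_uri = {"Cluster": {"Id": "j-EXAMPLE", "LogUri": "s3://my-bucket/logs/"}}

print(get_log_uri_or_none(without_log_uri))
print(get_log_uri_or_none(with_log_uri))
```

A caller could then skip building the log link whenever the helper returns `None`, rather than failing the whole task.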
   
   ### What you think should happen instead
   
   The `EmrCreateJobFlowOperator` should finish execution without error.
   
   ### How to reproduce
   
   1. `git clone https://github.com/apache/airflow/`
   2. `cd airflow`
   3. `git checkout c082aec089405ed0399cfee548011b0520be0011` (the main branch when I found this issue)
   4. Add the following DAG to `files/dags/` and name it `example_emr.py`
   
   
   ```python
   import os
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.emr import (
       EmrCreateJobFlowOperator,
       EmrTerminateJobFlowOperator,
   )
   
   
   JOB_FLOW_OVERRIDES = {
       "Name": "example_emr_sensor_cluster",
       "ReleaseLabel": "emr-5.29.0",
       "Applications": [{"Name": "Spark"}],
       "Instances": {
           "InstanceGroups": [
               {
                   "Name": "Primary node",
                   "Market": "ON_DEMAND",
                   "InstanceRole": "MASTER",
                   "InstanceType": "m4.large",
                   "InstanceCount": 1,
               },
           ],
           "KeepJobFlowAliveWhenNoSteps": False,
           "TerminationProtected": False,
       },
       "JobFlowRole": "EMR_EC2_DefaultRole",
       "ServiceRole": "EMR_DefaultRole",
   }
   
   DEFAULT_ARGS = {
       "execution_timeout": timedelta(hours=6),
       "retries": 2,
       "retry_delay": 60,
   }
   
   
   with DAG(
       dag_id="example_emr_sensor",
       schedule=None,
       start_date=datetime(2022, 1, 1),
       default_args=DEFAULT_ARGS,
       catchup=False,
   ) as dag:
       create_job_flow = EmrCreateJobFlowOperator(
           task_id="create_job_flow",
           job_flow_overrides=JOB_FLOW_OVERRIDES,
           aws_conn_id="aws_default",
       )
   
       remove_job_flow = EmrTerminateJobFlowOperator(
           task_id="remove_job_flow",
           job_flow_id=create_job_flow.output,
           aws_conn_id="aws_default",
           trigger_rule="all_done",
       )
   
       create_job_flow >> remove_job_flow
   ```
   
   5. `breeze --python 3.8 --backend sqlite start-airflow`
   6. Trigger the DAG from web UI
   
   ### Operating System
   
   macOS 13.4
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   https://github.com/apache/airflow/issues/31322#issuecomment-1556876252
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
