Kannedhara opened a new issue, #34458:
URL: https://github.com/apache/airflow/issues/34458

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Apache Airflow Version: 2.2.2
   EMR script: 
https://github.com/apache/airflow/blob/2.2.2/airflow/providers/amazon/aws/operators/emr_create_job_flow.py
   Error:
    create_emr_job_flow
        job_flow_id = job_flow_creator.execute(context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/emr_create_job_flow.py", line 103, in execute
        response = emr.create_job_flow(job_flow_overrides)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/emr.py", line 88, in create_job_flow
        response = self.get_conn().run_job_flow(**config)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 708, in _make_api_call
        raise error_class(parsed_response, operation_name)
    botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the RunJobFlow operation (reached max retries: 4): Rate exceeded
    [2023-09-06, 18:11:51 UTC] {{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=DMS_source_daily, task_id=create_emr_cluster_sf_xgcb0, execution_date=20230906T181058, start_date=20230906T181137, end_date=20230906T181151
    [2023-09-06, 18:11:51 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 118988 for task create_emr_cluster_sf_xgcb0
    Traceback (most recent call last):
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
        args.func(args, dag=self.dag)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
        return func(*args, **kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
        return f(*args, **kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
        _run_task_by_selected_method(args, dag, ti)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
        _run_raw_task(args, ti)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
        error_file=args.error_file,
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
        return func(*args, session=session, **kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
        self._execute_task_with_callbacks(context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
        result = self._execute_task(context, self.task)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
        result = execute_callable(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 151, in execute
        return_value = self.execute_callable()
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 162, in execute_callable
        return self.python_callable(*self.op_args, **self.op_kwargs)
      File "/usr/local/airflow/dags/DAG_DMS_source_daily.py", line 447, in create_emr_job_flow
        job_flow_id = job_flow_creator.execute(context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/emr_create_job_flow.py", line 103, in execute
        response = emr.create_job_flow(job_flow_overrides)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/emr.py", line 88, in create_job_flow
        response = self.get_conn().run_job_flow(**config)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 708, in _make_api_call
        raise error_class(parsed_response, operation_name)
    botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the RunJobFlow operation (reached max retries: 4): Rate exceeded
   
    As suggested by the Airflow documentation, I added the following extra field to the AWS default connection, but there is still no change:
   {"config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}
   
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html#set-by-environment-variables
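   Since raising the connection-level retry settings did not help, a common workaround is to wrap the throttled call in a client-side retry with exponential backoff and jitter. The sketch below is a generic, stdlib-only illustration of that pattern; the helper name and its parameters are hypothetical, not part of the Airflow or botocore API:

   ```python
   import random
   import time


   def call_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=60.0):
       """Retry fn with exponential backoff plus jitter.

       This mirrors the client-side pattern that botocore's 'standard' retry
       mode applies to throttled calls; it is a hypothetical sketch, not an
       Airflow/botocore API.
       """
       for attempt in range(1, max_attempts + 1):
           try:
               return fn()
           except Exception:
               # In real code, catch botocore.exceptions.ClientError and
               # re-raise anything that is not a ThrottlingException.
               if attempt == max_attempts:
                   raise
               # Exponential backoff capped at max_delay, with random jitter
               # so parallel tasks do not retry in lockstep.
               delay = min(max_delay, base_delay * 2 ** (attempt - 1))
               time.sleep(delay + random.uniform(0, delay / 2))
   ```

   Wrapping `job_flow_creator.execute(context)` in such a helper spreads the retries out further than the four attempts after which botocore gives up.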
   
   ### What you think should happen instead
   
    Airflow should be able to create the EMR cluster and check the status of the EMR step without any issues. The issue happens randomly.
   
   ### How to reproduce
   
    from airflow.providers.amazon.aws.operators.ecs import ECSOperator
    from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
    from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
    from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
    from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
    from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
   
    create_emr = EmrCreateJobFlowOperator(
        task_id='Create_EMR_Job_Flow',
        job_flow_overrides="{" + cluster_config(**context) + "}",
    )
    job_flow_id = create_emr.execute(context)
    
    add_steps_sensor = EmrStepSensor(
        task_id="watch_spark_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='emr_cluster_id') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_spark_step', key='spark_step_id') }}",
        on_success_callback=notify_task_success_to_slack,
        on_failure_callback=notify_task_failure_to_slack,
    )
   
   ### Operating System
   
   MWAA
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==2.4.0
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

