Kannedhara opened a new issue, #34458: URL: https://github.com/apache/airflow/issues/34458
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

Apache Airflow version: 2.2.2

EMR script: https://github.com/apache/airflow/blob/2.2.2/airflow/providers/amazon/aws/operators/emr_create_job_flow.py

Error:

```
create_emr_job_flow
    job_flow_id = job_flow_creator.execute(context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/emr_create_job_flow.py", line 103, in execute
    response = emr.create_job_flow(job_flow_overrides)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/emr.py", line 88, in create_job_flow
    response = self.get_conn().run_job_flow(**config)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 708, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the RunJobFlow operation (reached max retries: 4): Rate exceeded
[2023-09-06, 18:11:51 UTC] {{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=DMS_source_daily, task_id=create_emr_cluster_sf_xgcb0, execution_date=20230906T181058, start_date=20230906T181137, end_date=20230906T181151
[2023-09-06, 18:11:51 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 118988 for task create_emr_cluster_sf_xgcb0
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
    error_file=args.error_file,
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/DAG_DMS_source_daily.py", line 447, in create_emr_job_flow
    job_flow_id = job_flow_creator.execute(context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/emr_create_job_flow.py", line 103, in execute
    response = emr.create_job_flow(job_flow_overrides)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/emr.py", line 88, in create_job_flow
    response = self.get_conn().run_job_flow(**config)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 708, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the RunJobFlow operation (reached max retries: 4): Rate exceeded
```

As suggested in the Airflow documentation, I added the following extra field to the default AWS connection, but the same `ThrottlingException` still occurs:

```json
{"config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}
```

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html#set-by-environment-variables

### What you think should happen instead

Airflow should be able to create the EMR cluster and check the status of the EMR step without any issues.
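Until the connection-level retry setting takes effect, one possible stopgap (a sketch, not part of the Amazon provider) is to retry the cluster-creation call inside the DAG's Python callable using exponential backoff with full jitter. The helper names `call_with_backoff` and `is_throttling` are hypothetical. The predicate assumes the raised error exposes a botocore-style `response["Error"]["Code"]`, which is how `botocore.exceptions.ClientError` reports `ThrottlingException`. Retrying is assumed to be safe because a throttled `RunJobFlow` request is rejected before any cluster is started.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Assumed set of throttling error codes; not an exhaustive AWS list.
THROTTLING_CODES = {"ThrottlingException", "Throttling"}


def call_with_backoff(
    fn: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 8,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: call fn(), retrying retryable errors.

    Uses exponential backoff with full jitter. max_attempts counts every
    call, including the first one.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Re-raise non-retryable errors, or any error once the budget is spent.
            if not is_retryable(exc) or attempt == max_attempts:
                raise
            # Full jitter: wait a random time in [0, min(max_delay, base * 2^(attempt-1))].
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))
    raise AssertionError("unreachable: the loop always returns or raises")


def is_throttling(exc: Exception) -> bool:
    """Hypothetical predicate: True if exc looks like an AWS throttling error.

    Duck-types the botocore ClientError shape (exc.response["Error"]["Code"])
    so this sketch does not need botocore installed.
    """
    response = getattr(exc, "response", None)
    if not isinstance(response, dict):
        return False
    return response.get("Error", {}).get("Code") in THROTTLING_CODES
```

In the DAG, the failing line from the traceback would become something like `job_flow_id = call_with_backoff(lambda: job_flow_creator.execute(context), is_throttling)`. Sleeping inside the task holds a worker slot for the whole wait; Airflow's operator-level `retries`, `retry_delay`, and `retry_exponential_backoff` arguments are a coarser alternative that reruns the entire task instead.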
The issue happens intermittently.

### How to reproduce

```python
from airflow.providers.amazon.aws.operators.ecs import ECSOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor

# In the DAG this runs inside a PythonOperator callable (create_emr_job_flow in the traceback).
# `context`, `cluster_config`, and the Slack callbacks come from the DAG file and are not shown here.
create_emr = EmrCreateJobFlowOperator(
    task_id='Create_EMR_Job_Flow',
    job_flow_overrides="{" + cluster_config(**context) + "}",
)
job_flow_id = create_emr.execute(context)

add_steps_sensor = EmrStepSensor(
    task_id="watch_spark_step",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='emr_cluster_id') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_spark_step', key='spark_step_id') }}",
    on_success_callback=notify_task_success_to_slack,
    on_failure_callback=notify_task_failure_to_slack,
)
```

### Operating System

MWAA

### Versions of Apache Airflow Providers

apache-airflow-providers-amazon==2.4.0

### Deployment

Amazon (AWS) MWAA

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
