dahero95 opened a new issue, #33461: URL: https://github.com/apache/airflow/issues/33461
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

I'm using Airflow 2.6.2 with apache-airflow-providers-amazon 8.5.1. When I use AppflowHook with the `wait_for_completion` parameter set to `True`, the task execution never finishes. I have checked in Appflow and the flow executes correctly and finishes in a couple of seconds; however, the hook keeps waiting and never returns. If I change `wait_for_completion` to `False`, everything works correctly. While the task is stuck, the log view shows a "403 FORBIDDEN" error; marking the task as success or failed makes the logs readable again.

**Logs during task execution:**

```
470b2412b735
*** Found local files:
***   * /opt/airflow/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe/attempt=1.log
*** !!!! Please make sure that all your Airflow components (e.g. schedulers, webservers, workers and triggerer) have the same 'secret_key' configured in 'webserver' section and time is synchronized on all your machines (for example with ntpd)
    See more at https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#secret-key
*** Could not read served logs: Client error '403 FORBIDDEN' for url 'http://470b2412b735:8793/log/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe/attempt=1.log'
For more information check: https://httpstatuses.com/403
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
[2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
[2023-08-16, 19:04:44 CST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-16, 19:04:44 CST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): extract_from_stripe> on 2023-08-17 01:04:41.723261+00:00
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:57} INFO - Started process 796 to run task
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'stripe_ingest_flow', 'extract_from_stripe', 'manual__2023-08-17T01:04:41.723261+00:00', '--job-id', '903', '--raw', '--subdir', 'DAGS_FOLDER/stripe_ingest_flow_to_lakehouse/dag.py', '--cfg-path', '/tmp/tmpqz8uvben']
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:85} INFO - Job 903: Subtask extract_from_stripe
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {task_command.py:410} INFO - Running <TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [running]> on host 470b2412b735
[2023-08-16, 19:04:44 CST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='dhernandez' AIRFLOW_CTX_DAG_ID='stripe_ingest_flow' AIRFLOW_CTX_TASK_ID='extract_from_stripe' AIRFLOW_CTX_EXECUTION_DATE='2023-08-17T01:04:41.723261+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-17T01:04:41.723261+00:00'
[2023-08-16, 19:04:44 CST] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2023-08-16, 19:04:44 CST] {base.py:73} INFO - Using connection ID 'siclo_***_lakehouse_conn' for task execution.
[2023-08-16, 19:04:44 CST] {connection_wrapper.py:340} INFO - AWS Connection (conn_id='siclo_***_lakehouse_conn', conn_type='aws') credentials retrieved from login and password.
[2023-08-16, 19:04:45 CST] {appflow.py:63} INFO - executionId: 58ad6275-0a70-48d9-8414-f0215924c876
```

**Logs when marking the task as success or failed:**

```
470b2412b735
*** Found local files:
***   * /opt/airflow/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe/attempt=1.log
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
[2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
[2023-08-16, 19:04:44 CST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-16, 19:04:44 CST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): extract_from_stripe> on 2023-08-17 01:04:41.723261+00:00
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:57} INFO - Started process 796 to run task
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'stripe_ingest_flow', 'extract_from_stripe', 'manual__2023-08-17T01:04:41.723261+00:00', '--job-id', '903', '--raw', '--subdir', 'DAGS_FOLDER/stripe_ingest_flow_to_lakehouse/dag.py', '--cfg-path', '/tmp/tmpqz8uvben']
[2023-08-16, 19:04:44 CST] {standard_task_runner.py:85} INFO - Job 903: Subtask extract_from_stripe
[2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
[2023-08-16, 19:04:44 CST] {task_command.py:410} INFO - Running <TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [running]> on host 470b2412b735
[2023-08-16, 19:04:44 CST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='dhernandez' AIRFLOW_CTX_DAG_ID='stripe_ingest_flow' AIRFLOW_CTX_TASK_ID='extract_from_stripe' AIRFLOW_CTX_EXECUTION_DATE='2023-08-17T01:04:41.723261+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-17T01:04:41.723261+00:00'
[2023-08-16, 19:04:44 CST] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2023-08-16, 19:04:44 CST] {base.py:73} INFO - Using connection ID 'siclo_***_lakehouse_conn' for task execution.
[2023-08-16, 19:04:44 CST] {connection_wrapper.py:340} INFO - AWS Connection (conn_id='siclo_***_lakehouse_conn', conn_type='aws') credentials retrieved from login and password.
[2023-08-16, 19:04:45 CST] {appflow.py:63} INFO - executionId: 58ad6275-0a70-48d9-8414-f0215924c876
[2023-08-16, 19:05:24 CST] {local_task_job_runner.py:291} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2023-08-16, 19:05:24 CST] {process_utils.py:131} INFO - Sending 15 to group 796. PIDs of all processes in the group: [796]
[2023-08-16, 19:05:24 CST] {process_utils.py:86} INFO - Sending the signal 15 to group 796
[2023-08-16, 19:05:24 CST] {taskinstance.py:1517} ERROR - Received SIGTERM. Terminating subprocesses.
```

### What you think should happen instead

With `wait_for_completion` set to `True`, the task should finish successfully and return the execution ID from Appflow.

### How to reproduce

Create a DAG with the following task:

```
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.appflow import AppflowHook


@task
def extract():
    appflow = AppflowHook(aws_conn_id='conn_id')
    execution_id = appflow.run_flow(
        flow_name='flow_name',
        wait_for_completion=True,  # it works with wait_for_completion=False
    )
    return execution_id
```

The AWS connection has the following permissions:

- "appflow:DescribeFlow"
- "appflow:StartFlow"
- "appflow:RunFlow"
- "appflow:ListFlows"
- "appflow:DescribeFlowExecutionRecords"

A possible interim workaround is sketched at the end of this report.

### Operating System

Debian GNU/Linux 11 (bullseye)

### Versions of Apache Airflow Providers

apache-airflow==2.6.2
apache-airflow-providers-amazon==8.5.1
apache-airflow-providers-common-sql==1.5.1
apache-airflow-providers-http==4.4.1
boto3==1.26.76
asgiref==3.7.2
watchtower==2.0.1
jsonpath-ng==1.5.3
redshift-connector==2.0.911
sqlalchemy-redshift==0.8.14
mypy-boto3-appflow==1.28.16
mypy-boto3-rds==1.26.144
mypy-boto3-redshift-data==1.26.109
mypy-boto3-s3==1.26.153
celery==5.3.0

### Deployment

Docker-Compose

### Deployment details

Docker 4.10.1 (82475)
Airflow image: apache/airflow:2.6.2-python3.11

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
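**Possible interim workaround (untested sketch):** start the flow without waiting and poll the execution status directly. This is only a minimal sketch, not the provider's own waiter: it assumes the hook's `conn` property exposes the boto3 Appflow client, reuses the placeholder `conn_id` / `flow_name` names from the reproduce snippet above, and ignores pagination of `describe_flow_execution_records` for brevity.

```
import time

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.appflow import AppflowHook


@task
def extract_with_manual_wait():
    # 'conn_id' and 'flow_name' are placeholders, same as in the reproduce snippet above.
    appflow = AppflowHook(aws_conn_id="conn_id")
    execution_id = appflow.run_flow(flow_name="flow_name", wait_for_completion=False)

    # Poll Appflow directly instead of relying on wait_for_completion.
    # Uses appflow:DescribeFlowExecutionRecords, which the connection already has.
    status = None
    for _ in range(60):  # give up after ~10 minutes
        time.sleep(10)
        records = appflow.conn.describe_flow_execution_records(flowName="flow_name")
        execution = next(
            (e for e in records.get("flowExecutions", []) if e.get("executionId") == execution_id),
            None,
        )
        status = execution.get("executionStatus") if execution else None
        if status in ("Successful", "Error", "CancelStarted", "Canceled"):
            break

    if status != "Successful":
        raise RuntimeError(f"Appflow execution {execution_id} did not succeed (status: {status})")
    return execution_id
```

If the hang turns out to be in the provider's waiter, this at least confirms whether the execution status can be read with the same credentials.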
