dahero95 opened a new issue, #33461:
URL: https://github.com/apache/airflow/issues/33461

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   I'm using Airflow 2.6.2 with apache-airflow-providers-amazon 8.5.1.
   
   When I use AppflowHook with the wait_for_completion parameter set to True, the task execution never finishes.
   I have checked in AppFlow and the flow executes correctly and finishes within a couple of seconds; however, the AppflowHook call never returns.
   If I change wait_for_completion to False, everything works correctly.
   
   While the task is running, the logs show a "403 FORBIDDEN" error; marking the task as success or failed makes the full task logs appear.
   
   **Logs during task execution:**
   ```
   470b2412b735
   *** Found local files:
   ***   * /opt/airflow/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe/attempt=1.log
   *** !!!! Please make sure that all your Airflow components (e.g. schedulers, webservers, workers and triggerer) have the same 'secret_key' configured in 'webserver' section and time is synchronized on all your machines (for example with ntpd) See more at https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#secret-key
   *** Could not read served logs: Client error '403 FORBIDDEN' for url 'http://470b2412b735:8793/log/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe/attempt=1.log'
   For more information check: https://httpstatuses.com/403
   [2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
   [2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): extract_from_stripe> on 2023-08-17 01:04:41.723261+00:00
   [2023-08-16, 19:04:44 CST] {standard_task_runner.py:57} INFO - Started process 796 to run task
   [2023-08-16, 19:04:44 CST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'stripe_ingest_flow', 'extract_from_stripe', 'manual__2023-08-17T01:04:41.723261+00:00', '--job-id', '903', '--raw', '--subdir', 'DAGS_FOLDER/stripe_ingest_flow_to_lakehouse/dag.py', '--cfg-path', '/tmp/tmpqz8uvben']
   [2023-08-16, 19:04:44 CST] {standard_task_runner.py:85} INFO - Job 903: Subtask extract_from_stripe
   [2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
   [2023-08-16, 19:04:44 CST] {task_command.py:410} INFO - Running <TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [running]> on host 470b2412b735
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='dhernandez' AIRFLOW_CTX_DAG_ID='stripe_ingest_flow' AIRFLOW_CTX_TASK_ID='extract_from_stripe' AIRFLOW_CTX_EXECUTION_DATE='2023-08-17T01:04:41.723261+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-17T01:04:41.723261+00:00'
   [2023-08-16, 19:04:44 CST] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
   [2023-08-16, 19:04:44 CST] {base.py:73} INFO - Using connection ID 'siclo_***_lakehouse_conn' for task execution.
   [2023-08-16, 19:04:44 CST] {connection_wrapper.py:340} INFO - AWS Connection (conn_id='siclo_***_lakehouse_conn', conn_type='aws') credentials retrieved from login and password.
   [2023-08-16, 19:04:45 CST] {appflow.py:63} INFO - executionId: 58ad6275-0a70-48d9-8414-f0215924c876
   ```
   
   **Logs when marking the task as success or failed:**
   ```
   470b2412b735
   *** Found local files:
   ***   * /opt/airflow/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe/attempt=1.log
   [2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
   [2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [queued]>
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): extract_from_stripe> on 2023-08-17 01:04:41.723261+00:00
   [2023-08-16, 19:04:44 CST] {standard_task_runner.py:57} INFO - Started process 796 to run task
   [2023-08-16, 19:04:44 CST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'stripe_ingest_flow', 'extract_from_stripe', 'manual__2023-08-17T01:04:41.723261+00:00', '--job-id', '903', '--raw', '--subdir', 'DAGS_FOLDER/stripe_ingest_flow_to_lakehouse/dag.py', '--cfg-path', '/tmp/tmpqz8uvben']
   [2023-08-16, 19:04:44 CST] {standard_task_runner.py:85} INFO - Job 903: Subtask extract_from_stripe
   [2023-08-16, 19:04:44 CST] {logging_mixin.py:149} INFO - Changing /opt/***/logs/dag_id=stripe_ingest_flow/run_id=manual__2023-08-17T01:04:41.723261+00:00/task_id=extract_from_stripe permission to 509
   [2023-08-16, 19:04:44 CST] {task_command.py:410} INFO - Running <TaskInstance: stripe_ingest_flow.extract_from_stripe manual__2023-08-17T01:04:41.723261+00:00 [running]> on host 470b2412b735
   [2023-08-16, 19:04:44 CST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='dhernandez' AIRFLOW_CTX_DAG_ID='stripe_ingest_flow' AIRFLOW_CTX_TASK_ID='extract_from_stripe' AIRFLOW_CTX_EXECUTION_DATE='2023-08-17T01:04:41.723261+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-17T01:04:41.723261+00:00'
   [2023-08-16, 19:04:44 CST] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
   [2023-08-16, 19:04:44 CST] {base.py:73} INFO - Using connection ID 'siclo_***_lakehouse_conn' for task execution.
   [2023-08-16, 19:04:44 CST] {connection_wrapper.py:340} INFO - AWS Connection (conn_id='siclo_***_lakehouse_conn', conn_type='aws') credentials retrieved from login and password.
   [2023-08-16, 19:04:45 CST] {appflow.py:63} INFO - executionId: 58ad6275-0a70-48d9-8414-f0215924c876
   [2023-08-16, 19:05:24 CST] {local_task_job_runner.py:291} WARNING - State of this instance has been externally set to failed. Terminating instance.
   [2023-08-16, 19:05:24 CST] {process_utils.py:131} INFO - Sending 15 to group 796. PIDs of all processes in the group: [796]
   [2023-08-16, 19:05:24 CST] {process_utils.py:86} INFO - Sending the signal 15 to group 796
   [2023-08-16, 19:05:24 CST] {taskinstance.py:1517} ERROR - Received SIGTERM. Terminating subprocesses.
   ```
   
   ### What you think should happen instead
   
   With wait_for_completion set to True, the task should finish successfully and return the execution ID from AppFlow.
   
   ### How to reproduce
   
   With a DAG that has the following task:
   
   ```python
   from airflow.decorators import task
   from airflow.providers.amazon.aws.hooks.appflow import AppflowHook


   @task
   def extract():
       appflow = AppflowHook(
           aws_conn_id='conn_id',
       )
       execution_id = appflow.run_flow(
           flow_name='flow_name',
           wait_for_completion=True,
           # with wait_for_completion=False it works
       )
       return execution_id
   ```
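   As a workaround while the hang is unresolved, one option is to start the flow with `wait_for_completion=False` and poll the execution status yourself via boto3's `describe_flow_execution_records` (the same AppFlow API the hook relies on). The sketch below is not the provider's implementation; `execution_status` and `wait_for_flow` are hypothetical helper names, and it only inspects the first page of execution records for brevity:
   
   ```python
   import time
   from typing import Optional
   
   
   def execution_status(response: dict, execution_id: str) -> Optional[str]:
       """Return the executionStatus recorded for execution_id in a
       DescribeFlowExecutionRecords response, or None if no record exists yet."""
       for record in response.get("flowExecutions", []):
           if record.get("executionId") == execution_id:
               return record.get("executionStatus")
       return None
   
   
   def wait_for_flow(flow_name: str, execution_id: str,
                     poll_interval: float = 10.0, timeout: float = 300.0) -> str:
       """Poll AppFlow until the execution reaches a terminal status."""
       import boto3  # imported lazily so execution_status is usable without boto3
   
       client = boto3.client("appflow")
       deadline = time.monotonic() + timeout
       while time.monotonic() < deadline:
           resp = client.describe_flow_execution_records(flowName=flow_name)
           status = execution_status(resp, execution_id)
           # "Successful", "Error" and "CancelledByUser" are terminal states
           # in the AppFlow API; "InProgress" keeps the loop polling.
           if status in ("Successful", "Error", "CancelledByUser"):
               return status
           time.sleep(poll_interval)
       raise TimeoutError(
           f"{flow_name} execution {execution_id} did not finish within {timeout}s"
       )
   ```
   
   This needs only `appflow:DescribeFlowExecutionRecords`, which the connection above already has, and it bounds the wait with an explicit timeout instead of blocking indefinitely.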
   
   The AWS connection has the following permissions:
   - "appflow:DescribeFlow"
   - "appflow:StartFlow"
   - "appflow:RunFlow"
   - "appflow:ListFlows"
   - "appflow:DescribeFlowExecutionRecords"
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.6.2
   apache-airflow-providers-amazon==8.5.1
   apache-airflow-providers-common-sql==1.5.1
   apache-airflow-providers-http==4.4.1
   boto3==1.26.76
   asgiref==3.7.2
   watchtower==2.0.1
   jsonpath-ng==1.5.3
   redshift-connector==2.0.911
   sqlalchemy-redshift==0.8.14
   mypy-boto3-appflow==1.28.16
   mypy-boto3-rds==1.26.144
   mypy-boto3-redshift-data==1.26.109
   mypy-boto3-s3==1.26.153
   celery==5.3.0
   
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Docker 4.10.1 (82475)
   Airflow image apache/airflow:2.6.2-python3.11
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

