henriquemeloo opened a new issue, #37449:
URL: https://github.com/apache/airflow/issues/37449
### Apache Airflow Provider(s)
airbyte
### Versions of Apache Airflow Providers
apache-airflow-providers-airbyte==3.6.0
apache-airflow-providers-http==4.5.1
### Apache Airflow version
2.7.1
### Operating System
Debian GNU/Linux 11 (bullseye)
### Deployment
Docker-Compose
### Deployment details
_No response_
### What happened
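`AirbyteTriggerSyncOperator` fails in deferrable mode. The Airbyte job is submitted and the task defers, but the trigger immediately fires an error event whose message is only the request URL (`airbyte-proxy/api/v1/jobs/get`). The task then fails with an `AirflowException` carrying that message. The same operator works with `deferrable=False`.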
### What you think should happen instead
_No response_
### How to reproduce
Create the following DAG, replacing `"id_of_airbyte_connection_to_be_synced"` with the ID of the Airbyte connection to be synced.
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import \
    AirbyteTriggerSyncOperator

AIRFLOW_AIRBYTE_CONN_ID = "airbyte_default"
AIRBYTE_CONNECTION_ID = "id_of_airbyte_connection_to_be_synced"

with DAG("test_dag", start_date=datetime.min, catchup=False) as dag:
    not_deferrable = AirbyteTriggerSyncOperator(
        task_id="not_deferrable",
        airbyte_conn_id=AIRFLOW_AIRBYTE_CONN_ID,
        connection_id=AIRBYTE_CONNECTION_ID,
        deferrable=False
    )
    deferrable = AirbyteTriggerSyncOperator(
        task_id="deferrable",
        airbyte_conn_id=AIRFLOW_AIRBYTE_CONN_ID,
        connection_id=AIRBYTE_CONNECTION_ID,
        deferrable=True
    )
```
The `not_deferrable` task works, while the `deferrable` task fails. The `"airbyte_default"` connection is set in all containers via the environment variable:
```
AIRFLOW_CONN_AIRBYTE_DEFAULT='{
    "conn_type": "airbyte",
    "host": "airbyte-proxy",
    "port": 8000
}'
```
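In the log below, the request URL is `airbyte-proxy/api/v1/jobs/get`: the host from this connection is present, but neither an `http://` scheme nor the `8000` port is. The sketch below is a minimal illustration, assuming the deferrable code path builds the URL from the connection's `host` alone. `url_from_host_only` and `url_from_full_connection` are hypothetical helpers, not provider APIs; the second only approximates common HTTP-hook behavior (default the scheme to `http`, append the port when one is set).

```python
import json
from urllib.parse import urlsplit

# Same JSON as the AIRFLOW_CONN_AIRBYTE_DEFAULT value above.
CONN_JSON = """{
    "conn_type": "airbyte",
    "host": "airbyte-proxy",
    "port": 8000
}"""


def url_from_host_only(conn: dict, path: str) -> str:
    """Hypothetical: join only the host and the path (no scheme, no port)."""
    return f"{conn['host']}{path}"


def url_from_full_connection(conn: dict, path: str) -> str:
    """Hypothetical: default the scheme to http and append the port if set."""
    host = conn.get("host") or ""
    if "://" in host:
        base = host  # host already carries a scheme
    else:
        schema = conn.get("schema") or "http"
        base = f"{schema}://{host}"
    if conn.get("port"):
        base += f":{conn['port']}"
    return f"{base}{path}"


conn = json.loads(CONN_JSON)
path = "/api/v1/jobs/get"
broken = url_from_host_only(conn, path)
working = url_from_full_connection(conn, path)
print(broken)   # airbyte-proxy/api/v1/jobs/get
print(working)  # http://airbyte-proxy:8000/api/v1/jobs/get
print(repr(urlsplit(broken).scheme))  # ''
```

Under this assumption, the host-only URL has no scheme and no network location, which no HTTP client can send a request to.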
### Anything else
The deferrable task fails with the following log:
<details><summary>deferrable.log</summary>
```
78f82177fe4f
*** Found local files:
*** * /opt/airflow/logs/dag_id=test_dag/run_id=scheduled__2024-02-14T15:14:37.364888+00:00/task_id=deferrable/attempt=1.log
*** * /opt/airflow/logs/dag_id=test_dag/run_id=scheduled__2024-02-14T15:14:37.364888+00:00/task_id=deferrable/attempt=1.log.trigger.1205.log
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1380} INFO - Executing <Task(AirbyteTriggerSyncOperator): deferrable> on 2024-02-14 15:14:37.364888+00:00
[2024-02-15, 15:14:38 UTC] {standard_task_runner.py:57} INFO - Started process 7210 to run task
[2024-02-15, 15:14:38 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'deferrable', 'scheduled__2024-02-14T15:14:37.364888+00:00', '--job-id', '1213', '--raw', '--subdir', 'DAGS_FOLDER/test_dag.py', '--cfg-path', '/var/tmp/tmp7v9zx5oh']
[2024-02-15, 15:14:38 UTC] {standard_task_runner.py:85} INFO - Job 1213: Subtask deferrable
[2024-02-15, 15:14:38 UTC] {task_command.py:415} INFO - Running <TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [running]> on host 78f82177fe4f
[2024-02-15, 15:14:38 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test_dag' AIRFLOW_CTX_TASK_ID='deferrable' AIRFLOW_CTX_EXECUTION_DATE='2024-02-14T15:14:37.364888+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-14T15:14:37.364888+00:00'
[2024-02-15, 15:14:38 UTC] {base.py:73} INFO - Using connection ID 'airbyte_default' for task execution.
[2024-02-15, 15:14:43 UTC] {airbyte.py:86} INFO - Job 224 was submitted to Airbyte Server
[2024-02-15, 15:14:43 UTC] {airbyte.py:88} INFO - Waiting for job 224 to complete
[2024-02-15, 15:14:43 UTC] {taskinstance.py:1524} INFO - Pausing task as DEFERRED. dag_id=test_dag, task_id=deferrable, execution_date=20240214T151437, start_date=20240215T151438
[2024-02-15, 15:14:44 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 100 (task deferral)
[2024-02-15, 15:14:44 UTC] {airbyte.py:104} INFO - Getting the status of job run 224.
[2024-02-15, 15:14:44 UTC] {base.py:73} INFO - Using connection ID 'airbyte_default' for task execution.
[2024-02-15, 15:14:44 UTC] {airbyte.py:88} INFO - URL for api request: airbyte-proxy/api/v1/jobs/get
[2024-02-15, 15:14:44 UTC] {triggerer_job_runner.py:599} INFO - Trigger test_dag/scheduled__2024-02-14T15:14:37.364888+00:00/deferrable/-1/1 (ID 17) fired: TriggerEvent<{'status': 'error', 'message': 'airbyte-proxy/api/v1/jobs/get', 'job_id': 224}>
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [queued]>
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1357} INFO - Resuming after deferral
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1380} INFO - Executing <Task(AirbyteTriggerSyncOperator): deferrable> on 2024-02-14 15:14:37.364888+00:00
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:57} INFO - Started process 7238 to run task
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'deferrable', 'scheduled__2024-02-14T15:14:37.364888+00:00', '--job-id', '1214', '--raw', '--subdir', 'DAGS_FOLDER/test_dag.py', '--cfg-path', '/var/tmp/tmp7eh43099']
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:85} INFO - Job 1214: Subtask deferrable
[2024-02-15, 15:14:46 UTC] {task_command.py:415} INFO - Running <TaskInstance: test_dag.deferrable scheduled__2024-02-14T15:14:37.364888+00:00 [running]> on host 78f82177fe4f
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1608, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/airbyte/operators/airbyte.py", line 124, in execute_complete
    raise AirflowException(event["message"])
airflow.exceptions.AirflowException: airbyte-proxy/api/v1/jobs/get
[2024-02-15, 15:14:46 UTC] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=test_dag, task_id=deferrable, execution_date=20240214T151437, start_date=20240215T151438, end_date=20240215T151446
[2024-02-15, 15:14:46 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 1214 for task deferrable (airbyte-proxy/api/v1/jobs/get; 7238)
[2024-02-15, 15:14:46 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-02-15, 15:14:46 UTC] {taskinstance.py:2776} INFO - 0 downstream tasks scheduled from follow-on schedule check
```
</details>
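The `TriggerEvent` in the log carries only the URL as its `message`, which suggests the trigger caught an exception and forwarded `str(exc)`; `execute_complete` then re-raised that string (line 124 in the traceback). This is consistent with an HTTP client rejecting a URL that has no scheme or host, although the log does not show the underlying exception type. A minimal sketch of that flow, assuming this wrapping behavior: `InvalidURLError` and `run_trigger` are hypothetical stand-ins, the `"://"` check only imitates whatever the real client rejects, and `RuntimeError` replaces `AirflowException` so the snippet runs without Airflow. The event keys (`status`, `message`, `job_id`) are taken from the logged event.

```python
class InvalidURLError(ValueError):
    """Hypothetical stand-in for an HTTP client's invalid-URL error.

    With a single argument, str(exc) is just that argument: the URL.
    """


def run_trigger(job_id: int, url: str) -> dict:
    """Hypothetical trigger body that converts any exception into an error event."""
    try:
        # Stand-in for the HTTP request: reject URLs without a scheme.
        if "://" not in url:
            raise InvalidURLError(url)
        return {"status": "success", "job_id": job_id}
    except Exception as exc:
        # Only str(exc) is forwarded, so the URL becomes the whole message.
        return {"status": "error", "message": str(exc), "job_id": job_id}


def execute_complete(event: dict) -> int:
    """Re-raise error events, as the traceback shows the operator doing."""
    if event["status"] == "error":
        raise RuntimeError(event["message"])  # AirflowException in the provider
    return event["job_id"]


event = run_trigger(224, "airbyte-proxy/api/v1/jobs/get")
print(event)  # {'status': 'error', 'message': 'airbyte-proxy/api/v1/jobs/get', 'job_id': 224}
try:
    execute_complete(event)
except RuntimeError as err:
    print(f"Task failed: {err}")  # Task failed: airbyte-proxy/api/v1/jobs/get
```

If this reading is right, the original exception type and traceback are lost inside the trigger, which is why the task log only shows the bare URL.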
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)