mrexhepi opened a new issue, #43449:
URL: https://github.com/apache/airflow/issues/43449
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.9.3
### What happened?
Whenever I start a DAG, I get a timeout in the scheduler and an exit code, with errors like the following:
```
[2024-10-28T20:03:57.467+0000] {scheduler_job_runner.py:417} INFO - 1 tasks up for execution:
	<TaskInstance: edgetag_currency.task_pull manual__2024-10-28T19:57:46.193365+00:00 [scheduled]>
[2024-10-28T20:03:57.468+0000] {scheduler_job_runner.py:480} INFO - DAG edgetag_currency has 0/16 running and queued tasks
[2024-10-28T20:03:57.469+0000] {scheduler_job_runner.py:596} INFO - Setting the following tasks to queued state:
	<TaskInstance: edgetag_currency.task_pull manual__2024-10-28T19:57:46.193365+00:00 [scheduled]>
[2024-10-28T20:03:57.479+0000] {scheduler_job_runner.py:639} INFO - Sending TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1) to executor with priority 1 and queue default
[2024-10-28T20:03:57.480+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'edgetag_currency', 'task_pull', 'manual__2024-10-28T19:57:46.193365+00:00', '--local', '--subdir', 'DAGS_FOLDER/dags/currency_pull.py']
[2024-10-28T20:03:58.482+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
[2024-10-28T20:03:58.491+0000] {celery_executor.py:279} INFO - [Try 1 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1)).
[2024-10-28T20:03:59.567+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
[2024-10-28T20:03:59.571+0000] {celery_executor.py:279} INFO - [Try 2 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1)).
[2024-10-28T20:04:01.715+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
[2024-10-28T20:04:01.720+0000] {celery_executor.py:279} INFO - [Try 3 of 3] Task Timeout Error for Task: (TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1)).
[2024-10-28T20:04:03.529+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
[2024-10-28T20:04:03.533+0000] {celery_executor.py:290} ERROR - Error sending Celery task: Timeout, PID: 7
Celery Task ID: TaskInstanceKey(dag_id='edgetag_currency',
task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00',
try_number=2, map_index=-1)
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/functional.py",
line 32, in __call__
return self.__value__
^^^^^^^^^^^^^^
AttributeError: 'ChannelPromise' object has no attribute '__value__'. Did
you mean: '__call__'?
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py",
line 951, in create_channel
return self._avail_channels.pop()
^^^^^^^^^^^^^^^^^^^^^^^^^^
IndexError: pop from empty list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py",
line 220, in send_task_to_executor
result = task_to_run.apply_async(args=[command], queue=queue)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/task.py",
line 594, in apply_async
return app.send_task(
^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/base.py",
line 801, in send_task
amqp.send_task_message(P, name, message, **options)
File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/amqp.py",
line 518, in send_task_message
ret = producer.publish(
^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py",
line 186, in publish
return _publish(
^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line
556, in _ensured
return fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py",
line 195, in _publish
channel = self.channel
^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py",
line 218, in _get_channel
channel = self._channel = channel()
^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/functional.py",
line 34, in __call__
value = self.__value__ = self.__contract__()
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kombu/messaging.py",
line 234, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line
953, in default_channel
self._ensure_connection(**conn_opts)
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line
459, in _ensure_connection
return retry_over_time(
^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/utils/functional.py",
line 318, in retry_over_time
return fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line
934, in _connection_factory
self._connection = self._establish_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/connection.py", line
860, in _establish_connection
conn = self.transport.establish_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py",
line 975, in establish_connection
self._avail_channels.append(self.create_channel(self))
^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/virtual/base.py",
line 953, in create_channel
channel = self.Channel(connection)
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/kombu/transport/redis.py",
line 744, in __init__
self.client.ping()
File
"/home/airflow/.local/lib/python3.12/site-packages/redis/commands/core.py",
line 1217, in ping
return self.execute_command("PING", **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/redis/client.py",
line 542, in execute_command
conn = self.connection or pool.get_connection(command_name, **options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line
1109, in get_connection
connection.connect()
File
"/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line
288, in connect
self.on_connect()
File
"/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line
354, in on_connect
auth_response = self.read_response()
^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/redis/connection.py", line
512, in read_response
response = self._parser.read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/redis/_parsers/resp2.py",
line 15, in read_response
result = self._read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/redis/_parsers/resp2.py",
line 25, in _read_response
raw = self._buffer.readline()
^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/redis/_parsers/socket.py",
line 115, in readline
self._read_from_socket()
File
"/home/airflow/.local/lib/python3.12/site-packages/redis/_parsers/socket.py",
line 65, in _read_from_socket
data = self._sock.recv(socket_read_size)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/timeout.py",
line 69, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 7
[2024-10-28T20:04:03.534+0000] {scheduler_job_runner.py:689} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='edgetag_currency', task_id='task_pull', run_id='manual__2024-10-28T19:57:46.193365+00:00', try_number=2, map_index=-1)
[2024-10-28T20:04:03.545+0000] {scheduler_job_runner.py:721} INFO - TaskInstance Finished: dag_id=edgetag_currency, task_id=task_pull, run_id=manual__2024-10-28T19:57:46.193365+00:00, map_index=-1, run_start_date=None, run_end_date=2024-10-28 19:57:57.429347+00:00, run_duration=None, state=queued, executor_state=failed, try_number=2, max_tries=1, job_id=None, pool=default_pool, queue=default, priority_weight=1, operator=_PythonDecoratedOperator, queued_dttm=2024-10-28 20:03:57.470287+00:00, queued_by_job_id=1, pid=None
[2024-10-28T20:04:03.545+0000] {task_context_logger.py:91} ERROR - The executor reported that the task instance <TaskInstance: edgetag_currency.task_pull manual__2024-10-28T19:57:46.193365+00:00 [queued]> finished with state failed, but the task instance's state attribute is queued. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
[2024-10-28T20:04:04.619+0000] {dagrun.py:819} ERROR - Marking run <DagRun edgetag_currency @ 2024-10-28 19:57:46.193365+00:00: manual__2024-10-28T19:57:46.193365+00:00, state:running, queued_at: 2024-10-28 19:57:46.229834+00:00. externally triggered: True> failed
[2024-10-28T20:04:04.620+0000] {dagrun.py:901} INFO - DagRun Finished: dag_id=edgetag_currency, execution_date=2024-10-28 19:57:46.193365+00:00, run_id=manual__2024-10-28T19:57:46.193365+00:00, run_start_date=2024-10-28 19:57:47.166462+00:00, run_end_date=2024-10-28 20:04:04.619902+00:00, run_duration=377.45344, state=failed, external_trigger=True, run_type=manual, data_interval_start=2024-10-28 08:00:00+00:00, data_interval_end=2024-10-28 16:00:00+00:00, dag_hash=eff246fcd6ccb6a38bec38f44b82f466
```
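The traceback bottoms out in redis-py waiting for the broker's AUTH reply (`read_response` inside `on_connect`), so the scheduler appears to open the socket but then blocks on the response. As a minimal sketch to check broker reachability from inside the scheduler container (assuming redis-py, which the traceback shows is installed, and that `AIRFLOW__CELERY__BROKER_URL` is exported there):

```python
# Minimal broker reachability check, run from the scheduler container.
# Assumes AIRFLOW__CELERY__BROKER_URL is set in the environment
# (redacted in the variables below), e.g. redis://:<password>@host:6379/0.
import os

import redis

broker_url = os.environ["AIRFLOW__CELERY__BROKER_URL"]
client = redis.Redis.from_url(broker_url, socket_connect_timeout=5, socket_timeout=5)
print(client.ping())  # True if the broker answers; raises on connect/auth problems
```

With explicit 5-second socket timeouts this fails fast instead of hanging, which helps separate a security-group/DNS problem from a slow AUTH exchange.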
The timeout-related variables have already been extended; most other settings are left at their default values. This is running on AWS ECS Fargate with three services (webserver, scheduler, and worker). Redis is 7.0.7 (I also tested with version 6), and Postgres is version 14.
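For context on the one-second window: the scheduler wraps the Celery publish (`send_task_to_executor`) in `airflow.utils.timeout`, bounded by `[celery] operation_timeout` (default 1.0 second) and retried `[celery] task_publish_max_retries` times (default 3), which matches the `[Try 1 of 3]` through `[Try 3 of 3]` lines above. A quick sketch to confirm the effective values in a deployment:

```python
# Sketch: print the settings behind the "Timeout, PID: ..." and
# "[Try n of 3]" lines in the scheduler log above.
from airflow.configuration import conf

print(conf.getfloat("celery", "operation_timeout", fallback=1.0))     # seconds per publish attempt
print(conf.getint("celery", "task_publish_max_retries", fallback=3))  # publish attempts before failing
```

Raising `AIRFLOW__CELERY__OPERATION_TIMEOUT` would only mask the symptom if the broker is genuinely unreachable, but it rules out a too-tight one-second budget for Fargate networking.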
### What you think should happen instead?
The DAG should run successfully.
### How to reproduce
Deploy Airflow 2.9.3 to AWS ECS Fargate and try to run a DAG.
### Operating System
Windows/Ubuntu
### Versions of Apache Airflow Providers
n/a
### Deployment
Other Docker-based deployment
### Deployment details
AWS ECS Fargate
### Anything else?
My current vars:
```
AIRFLOW__CELERY__BROKER_URL=**********
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=600
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=180
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=**********
AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
AIRFLOW__WEBSERVER__LOG_FETCH_DELAY_SEC=30
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=60
```
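None of the variables above touch the executor's broker connection itself. As a sketch (assuming it runs inside the scheduler container with the same environment), this exercises the same Celery app the CeleryExecutor publishes through:

```python
# Sketch: open the broker connection the CeleryExecutor publishes on,
# failing fast instead of retrying. Run inside the scheduler container.
from airflow.providers.celery.executors.celery_executor_utils import app

with app.connection_for_write() as conn:
    conn.ensure_connection(max_retries=1, timeout=5)  # raises if the broker is unreachable
    print("broker OK:", conn.as_uri())  # as_uri() masks the password
```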
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)