xmariachi opened a new issue, #35474:
URL: https://github.com/apache/airflow/issues/35474
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Version: 2.5.1
Run env: MWAA on AWS
Summary: Roughly once every 500-1000 runs, the task hangs indefinitely until it
is killed manually, and its `execution_timeout` is never enforced. Because the
DAG uses `max_active_runs=1`, no other run of this DAG can start while the task is stuck.
In my experience, it only happens in tasks that consume from Kafka using the
`confluent_kafka` library. The `execution_timeout` is enforced as expected in other tasks.
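DAG definition:
```python
from datetime import timedelta

import pendulum
from airflow.decorators import dag

# SERVICE_NAME, on_failure_callback and ingest_from_kafka_and_save() (the task
# that consumes from Kafka) are defined elsewhere in the project (not shown).

# Dag Info
default_args = {
    "retries": 3,
    "on_failure_callback": on_failure_callback,
    "sla": timedelta(hours=2),
    "execution_timeout": timedelta(hours=4),  # not enforced when the hang occurs
}


@dag(
    SERVICE_NAME,
    default_args=default_args,
    schedule_interval="*/5 * * * *",
    start_date=pendulum.datetime(2023, 7, 3, 9, tz="UTC"),
    catchup=True,
    tags=["critical", "dumper", "kafka"],
    max_active_runs=1,
)
def process_records():
    ingest_from_kafka_and_save()


process_records()  # instantiate the DAG so the scheduler can discover it
```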
`ingest_from_kafka_and_save()` consumes from Kafka and passes a callback
function to the consumer. I suspect the callback may have something to do with
the problem, since it is invoked asynchronously.
It is hard to reproduce because it is intermittent and only happens once in a
while.
The Audit Log does not show anything unusual; the task just seems to hang indefinitely.
Otherwise, the consumption code works fine: it has been running for months in
this DAG and in other DAGs that use it, although those DAGs occasionally show
the same behaviour.
### What you think should happen instead
The `execution_timeout` should be enforced and the task should be killed so
that a new one can be scheduled.
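To illustrate the kind of enforcement expected here, below is a minimal sketch of a hard timeout that does not depend on the hung code ever returning control to Python: the work runs in a child process, and the parent sends SIGKILL once the deadline passes. It is not meant to describe how Airflow itself implements `execution_timeout`; `run_with_hard_timeout` is a hypothetical helper, and the `"fork"` start method assumes a POSIX host such as the Linux workers MWAA runs on.

```python
import multiprocessing
from typing import Any, Callable, Optional


def run_with_hard_timeout(
    target: Callable[..., Any], timeout_s: float, *args: Any
) -> Optional[int]:
    """Run target(*args) in a child process and kill it after timeout_s.

    Returns the child's exit code, or None if it had to be killed.
    """
    # "fork" keeps the sketch simple on POSIX: target does not need pickling.
    ctx = multiprocessing.get_context("fork")
    proc = ctx.Process(target=target, args=args, daemon=True)
    proc.start()
    proc.join(timeout_s)
    if proc.is_alive():
        # SIGKILL cannot be caught or ignored, even by a child stuck in C code.
        proc.kill()
        proc.join()
        return None
    return proc.exitcode
```

For example, `run_with_hard_timeout(time.sleep, 0.5, 30)` returns `None` after about half a second instead of waiting 30 seconds.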
### How to reproduce
It is hard to reproduce, since it happens very infrequently.
* Create a DAG using the definition in the "What happened" section
* Add a task function that does a basic consumption from a Kafka topic,
reading until the end of the topic partitions (or up to a maximum number of messages)
* Leave it running and wait for the problem to occur
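The consumption step in the list above can be sketched roughly as follows. This is a hypothetical, library-agnostic loop, not the actual `ingest_from_kafka_and_save()` code: `consume_until_eof` and its parameters are illustrative names. It assumes a consumer with a `confluent_kafka`-style `poll(timeout)` method and messages exposing `topic()` and `partition()`. With `confluent_kafka`, `is_eof` could check whether `msg.error()` has code `KafkaError._PARTITION_EOF` (which requires `enable.partition.eof` to be enabled), and `num_partitions` could be taken from the `on_assign` callback.

```python
import time
from typing import Any, Callable, Optional, Set, Tuple


def consume_until_eof(
    consumer: Any,
    num_partitions: int,
    handle: Callable[[Any], None],
    is_eof: Callable[[Any], bool],
    max_messages: int = 10_000,
    deadline_s: float = 600.0,
    poll_timeout_s: float = 1.0,
) -> int:
    """Consume until every partition reports EOF, max_messages are handled,
    or deadline_s elapses. Returns the number of handled messages."""
    eof_partitions: Set[Tuple[str, int]] = set()
    handled = 0
    end = time.monotonic() + deadline_s
    while handled < max_messages and len(eof_partitions) < num_partitions:
        remaining = end - time.monotonic()
        if remaining <= 0:
            break  # give up instead of waiting forever
        # Bounded poll: never block longer than the time that is left.
        msg: Optional[Any] = consumer.poll(min(poll_timeout_s, remaining))
        if msg is None:
            continue
        if is_eof(msg):
            eof_partitions.add((msg.topic(), msg.partition()))
            continue
        handle(msg)
        handled += 1
    return handled
```

Bounding every poll and adding an overall deadline keeps the loop itself from waiting forever, but it cannot help if a single call blocks inside the client library, which is one possible explanation for the hang described here.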
### Operating System
MWAA on AWS
### Versions of Apache Airflow Providers
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt"
apache-airflow-providers-amazon
apache-airflow-providers-snowflake==4.0.2
apache-airflow-providers-mysql==4.0.0
apache-airflow-providers-slack
confluent-kafka==2.1.0
### Deployment
Amazon (AWS) MWAA
### Deployment details
Medium-sized cluster
Airflow 2.5.1, with the latest update applied 2 weeks ago.
### Anything else
It is unclear what triggers the error, but whatever the cause, the task should
be killed once its `execution_timeout` is exceeded.
It seems like an internal thread-management issue.
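For context on why a hang inside the Kafka client could defeat the timeout: as far as I understand, on POSIX systems Airflow 2.x enforces `execution_timeout` inside the task process with a SIGALRM-based timeout context manager. The sketch below is a simplified, hypothetical stand-in (`alarm_timeout` and `TaskTimeout` are illustrative names, not Airflow's API). Python only runs signal handlers in the main thread, and only when the interpreter regains control between bytecode instructions, so a main thread blocked inside a C extension call that never returns (one possibility for a librdkafka-backed call, not confirmed here) would never see the timeout exception.

```python
import signal
from contextlib import contextmanager
from typing import Iterator


class TaskTimeout(Exception):
    """Hypothetical stand-in for Airflow's AirflowTaskTimeout."""


@contextmanager
def alarm_timeout(seconds: float) -> Iterator[None]:
    """Raise TaskTimeout in the main thread after `seconds` (POSIX only)."""

    def _on_alarm(signum, frame):
        # Runs in the main thread, and only once the interpreter regains control.
        raise TaskTimeout(f"timed out after {seconds}s")

    # signal.signal() raises ValueError when called outside the main thread.
    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel a pending alarm
        signal.signal(signal.SIGALRM, previous)  # restore the old handler
```

A pure-Python loop such as `while True: time.sleep(0.01)` inside `with alarm_timeout(0.1):` is interrupted as expected; the open question in this issue is whether the hung Kafka consumption ever hands control back to the interpreter.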
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.