ecodina opened a new issue, #61916:
URL: https://github.com/apache/airflow/issues/61916
### Apache Airflow version
3.1.7
### If "Other Airflow 3 version" selected, which one?
_No response_
### What happened?
The triggerer fails every few hours with `OSError: Too many open files`.
On it I run `ExternalTaskSensor` as well as a custom trigger (below).
I thought it could be related to #56366, but my triggerer does not use the
`cleanup` method. I've also seen issues for the worker (#51624) and
dagprocessor (#49887).
I have been investigating and can see that the number of entries in `/proc/7/fd`
always increases, whereas `/proc/24/fd` does close files / sockets correctly.
From what I've seen, my trigger code runs as PID 24 (I used `os.getpid()` to
verify it), so PID 7 is probably the parent process:
```
root@99ee11a02435:/opt/airflow# ps aux
USER      PID %CPU %MEM    VSZ    RSS TTY STAT START TIME COMMAND
default     1  0.0  0.0   2336   1024 ?   Ss   16:59 0:00 /usr/bin/dumb-init -- /entrypoint triggerer --skip-serve-logs
default     7  2.3  2.7 386360 221004 ?   Ssl  16:59 0:16 /usr/python/bin/python3.12 /home/airflow/.local/bin/airflow triggerer --skip-serve-logs
default    24  0.2  1.8 359924 153816 ?   Sl   17:01 0:01 /usr/python/bin/python3.12 /home/airflow/.local/bin/airflow triggerer --skip-serve-logs
```
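The parent/child relationship can also be double-checked from `/proc` directly. A minimal, stdlib-only sketch (Linux-specific; the helper name is made up, and the PIDs 7 and 24 are the ones from the `ps aux` output above):

```python
import os
from pathlib import Path


def parent_pid(pid: int) -> int:
    """Read the PPid field from /proc/<pid>/status (Linux only)."""
    for line in Path(f"/proc/{pid}/status").read_text().splitlines():
        if line.startswith("PPid:"):
            return int(line.split()[1])
    raise ValueError(f"no PPid entry for pid {pid}")


# Sanity check on the current process; inside the container,
# parent_pid(24) should report 7 if PID 7 spawned PID 24.
print(parent_pid(os.getpid()), os.getppid())
```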
<details>
<summary>What my custom trigger does</summary>
My trigger basically connects to a Redis database and waits for a certain
key to change to a certain status.
```python
import asyncio

from airflow.providers.redis.hooks.redis import RedisHook
from airflow.triggers.base import BaseTrigger, TriggerEvent
from redis.asyncio import Redis


class MyTrigger(BaseTrigger):
    ...

    async def check_slurm_state(self, redis_conn: Redis):
        """
        Checks the slurm job's state every *self.polling_interval* on the
        Redis database.
        """
        finished_ok = False
        final_message = ""
        while True:
            state = await self.get_sacct_output(redis_conn)
            # We check if state is in SACCT_RUNNING in case it is stuck
            # in a completed state
            if (
                self.last_known_state == state["state"]
                and state["state"] in SACCT_RUNNING
                or state["state"] == "UNKNOWN"
            ):
                await asyncio.sleep(self.polling_interval)
                continue
            # The state has changed!
            self.log.ainfo(f"Job has changed to status {state['state']}")
            # I've also tried self.log.info
            await self.store_state(redis_conn, state["state"])
            is_finished, finished_ok, final_message = await self.parse_state_change(
                state["state"], state["reason"]
            )
            self.last_known_state = state["state"]
            if not is_finished:
                await asyncio.sleep(self.polling_interval)
            else:
                break
        return finished_ok, final_message

    async def run(self):
        redis_hook = RedisHook(redis_conn_id="my_redis_conn")
        conn = await redis_hook.aget_connection(redis_hook.redis_conn_id)
        redis_client = Redis(
            host=conn.host,
            port=conn.port,
            username=conn.login,
            password=None
            if str(conn.password).lower() in ["none", "false", ""]
            else conn.password,
            db=conn.extra_dejson.get("db"),
            max_connections=5,
            decode_responses=True,
        )
        async with redis_client:
            ...
            finished_ok, final_message = await self.check_slurm_state(redis_client)
            ...
            yield TriggerEvent(
                {"finished_ok": finished_ok, "final_message": final_message}
            )
```
</details>
When I saw this problem I thought it might be related to logging. We use a
custom FileTaskHandler, but we have configured the triggerer **not** to use it
by setting the environment variable `AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS`
to an empty string:
```bash
default@99ee11a02435:/opt/airflow$ airflow info
Apache Airflow
version | 3.1.7
executor | LocalExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn | postgresql+psycopg2://db_editor:****@db:5432/airflow
dags_folder | /opt/airflow/dags
plugins_folder | /opt/airflow/plugins
base_log_folder | /opt/airflow/logs
remote_base_log_folder |
```
### What you think should happen instead?
Files / sockets should be closed by PID 7 once they are no longer needed.
### How to reproduce
Run Airflow in Docker and create two DAGs: a "parent" DAG, and a "child" DAG
with an `ExternalTaskSensor` in deferrable mode.
Access the container and check how many `fd` entries there are for each PID:
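For reference, a minimal sketch of such a pair of DAGs (DAG IDs, task IDs and schedule are made up; assumes the standard provider's `ExternalTaskSensor` on Airflow 3):

```python
from datetime import datetime

from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
from airflow.sdk import DAG

# "Parent" DAG: does some trivial work.
with DAG("parent_dag", schedule="@hourly", start_date=datetime(2026, 1, 1)):
    EmptyOperator(task_id="work")

# "Child" DAG: defers while waiting for the parent run to finish.
with DAG("child_dag", schedule="@hourly", start_date=datetime(2026, 1, 1)):
    ExternalTaskSensor(
        task_id="wait_for_parent",
        external_dag_id="parent_dag",
        deferrable=True,  # hands the wait over to the triggerer
    )
```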
```
ls /proc/7/fd | wc -l
ls /proc/24/fd | wc -l
```
You'll see that the number of `fd` for PID 24 increases when the
ExternalTaskSensor starts, and decreases when it finishes.
You'll also see that the number of `fd` for PID 7 increases when the
ExternalTaskSensor starts, but never decreases.
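To watch the growth over time rather than sampling by hand, something like the following can be left running inside the container (stdlib only, Linux-specific; the PIDs 7 and 24 come from the `ps aux` output above and will differ in other containers):

```python
import os
import time


def count_fds(pid: int) -> int:
    """Number of open file descriptors for a PID (Linux /proc only)."""
    try:
        return len(os.listdir(f"/proc/{pid}/fd"))
    except FileNotFoundError:
        return -1  # no such PID on this machine


# Sample a few times to see whether PID 7's count only ever grows.
for _ in range(3):
    print({pid: count_fds(pid) for pid in (7, 24)})
    time.sleep(1)
```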
### Operating System
Debian GNU/Linux 12 (bookworm)
### Versions of Apache Airflow Providers
apache-airflow-providers-common-compat==1.13.0
apache-airflow-providers-common-io==1.7.1
apache-airflow-providers-common-sql==1.30.4
apache-airflow-providers-ftp==3.14.1
apache-airflow-providers-git==0.2.2
apache-airflow-providers-http==5.6.4
apache-airflow-providers-imap==3.10.3
apache-airflow-providers-keycloak==0.5.1
apache-airflow-providers-postgres==6.5.3
apache-airflow-providers-redis==4.4.2
apache-airflow-providers-sftp==5.7.0
apache-airflow-providers-smtp==2.4.2
apache-airflow-providers-ssh==4.3.1
apache-airflow-providers-standard==1.11.0
### Deployment
Docker-Compose
### Deployment details
- We use Docker Swarm.
- We use Airflow's Docker image where we install our own provider that has
our trigger, as well as some other providers.
- We use Python 3.12
- The logs volume is mounted as an NFS volume with options
`nfsvers=4.2,rw,noatime,nocto,actimeo=5,nolock`
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)