antonio-antuan opened a new issue, #33618:
URL: https://github.com/apache/airflow/issues/33618
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Checked on 2.6.1 and 2.7.0.
I use LocalExecutor with Airflow running in a Docker container.
The DAG's code looks like this:
```
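import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook


def get_engine(conn_id=None):
    # conn_id is accepted but unused; the hook always uses "postgres_default".
    return PostgresHook(postgres_conn_id="postgres_default").get_sqlalchemy_engine()


@dag(
    "foo",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    max_active_runs=1,
)
def foo():
    @task
    def run():
        # Hold a transaction open on a long-running query.
        with get_engine().begin() as tr:
            tr.execute('select pg_sleep(6000)')

    run()


foo()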
```
During the task run I restart the Docker container with something like
`docker-compose restart scheduler` (with LocalExecutor the scheduler container
is also the one that runs the tasks, so it is effectively the worker).
After that I can see two queries running in `pg_stat_activity`. Each further
restart adds one more: three queries after the second restart, and so on.
Here are the logs (they are the same for every restart):
```
[2023-08-22 15:55:48 +0000] [21] [INFO] Handling signal: term
[2023-08-22 15:55:48 +0000] [22] [INFO] Worker exiting (pid: 22)
[2023-08-22 15:55:48 +0000] [24] [INFO] Worker exiting (pid: 24)
[2023-08-22T15:55:48.159+0000] {scheduler_job_runner.py:247} INFO - Exiting gracefully upon receiving signal 15
[2023-08-22 15:55:48 +0000] [21] [INFO] Shutting down: Master
[2023-08-22T15:55:49.162+0000] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 159. PIDs of all processes in the group: [159]
[2023-08-22T15:55:49.162+0000] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 159
[2023-08-22T15:55:49.295+0000] {process_utils.py:79} INFO - Process psutil.Process(pid=159, status='terminated', exitcode=0, started='15:55:37') (159) terminated with exit code 0
[2023-08-22T15:55:49.295+0000] {local_executor.py:400} INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
[2023-08-22T15:55:49.295+0000] {scheduler_job_runner.py:865} ERROR - Exception when executing Executor.end
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 842, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 999, in _run_scheduler_loop
    time.sleep(min(self._scheduler_idle_sleep_time, next_event if next_event else 0))
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 250, in _exit_gracefully
    sys.exit(os.EX_OK)
SystemExit: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 863, in _execute
    self.job.executor.end()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/local_executor.py", line 404, in end
    self.impl.end()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/local_executor.py", line 345, in end
    self.queue.put((None, None))
  File "<string>", line 2, in put
  File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2023-08-22T15:55:49.298+0000] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 159. PIDs of all processes in the group: []
[2023-08-22T15:55:49.298+0000] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 159
[2023-08-22T15:55:49.298+0000] {process_utils.py:100} INFO - Sending the signal Signals.SIGTERM to process 159 as process group is missing.
[2023-08-22T15:55:49.298+0000] {scheduler_job_runner.py:871} INFO - Exited execute loop
```
### What you think should happen instead
The DB connection that is opened while the task is running must be closed when the task is terminated.
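To illustrate the expected behaviour, here is a minimal standard-library sketch of a process that turns SIGTERM into a Python exception so that an open `with` block unwinds and its cleanup runs. `FakeConnection`, `TaskTerminated`, `run_with_cleanup`, and `simulate_restart` are hypothetical names made up for this example; none of them are Airflow, SQLAlchemy, or psycopg2 APIs, and this is not a description of how Airflow handles the signal. It assumes a POSIX system and must run in the main thread.

```python
import os
import signal
import time


class FakeConnection:
    """Hypothetical stand-in for a DB connection (not a real driver class)."""

    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Runs on normal exit and when an exception leaves the block.
        self.closed = True
        return False  # do not swallow the exception


class TaskTerminated(Exception):
    """Hypothetical exception raised from the SIGTERM handler."""


def _raise_on_sigterm(signum, frame):
    raise TaskTerminated(f"received signal {signum}")


def run_with_cleanup(work):
    """Run work() inside a connection context; report (closed, interrupted)."""
    conn = FakeConnection()
    interrupted = False
    # signal.signal() may only be called from the main thread.
    previous = signal.signal(signal.SIGTERM, _raise_on_sigterm)
    try:
        with conn:
            work()
    except TaskTerminated:
        interrupted = True
    finally:
        if previous is not None:
            signal.signal(signal.SIGTERM, previous)
    return conn.closed, interrupted


def simulate_restart():
    """Send SIGTERM to this process, as a container stop would."""
    os.kill(os.getpid(), signal.SIGTERM)
    time.sleep(1)  # fallback: gives the handler a chance to fire
```

Calling `run_with_cleanup(simulate_restart)` exercises the path where the signal arrives while the connection is open. The sketch only covers client-side cleanup in Python; whether closing the client connection also ends a query that is already running on the server depends on the driver and the database, which it does not address.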
### How to reproduce
1. Put the DAG code above into the DAGs folder and trigger a run.
2. Restart the scheduler container while the task is running.
3. Run in the DB: `select * from pg_stat_activity where state != 'idle'`
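The check in step 3 can also be scripted. Below is a minimal sketch, assuming a DB-API 2.0 connection to the Postgres instance (for example one created with psycopg2); the helper name `find_running_queries` and its `needle` parameter are hypothetical and only serve this illustration. It runs the same kind of query as step 3 and keeps the rows whose text contains `pg_sleep`, so the checking query itself is not counted.

```python
def find_running_queries(conn, needle="pg_sleep"):
    """Return (pid, state, query) rows for non-idle sessions matching needle.

    conn is assumed to be a DB-API 2.0 connection; nothing here is
    specific to Airflow.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            "select pid, state, query from pg_stat_activity "
            "where state != 'idle'"
        )
        rows = cur.fetchall()
    finally:
        cur.close()
    # query may be NULL for some sessions, so guard before the substring test.
    return [
        (pid, state, query)
        for pid, state, query in rows
        if query and needle in query
    ]
```

With the behaviour described in this issue, the returned list would be expected to grow by one entry after each restart of the scheduler container.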
### Operating System
Arch Linux
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-google==10.0.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-postgres==5.4.0
### Deployment
Docker-Compose
### Deployment details
Almost the same installation is used in our AWS ECS deployment.
docker-compose.yaml:
```
---
version: '3.4'
x-airflow-common:
  &airflow-common
  build:
    dockerfile: local.dockerfile
  ulimits:
    nofile:
      soft: 65536
      hard: 65536
  user: "${AIRFLOW_UID:-0}:0"
  env_file:
    - "${ENV_FILE:-.env}"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./shared:/mnt/shared
x-depends-on:
  &depends-on
  depends_on:
    postgres:
      condition: service_healthy
services:
  postgres:
    image: postgres:13
    ports:
      - "5432:5432"
    command: ["postgres", "-c", "log_statement=all"]
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: unless-stopped
    env_file:
      - "${ENV_FILE:-.env}"
    logging:
      options:
        # in case of "log_statement=all" logs may have a very large size. so limit it (unlimited by default).
        max-size: "100mb"
  scheduler:
    <<: [*airflow-common, *depends-on]
    command: ["airflow", "scheduler"]
    restart: unless-stopped
    ports:
      - "8793:8793"
    extra_hosts:
      - "host.docker.internal:host-gateway"
  webserver:
    <<: [*airflow-common, *depends-on]
    command: ["airflow", "webserver"]
    restart: unless-stopped
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 5s
      timeout: 30s
      retries: 10
```
local.dockerfile:
```
FROM apache/airflow:slim-2.7.0-python3.10
USER airflow
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt
```
Requirements are installed with `-c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.1/constraints-3.10.txt`
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)