antonio-antuan opened a new issue, #33618:
URL: https://github.com/apache/airflow/issues/33618

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Checked on 2.6.1 and 2.7.0.
   
   I use LocalExecutor with Airflow running in a Docker container.
   
   The DAG code looks like this:
   ```
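   import pendulum
   from airflow.decorators import dag, task
   from airflow.providers.postgres.hooks.postgres import PostgresHook

   def get_engine(conn_id = None):
       # conn_id is accepted but unused; the default connection is always used
       return PostgresHook(postgres_conn_id="postgres_default").get_sqlalchemy_engine()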
   
   @dag(
       "foo",
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       schedule=None,
       catchup=False,
       max_active_runs=1,
   )
   def foo():
       @task
       def run():
           with get_engine().begin() as tr:
               tr.execute('select pg_sleep(6000)')
       run()
   
   
   foo()
   ```
   
   During the task run I restart the Docker container with something like `docker-compose restart scheduler` (with LocalExecutor the scheduler container is actually the worker too).
   After that I can see two queries running in `pg_stat_activity`. If I restart once more, there are three queries, and so on.
   
   Here are the logs (they are the same for every restart):
   ```
   [2023-08-22 15:55:48 +0000] [21] [INFO] Handling signal: term
   [2023-08-22 15:55:48 +0000] [22] [INFO] Worker exiting (pid: 22)
   [2023-08-22 15:55:48 +0000] [24] [INFO] Worker exiting (pid: 24)
   [2023-08-22T15:55:48.159+0000] {scheduler_job_runner.py:247} INFO - Exiting gracefully upon receiving signal 15
   [2023-08-22 15:55:48 +0000] [21] [INFO] Shutting down: Master
   [2023-08-22T15:55:49.162+0000] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 159. PIDs of all processes in the group: [159]
   [2023-08-22T15:55:49.162+0000] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 159
   [2023-08-22T15:55:49.295+0000] {process_utils.py:79} INFO - Process psutil.Process(pid=159, status='terminated', exitcode=0, started='15:55:37') (159) terminated with exit code 0
   [2023-08-22T15:55:49.295+0000] {local_executor.py:400} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
   [2023-08-22T15:55:49.295+0000] {scheduler_job_runner.py:865} ERROR - Exception when executing Executor.end
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 842, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 999, in _run_scheduler_loop
       time.sleep(min(self._scheduler_idle_sleep_time, next_event if next_event else 0))
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 250, in _exit_gracefully
       sys.exit(os.EX_OK)
   SystemExit: 0
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 863, in _execute
       self.job.executor.end()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/local_executor.py", line 404, in end
       self.impl.end()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/local_executor.py", line 345, in end
       self.queue.put((None, None))
     File "<string>", line 2, in put
     File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
       self._send(header + buf)
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   [2023-08-22T15:55:49.298+0000] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 159. PIDs of all processes in the group: []
   [2023-08-22T15:55:49.298+0000] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 159
   [2023-08-22T15:55:49.298+0000] {process_utils.py:100} INFO - Sending the signal Signals.SIGTERM to process 159 as process group is missing.
   [2023-08-22T15:55:49.298+0000] {scheduler_job_runner.py:871} INFO - Exited execute loop
   ```
   
   ### What you think should happen instead
   
   The database connection opened by the running task must be closed when the container is restarted, so that the query does not keep running.
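
To illustrate the expected behaviour, here is a minimal standard-library sketch, not Airflow's actual implementation. It assumes the task process receives SIGTERM on restart and that turning the signal into an exception is enough to let the `with` block run its cleanup. `FakeConnection`, `TerminationRequested`, `raise_on_sigterm` and `run_task` are hypothetical names used only for this illustration.

```python
import os
import signal
import time


class FakeConnection:
    """Hypothetical stand-in for a database connection/transaction."""

    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Runs on normal exit and when an exception propagates out of the block.
        self.closed = True
        return False  # do not swallow the exception


class TerminationRequested(Exception):
    """Raised in the main thread when SIGTERM arrives."""


def raise_on_sigterm(signum, frame):
    # Converting the signal into an exception lets context managers clean up.
    raise TerminationRequested(f"received signal {signum}")


def run_task(conn):
    """Install the handler, then simulate a restart by signalling ourselves."""
    previous = signal.signal(signal.SIGTERM, raise_on_sigterm)
    try:
        with conn:
            # Stands in for the SIGTERM sent when the container is restarted.
            os.kill(os.getpid(), signal.SIGTERM)
            time.sleep(1)  # stands in for the long-running query
    except TerminationRequested:
        return "terminated"
    finally:
        signal.signal(signal.SIGTERM, previous)  # restore the old handler
    return "finished"
```

This only shows the client-side cleanup path. Whether closing the client connection also stops the query on the server depends on the database and driver, which this sketch does not cover.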
   
   ### How to reproduce
   
   1. Add the DAG above and trigger a run.
   2. Restart the scheduler container while the task is running.
   3. Run `select * from pg_stat_activity where state != 'idle'` in the database.
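
Step 3 can also be scripted to make the leak easier to track across restarts. The helper below is a sketch under assumptions: `count_matching` and `ACTIVE_QUERY_SQL` are hypothetical names, and `conn` is any DB-API connection to the same PostgreSQL instance (for example one opened with `psycopg2.connect`).

```python
# Same filter as in step 3, restricted to the columns the helper needs.
ACTIVE_QUERY_SQL = (
    "select pid, state, query from pg_stat_activity where state != 'idle'"
)


def count_matching(conn, needle="pg_sleep"):
    """Count non-idle sessions whose query text contains `needle`."""
    cur = conn.cursor()
    try:
        cur.execute(ACTIVE_QUERY_SQL)
        rows = cur.fetchall()
    finally:
        cur.close()
    # The query column can be NULL, so fall back to an empty string.
    return sum(1 for _pid, _state, query in rows if needle in (query or ""))
```

If the behaviour described in this issue is reproduced, the count should grow by one after each restart of the scheduler container while the task is running.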
   
   ### Operating System
   
   Arch Linux
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==8.0.0
   apache-airflow-providers-common-sql==1.4.0
   apache-airflow-providers-google==10.0.0
   apache-airflow-providers-http==4.3.0
   apache-airflow-providers-postgres==5.4.0
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Almost the same installation is used in our AWS ECS deployment.
   
   docker-compose.yaml:
   ```
   ---
   version: '3.4'
   
   x-airflow-common:
     &airflow-common
     build:
       dockerfile: local.dockerfile
     ulimits:
       nofile:
         soft: 65536
         hard: 65536
     user: "${AIRFLOW_UID:-0}:0"
     env_file:
       - "${ENV_FILE:-.env}"
     volumes:
       - ./dags:/opt/airflow/dags
       - ./logs:/opt/airflow/logs
       - ./shared:/mnt/shared
   
   x-depends-on:
     &depends-on
     depends_on:
       postgres:
         condition: service_healthy
   
   services:
     postgres:
       image: postgres:13
       ports:
         - "5432:5432"
       command: ["postgres", "-c", "log_statement=all"]
       healthcheck:
         test: ["CMD", "pg_isready", "-U", "airflow"]
         interval: 5s
         retries: 5
       restart: unless-stopped
       env_file:
         - "${ENV_FILE:-.env}"
       logging:
         options:
           # with "log_statement=all" the logs may grow very large, so limit their size (unlimited by default).
           max-size: "100mb"
   
     scheduler:
       <<: [*airflow-common, *depends-on]
       command: ["airflow", "scheduler"]
       restart: unless-stopped
       ports:
         - "8793:8793"
       extra_hosts:
         - "host.docker.internal:host-gateway"
   
     webserver:
       <<: [*airflow-common, *depends-on]
       command: ["airflow", "webserver"]
       restart: unless-stopped
       ports:
         - "8080:8080"
       healthcheck:
         test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
         interval: 5s
         timeout: 30s
         retries: 10
   ```
   
   local.dockerfile:
   ```
   FROM apache/airflow:slim-2.7.0-python3.10
   
   USER airflow
   COPY requirements.txt /requirements.txt
   RUN pip install -r /requirements.txt
   ```
   
   Requirements are installed with `-c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.1/constraints-3.10.txt`.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

