ordonezf opened a new issue #18066:
URL: https://github.com/apache/airflow/issues/18066


   ### Official Helm Chart version
   
   1.1.0 (latest released)
   
   ### Apache Airflow version
   
   2.1.3 (latest released)
   
   ### Kubernetes Version
   
   1.19.13
   
   ### Helm Chart configuration
   
   ```yaml
   executor: "CeleryExecutor"
   workers:
     # Number of airflow celery workers in StatefulSet
     replicas: 1
     # Below are the chart's default command/args; graceful shutdown does not work with them
     command: ~
     args:
        - "bash"
        - "-c"
        - |-
          exec \
          airflow celery worker
   ```
   
   ### Docker Image customisations
   
   ```dockerfile
   FROM apache/airflow:2.1.3-python3.7
   
   ENV AIRFLOW_HOME=/opt/airflow
   
   USER root
   
   RUN set -ex \
       && buildDeps=' \
           python3-dev \
           libkrb5-dev \
           libssl-dev \
           libffi-dev \
           build-essential \
           libblas-dev \
           liblapack-dev \
           libpq-dev \
           gcc \
           g++ \
       ' \
       && apt-get update -yqq \
       && apt-get upgrade -yqq \
       && apt-get install -yqq --no-install-recommends \
           $buildDeps \
           libsasl2-dev \
           libsasl2-modules \
           apt-utils \
           curl \
           vim \
           rsync \
           netcat \
           locales \
           sudo \
           patch \
           libpq5 \
       && apt-get autoremove -yqq --purge \
       && apt-get clean \
       && rm -rf /var/lib/apt/lists/*
   
   USER airflow
   
   COPY --chown=airflow:root requirements*.txt /tmp/
   
   RUN pip install -U pip setuptools wheel cython \
       && pip install -r /tmp/requirements_providers.txt \
       && pip install -r /tmp/requirements.txt
   
   COPY --chown=airflow:root setup.py /tmp/custom_operators/
   COPY --chown=airflow:root custom_operators/ /tmp/custom_operators/custom_operators/
   
   RUN pip install /tmp/custom_operators
   
   COPY --chown=airflow:root entrypoint*.sh /
   COPY --chown=airflow:root config/ ${AIRFLOW_HOME}/config/
   COPY --chown=airflow:root airflow.cfg ${AIRFLOW_HOME}/
   COPY --chown=airflow:root dags/ ${AIRFLOW_HOME}/dags
   ```
   
   ### What happened
   
   When using the CeleryExecutor, whenever I kill a worker pod that is running a task with `kubectl delete pod` or a `helm upgrade`, the pod is killed instantly and does not wait for the task to finish or for `terminationGracePeriodSeconds` to elapse.
   
   ### What you expected to happen
   
   I expect the worker to finish all its tasks within the grace period before being killed.
   
   Killing the pod while it is running a task produces the following output:
   ```bash
   k logs -f airflow-worker-86d78f7477-rjljs
   
    * Serving Flask app "airflow.utils.serve_logs" (lazy loading)
    * Environment: production
      WARNING: This is a development server. Do not use it in a production deployment.
      Use a production WSGI server instead.
    * Debug mode: off
   [2021-09-07 16:26:25,612] {_internal.py:113} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
   /home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801 RuntimeWarning: You're running the worker with superuser privileges: this is
   absolutely not recommended!
   
   Please specify a different user using the --uid option.
   
   User information: uid=50000 euid=50000 gid=0 egid=0
   
   [2021-09-07 16:28:11,021: WARNING/ForkPoolWorker-1] Running <TaskInstance: test-long-running.long-long 2021-09-07T16:28:09.148524+00:00 [queued]> on host airflow-worker-86d78f7477-rjljs
   
   worker: Warm shutdown (MainProcess)
   [2021-09-07 16:28:32,919: ERROR/MainProcess] Process 'ForkPoolWorker-2' pid:20 exited with 'signal 15 (SIGTERM)'
   [2021-09-07 16:28:32,930: ERROR/MainProcess] Process 'ForkPoolWorker-1' pid:19 exited with 'signal 15 (SIGTERM)'
   [2021-09-07 16:28:33,183: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 15 (SIGTERM) Job: 0.')
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/worker/worker.py", line 208, in start
       self.blueprint.start(self)
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
       step.start(parent)
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
       return self.obj.start()
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
       blueprint.start(self)
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
       step.start(parent)
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 599, in start
       c.loop(*c.loop_args())
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/worker/loops.py", line 83, in asynloop
       next(loop)
     File "/home/airflow/.local/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 303, in create_loop
       poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
     File "/home/airflow/.local/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 145, in fire_timers
       entry()
     File "/home/airflow/.local/lib/python3.7/site-packages/kombu/asynchronous/timer.py", line 68, in __call__
       return self.fun(*self.args, **self.kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/kombu/asynchronous/timer.py", line 130, in _reschedules
       return fun(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/worker/consumer/gossip.py", line 167, in periodic
       for worker in values(workers):
     File "/home/airflow/.local/lib/python3.7/site-packages/kombu/utils/functional.py", line 109, in _iterate_values
       for k in self:
     File "/home/airflow/.local/lib/python3.7/site-packages/kombu/utils/functional.py", line 95, in __iter__
       def __iter__(self):
     File "/home/airflow/.local/lib/python3.7/site-packages/celery/apps/worker.py", line 285, in _handle_request
       raise exc(exitcode)
   celery.exceptions.WorkerShutdown: 0
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.7/site-packages/billiard/pool.py", line 1267, in mark_as_worker_lost
       human_status(exitcode), job._job),
   billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 15 (SIGTERM) Job: 0.
   
    -------------- celery@airflow-worker-86d78f7477-rjljs v4.4.7 (cliffs)
   --- ***** -----
   -- ******* ---- Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-debian-10.10 2021-09-07 16:26:26
   - *** --- * ---
   - ** ---------- [config]
   - ** ---------- .> app:         airflow.executors.celery_executor:0x7ff517d78d90
   - ** ---------- .> transport:   redis://:**@airflow-redis:6379/0
   - ** ---------- .> results:     postgresql+psycopg2://airflow:**@stg-datascience-database.trocafone.net:5432/airflow
   - *** --- * --- .> concurrency: 2 (prefork)
   -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
   --- ***** -----
    -------------- [queues]
                   .> default          exchange=default(direct) key=default
   ```
   
   ### How to reproduce
   
   Run a DAG with this Airflow configuration:
   ```yaml
   executor: "CeleryExecutor"
   workers:
     replicas: 1
     command: ~
     args:
       - "bash"
       - "-c"
     # The format below is necessary to keep `helm lint` happy
       - |-
         exec \
         airflow celery worker
   ```
   
   and kill the worker pod while the task is still running.
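   
   Concretely, I do something like this (the DAG id comes from my test DAG; the label selector is an assumption about how the chart labels worker pods, adjust both to your deployment):
   
   ```bash
   # Start a long-running task (DAG id from my test setup).
   airflow dags trigger test-long-running
   
   # While the task is still running, delete the worker pod
   # (label selector assumed from the chart's worker labels).
   kubectl delete pod -l component=worker
   
   # The pod exits almost immediately instead of waiting for the running task
   # or for terminationGracePeriodSeconds to elapse.
   ```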
   
   ### Anything else
   
   Overriding the official entrypoint seems to solve the issue:
   
   ```yaml
   workers:
     # To shut down workers gracefully I have to override the container entrypoint
     command: ["airflow"]
     args: ["celery", "worker"]
   ```
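   
   If I'm reading the chart values right, this can be combined with the worker grace period so long tasks actually get time to finish; a sketch of what I mean (`terminationGracePeriodSeconds` is the chart's worker setting as I understand it, and 600 is just an example value):
   
   ```yaml
   workers:
     command: ["airflow"]
     args: ["celery", "worker"]
     # How long Kubernetes waits before sending SIGKILL; example value only.
     terminationGracePeriodSeconds: 600
   ```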
   
   When the worker gets killed, another worker pod comes online and the old one stays in status `Terminating`; all new tasks go to the new worker.
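   
   To watch that behaviour, something like this works (the `component=worker` label selector is an assumption about how the chart labels worker pods):
   
   ```bash
   # Old pod shows STATUS=Terminating while its task finishes;
   # the replacement pod shows STATUS=Running and picks up new tasks.
   kubectl get pods -l component=worker -w
   ```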
   
   Below are the logs when the worker gets killed
    
   ```bash
   k logs -f airflow-worker-5ff95df84f-fznk7
    * Serving Flask app "airflow.utils.serve_logs" (lazy loading)
    * Environment: production
      WARNING: This is a development server. Do not use it in a production deployment.
      Use a production WSGI server instead.
    * Debug mode: off
   [2021-09-07 16:42:42,399] {_internal.py:113} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
   /home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801 RuntimeWarning: You're running the worker with superuser privileges: this is
   absolutely not recommended!
   
   Please specify a different user using the --uid option.
   
   User information: uid=50000 euid=50000 gid=0 egid=0
   
   [2021-09-07 16:42:53,133: WARNING/ForkPoolWorker-1] Running <TaskInstance: test-long-running.long-long 2021-09-07T16:28:09.148524+00:00 [queued]> on host airflow-worker-5ff95df84f-fznk7
   
   worker: Warm shutdown (MainProcess)
   
    -------------- celery@airflow-worker-5ff95df84f-fznk7 v4.4.7 (cliffs)
   --- ***** -----
   -- ******* ---- Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-debian-10.10 2021-09-07 16:42:43
   - *** --- * ---
   - ** ---------- [config]
   - ** ---------- .> app:         airflow.executors.celery_executor:0x7f69aaa90d50
   - ** ---------- .> transport:   redis://:**@airflow-redis:6379/0
   - ** ---------- .> results:     postgresql+psycopg2://airflow:**@stg-datascience-database.trocafone.net:5432/airflow
   - *** --- * --- .> concurrency: 2 (prefork)
   -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
   --- ***** -----
    -------------- [queues]
                   .> default          exchange=default(direct) key=default
   
   
   rpc error: code = Unknown desc = Error: No such container: efe5ce470f5bd5b7f84479c1a8f5dc1d5d92cb1ad6b16696fa5a1ca9610602ee
   ```
   There is no timestamp, but the worker waits for the task to finish before writing `worker: Warm shutdown (MainProcess)`.
   
   
   
   Another option I tried was using the following script as the entrypoint; it also works.
   
   ```bash
   #!/usr/bin/env bash
   
   handle_worker_term_signal() {
       # Remove worker from queue
       celery -b $AIRFLOW__CELERY__BROKER_URL -d celery@$HOSTNAME control cancel_consumer default
   
       while [ $(airflow jobs check --hostname $HOSTNAME | grep "Found one alive job." | wc -l) -eq 1 ]; do
           echo 'Finishing jobs!'
           airflow jobs check --hostname $HOSTNAME --limit 100 --allow-multiple
           sleep 60
       done
       echo 'All jobs finished! Terminating worker'
   
       kill $pid
       exit 0
   }
   
   trap handle_worker_term_signal SIGTERM
   
   airflow celery worker &
   pid="$!"
   
   wait $pid
   ```
   
   Got the idea from this post: https://medium.com/flatiron-engineering/upgrading-airflow-with-zero-downtime-8df303760c96
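   
   For reference, this is roughly how I point the chart at that script; the file name here is just an example, and the script itself gets baked into the image by the `COPY --chown=airflow:root entrypoint*.sh /` line in the Dockerfile above:
   
   ```yaml
   workers:
     # Hypothetical script name; use whatever entrypoint*.sh file your image copies to /.
     command: ["/entrypoint_worker.sh"]
     # Empty args so only the script runs (it starts `airflow celery worker` itself).
     args: []
   ```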
   
   
   Thanks!
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

