ordonezf opened a new issue #18066:
URL: https://github.com/apache/airflow/issues/18066
### Official Helm Chart version
1.1.0 (latest released)
### Apache Airflow version
2.1.3 (latest released)
### Kubernetes Version
1.19.13
### Helm Chart configuration
```yaml
executor: "CeleryExecutor"
workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1
  # Below is the default value; it does not work
  command: ~
  args:
    - "bash"
    - "-c"
    - |-
      exec \
      airflow celery worker
```
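For reference, the `exec \` in the default args replaces the `bash -c` shell with the worker process, so signals sent to the container's main process should reach the worker directly. A standalone sketch of that behavior (outside Kubernetes, with `sleep` standing in for `airflow celery worker`):

```shell
# `exec` replaces the shell, so the PID we launched now names the worker
# itself rather than an intermediate bash.
bash -c 'exec sleep 30' &
pid=$!
sleep 0.5
comm=$(ps -o comm= -p "$pid")   # reports `sleep`, not `bash`
echo "$comm"
kill "$pid"
wait "$pid" 2>/dev/null || true
```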
### Docker Image customisations
```dockerfile
FROM apache/airflow:2.1.3-python3.7
ENV AIRFLOW_HOME=/opt/airflow
USER root
RUN set -ex \
    && buildDeps=' \
        python3-dev \
        libkrb5-dev \
        libssl-dev \
        libffi-dev \
        build-essential \
        libblas-dev \
        liblapack-dev \
        libpq-dev \
        gcc \
        g++ \
    ' \
    && apt-get update -yqq \
    && apt-get upgrade -yqq \
    && apt-get install -yqq --no-install-recommends \
        $buildDeps \
        libsasl2-dev \
        libsasl2-modules \
        apt-utils \
        curl \
        vim \
        rsync \
        netcat \
        locales \
        sudo \
        patch \
        libpq5 \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
USER airflow
COPY --chown=airflow:root requirements*.txt /tmp/
RUN pip install -U pip setuptools wheel cython \
    && pip install -r /tmp/requirements_providers.txt \
    && pip install -r /tmp/requirements.txt
COPY --chown=airflow:root setup.py /tmp/custom_operators/
COPY --chown=airflow:root custom_operators/ /tmp/custom_operators/custom_operators/
RUN pip install /tmp/custom_operators
COPY --chown=airflow:root entrypoint*.sh /
COPY --chown=airflow:root config/ ${AIRFLOW_HOME}/config/
COPY --chown=airflow:root airflow.cfg ${AIRFLOW_HOME}/
COPY --chown=airflow:root dags/ ${AIRFLOW_HOME}/dags
```
### What happened
With CeleryExecutor, whenever I kill a worker pod that is running a task (with `kubectl delete pod` or a `helm upgrade`), the pod is killed instantly; it does not wait for the task to finish or for `terminationGracePeriodSeconds` to elapse.
### What you expected to happen
I expect the worker to finish all its tasks within the grace period before being killed.
Killing the pod while it is running a task produces the following:
```bash
k logs -f airflow-worker-86d78f7477-rjljs
* Serving Flask app "airflow.utils.serve_logs" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production
deployment.
Use a production WSGI server instead.
* Debug mode: off
[2021-09-07 16:26:25,612] {_internal.py:113} INFO - * Running on
http://0.0.0.0:8793/ (Press CTRL+C to quit)
/home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801
RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=50000 euid=50000 gid=0 egid=0
[2021-09-07 16:28:11,021: WARNING/ForkPoolWorker-1] Running <TaskInstance:
test-long-running.long-long 2021-09-07T16:28:09.148524+00:00 [queued]> on host
airflow-worker-86d78f7477-rjljs
worker: Warm shutdown (MainProcess)
[2021-09-07 16:28:32,919: ERROR/MainProcess] Process 'ForkPoolWorker-2'
pid:20 exited with 'signal 15 (SIGTERM)'
[2021-09-07 16:28:32,930: ERROR/MainProcess] Process 'ForkPoolWorker-1'
pid:19 exited with 'signal 15 (SIGTERM)'
[2021-09-07 16:28:33,183: ERROR/MainProcess] Task handler raised error:
WorkerLostError('Worker exited prematurely: signal 15 (SIGTERM) Job: 0.')
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.7/site-packages/celery/worker/worker.py",
line 208, in start
self.blueprint.start(self)
File
"/home/airflow/.local/lib/python3.7/site-packages/celery/bootsteps.py", line
119, in start
step.start(parent)
File
"/home/airflow/.local/lib/python3.7/site-packages/celery/bootsteps.py", line
369, in start
return self.obj.start()
File
"/home/airflow/.local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py",
line 318, in start
blueprint.start(self)
File
"/home/airflow/.local/lib/python3.7/site-packages/celery/bootsteps.py", line
119, in start
step.start(parent)
File
"/home/airflow/.local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py",
line 599, in start
c.loop(*c.loop_args())
File
"/home/airflow/.local/lib/python3.7/site-packages/celery/worker/loops.py", line
83, in asynloop
next(loop)
File
"/home/airflow/.local/lib/python3.7/site-packages/kombu/asynchronous/hub.py",
line 303, in create_loop
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
File
"/home/airflow/.local/lib/python3.7/site-packages/kombu/asynchronous/hub.py",
line 145, in fire_timers
entry()
File
"/home/airflow/.local/lib/python3.7/site-packages/kombu/asynchronous/timer.py",
line 68, in __call__
return self.fun(*self.args, **self.kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/kombu/asynchronous/timer.py",
line 130, in _reschedules
return fun(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/celery/worker/consumer/gossip.py",
line 167, in periodic
for worker in values(workers):
File
"/home/airflow/.local/lib/python3.7/site-packages/kombu/utils/functional.py",
line 109, in _iterate_values
for k in self:
File
"/home/airflow/.local/lib/python3.7/site-packages/kombu/utils/functional.py",
line 95, in __iter__
def __iter__(self):
File
"/home/airflow/.local/lib/python3.7/site-packages/celery/apps/worker.py", line
285, in _handle_request
raise exc(exitcode)
celery.exceptions.WorkerShutdown: 0
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/billiard/pool.py",
line 1267, in mark_as_worker_lost
human_status(exitcode), job._job),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 15
(SIGTERM) Job: 0.
-------------- celery@airflow-worker-86d78f7477-rjljs v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-debian-10.10
2021-09-07 16:26:26
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:
airflow.executors.celery_executor:0x7ff517d78d90
- ** ---------- .> transport: redis://:**@airflow-redis:6379/0
- ** ---------- .> results:
postgresql+psycopg2://airflow:**@stg-datascience-database.trocafone.net:5432/airflow
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this
worker)
--- ***** -----
-------------- [queues]
.> default exchange=default(direct) key=default
```
### How to reproduce
Run a DAG with this Airflow configuration:
```yaml
executor: "CeleryExecutor"
workers:
  replicas: 1
  command: ~
  args:
    - "bash"
    - "-c"
    # The format below is necessary to keep `helm lint` happy
    - |-
      exec \
      airflow celery worker
```
then kill the worker pod.
### Anything else
Overwriting the official entrypoint seems to solve the issue:
```yaml
workers:
  # To gracefully shut down workers I have to overwrite the container entrypoint
  command: ["airflow"]
  args: ["celery", "worker"]
```
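My guess at why overriding the entrypoint helps (an assumption on my part, not verified): the official image runs everything under `dumb-init`, which by default re-sends signals to the entire process group, so on `kubectl delete pod` the forked Celery pool workers receive SIGTERM directly instead of only the main Celery process — which would match the `ForkPoolWorker ... signal 15` / `WorkerLostError` lines above. If that is the cause, telling dumb-init to signal only its direct child might also work, e.g.:

```yaml
# Assumption: DUMB_INIT_SETSID=0 makes dumb-init signal only its direct
# child (equivalent to its --single-child flag), leaving the warm
# shutdown to Celery itself.
env:
  - name: DUMB_INIT_SETSID
    value: "0"
```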
When the worker is killed, another worker pod comes online and the old one stays in status `Terminating`; all new tasks go to the new worker.
Below are the logs from when the worker gets killed:
```bash
k logs -f airflow-worker-5ff95df84f-fznk7
* Serving Flask app "airflow.utils.serve_logs" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production
deployment.
Use a production WSGI server instead.
* Debug mode: off
[2021-09-07 16:42:42,399] {_internal.py:113} INFO - * Running on
http://0.0.0.0:8793/ (Press CTRL+C to quit)
/home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801
RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=50000 euid=50000 gid=0 egid=0
[2021-09-07 16:42:53,133: WARNING/ForkPoolWorker-1] Running <TaskInstance:
test-long-running.long-long 2021-09-07T16:28:09.148524+00:00 [queued]> on host
airflow-worker-5ff95df84f-fznk7
worker: Warm shutdown (MainProcess)
-------------- celery@airflow-worker-5ff95df84f-fznk7 v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.129-63.229.amzn2.x86_64-x86_64-with-debian-10.10
2021-09-07 16:42:43
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:
airflow.executors.celery_executor:0x7f69aaa90d50
- ** ---------- .> transport: redis://:**@airflow-redis:6379/0
- ** ---------- .> results:
postgresql+psycopg2://airflow:**@stg-datascience-database.trocafone.net:5432/airflow
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this
worker)
--- ***** -----
-------------- [queues]
.> default exchange=default(direct) key=default
rpc error: code = Unknown desc = Error: No such container:
efe5ce470f5bd5b7f84479c1a8f5dc1d5d92cb1ad6b16696fa5a1ca9610602ee%
```
There is no timestamp, but the worker waits for the task to finish before writing `worker: Warm shutdown (MainProcess)`.
Another option I tried was using the following as the entrypoint, which also works:
```bash
#!/usr/bin/env bash
handle_worker_term_signal() {
    # Remove worker from queue
    celery -b $AIRFLOW__CELERY__BROKER_URL -d celery@$HOSTNAME control cancel_consumer default
    while [ $(airflow jobs check --hostname $HOSTNAME | grep "Found one alive job." | wc -l) -eq 1 ]; do
        echo 'Finishing jobs!'
        airflow jobs check --hostname $HOSTNAME --limit 100 --allow-multiple
        sleep 60
    done
    echo 'All jobs finished! Terminating worker'
    kill $pid
    exit 0
}

trap handle_worker_term_signal SIGTERM

airflow celery worker &
pid="$!"
wait $pid
```
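The trap-and-wait pattern above can be exercised locally; here is a minimal sketch with `sleep` standing in for both `airflow celery worker` and the running task (the `celery control` / `airflow jobs check` calls are dropped):

```shell
#!/usr/bin/env bash
# Minimal version of the trap-and-wait entrypoint: the handler waits for
# the background "worker" to finish instead of polling `airflow jobs check`.
handle_term() {
    echo 'SIGTERM received, waiting for the running job'
    wait "$pid"    # resumes waiting for the background "worker"
    echo 'Job finished, terminating'
}
trap handle_term TERM

sleep 2 &          # the long-running "worker"
pid=$!
( sleep 0.2; kill -TERM $$ ) &   # simulate `kubectl delete pod`
wait "$pid"        # interrupted by SIGTERM; the trap runs, then we return
echo 'Worker exited cleanly'
```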
Got the idea from this post:
https://medium.com/flatiron-engineering/upgrading-airflow-with-zero-downtime-8df303760c96
Thanks!
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)