shihabcsedu09 opened a new issue #14791:
URL: https://github.com/apache/airflow/issues/14791
**Apache Airflow version**: 2.0.1
**Kubernetes version**: 1.19.7
**Environment**:
- **Cloud provider or hardware configuration**: Azure Kubernetes Service(AKS)
- **OS** (e.g. from /etc/os-release): Debian GNU/Linux 10 (buster)
- **Kernel** (e.g. `uname -a`): Linux airflow-scheduler-db9fd5df6-6475f 5.4.0-1040-azure #42~18.04.1-Ubuntu SMP Mon Feb 8 19:05:32 UTC 2021 x86_64 GNU/Linux
**What happened**:
I have configured an AKS Kubernetes cluster where there is an Airflow Scheduler pod and an Airflow Webserver pod. I have written a DAG that runs using the KubernetesPodOperator.
When the DAG is triggered, a pod is created and the steps inside the DAG start running. DAGs that take a short amount of time to finish don't cause any issue. For longer-running tasks, however, once the task is done in the pod and the pod moves to a terminated/completed state, the Airflow webserver/scheduler does not seem to receive that information. For this reason, the DAG status stays "running" in the webserver and I have to manually mark the DAG as success to go forward.
I observed both the Kubernetes pod logs and the task logs visible in the web UI. The logs I can see from the webserver lag behind the logs in the pod.
**What you expected to happen**:
When the pod is terminated/completed, the Airflow scheduler should receive that information and mark the task as a success.
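For context, the behavior I expect corresponds roughly to this polling loop (a minimal sketch, not the actual Airflow code; `get_pod_phase` is a hypothetical callable standing in for a Kubernetes API query):

```
import time

def wait_for_pod_completion(get_pod_phase, poll_interval=1.0, timeout=10.0):
    """Poll the pod phase until it reaches a terminal state or times out.

    get_pod_phase: zero-argument callable returning one of the Kubernetes
    pod phases: 'Pending', 'Running', 'Succeeded', 'Failed', 'Unknown'.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        phase = get_pod_phase()
        if phase in ('Succeeded', 'Failed'):
            # Terminal state reached: the scheduler should record this
            # and mark the task instance accordingly.
            return phase
        time.sleep(poll_interval)
    raise TimeoutError('pod did not reach a terminal state in time')

# Simulate a pod that is pending, runs, then succeeds.
phases = iter(['Pending', 'Running', 'Succeeded'])
print(wait_for_pod_completion(lambda: next(phases), poll_interval=0.01))  # Succeeded
```

In my case the pod does reach `Succeeded` (visible via `kubectl`), but the task state in Airflow never follows.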
**How to reproduce it**:
To mimic a long-running job I wrote a small script that sleeps in a loop (12 iterations of a 5-minute sleep, about an hour in total).
Here is the script that runs inside the pod -
```
import time
import logging
from datetime import datetime

# Configure the root logger, otherwise logging.info() emits nothing
logging.basicConfig(level=logging.INFO)

def long_task():
    time.sleep(300)

if __name__ == '__main__':
    loop_step_count = 12
    for i in range(loop_step_count):
        logging.info(f'Loop Count {i} Current timestamp {datetime.utcnow()}')
        long_task()
```
and this is how I defined my Kubernetes Pod Operator.
```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'airflow',
    'description': 'Pipeline for testing airflow aks termination issue',
    'depends_on_past': False,
    'start_date': datetime(2020, 2, 5),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=1),
}

with DAG('Termination_Issue_AKS_Airflow',
         default_args=default_args,
         catchup=False) as dag:
    step = KubernetesPodOperator(
        namespace='airflow',
        name="Termination_Issue_Aks_Airflow",
        task_id="Termination_Issue_Aks_Airflow",
        image="shihabcsedu09/termination_issue_aks_airflow:latest",
        image_pull_policy='Always',
        get_logs=True,
        log_events_on_failure=True,
        is_delete_operator_pod=True,
        node_selector={'agentpool': 'airflowtasks'},
        termination_grace_period=60,
        startup_timeout_seconds=900,
    )
    step
```
The image for this DAG step is on Docker Hub and is public.
([Link](https://hub.docker.com/repository/docker/shihabcsedu09/termination_issue_aks_airflow)).
The related Dockerfile is like this.
```
FROM python:3.7
COPY termination_issue_aks_airflow.py .
CMD [ "python", "./termination_issue_aks_airflow.py" ]
```
and this is how I am publishing it.
```
docker build --no-cache \
  -t termination_issue_aks_airflow \
  -f Dockerfile .
docker tag termination_issue_aks_airflow \
  shihabcsedu09/termination_issue_aks_airflow:latest
docker push shihabcsedu09/termination_issue_aks_airflow:latest
```
**Important to know -**
In my Kubernetes cluster there are two node pools:
1. **default** node pool - the Airflow scheduler and Airflow webserver pods are deployed here.
2. **airflowtasks** node pool - the steps of my DAGs run here. As you can see, I used `node_selector={'agentpool': 'airflowtasks'}` to define in which node pool the DAG's pods run.
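For reference, the `node_selector` argument should end up as a `nodeSelector` field in the pod spec the operator generates, pinning the task pod to the `airflowtasks` pool (AKS labels each node with its `agentpool`). This is a hand-written sketch of the relevant fields, not the exact spec Airflow produces:

```
# Sketch of the part of the generated pod spec that pins the task pod
# to the 'airflowtasks' node pool; container name 'base' is illustrative.
pod_spec = {
    'apiVersion': 'v1',
    'kind': 'Pod',
    'metadata': {'name': 'termination-issue-aks-airflow', 'namespace': 'airflow'},
    'spec': {
        'nodeSelector': {'agentpool': 'airflowtasks'},
        'containers': [{
            'name': 'base',
            'image': 'shihabcsedu09/termination_issue_aks_airflow:latest',
        }],
    },
}
print(pod_spec['spec']['nodeSelector'])  # {'agentpool': 'airflowtasks'}
```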