shihabcsedu09 opened a new issue #14791:
URL: https://github.com/apache/airflow/issues/14791


   **Apache Airflow version**:  2.0.1
   
   **Kubernetes version**: 1.19.7
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Azure Kubernetes Service(AKS)
   - **OS** (e.g. from /etc/os-release): Debian GNU/Linux 10 (buster)
   - **Kernel** (e.g. `uname -a`): Linux airflow-scheduler-db9fd5df6-6475f 
5.4.0-1040-azure #42~18.04.1-Ubuntu SMP Mon Feb 8 19:05:32 UTC 2021 x86_64 
GNU/Linux
   
   **What happened**:
   
   I have configured an AKS Kubernetes cluster with an Airflow Scheduler pod 
and an Airflow Webserver pod. I have written a DAG that runs its task using the 
KubernetesPodOperator. 
   
   When the DAG is triggered, a pod is created and the steps inside the DAG 
start running. DAGs that take a short amount of time to finish don't cause any 
issue. For longer-running tasks, however, when the task is done and the pod 
moves to a Terminated/Completed state, the Airflow webserver/scheduler seems 
not to receive that information. As a result, the DAG status stays "running" 
in the webserver and I have to manually mark the DAG as success to move 
forward. 
   
   I compared the Kubernetes pod logs with the task logs visible in the web 
UI. The logs I can see from the webserver lag behind the logs in the pod. 
   
   **What you expected to happen**:
   
   When the pod is terminated/completed, the Airflow scheduler should receive 
that information and mark the task as a success. 
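   While debugging, it can help to confirm outside of Airflow that the pod 
really does reach a terminal phase. Below is a minimal sketch using the 
official `kubernetes` Python client; the namespace and pod name are 
placeholders, and a reachable kubeconfig is assumed:

```python
# Sketch: poll a pod's phase until it terminates, independently of Airflow.
# Assumes the `kubernetes` client package and cluster access (kubeconfig).
import time

TERMINAL_PHASES = {"Succeeded", "Failed"}


def is_terminal(phase: str) -> bool:
    """A pod has finished only when its phase is Succeeded or Failed."""
    return phase in TERMINAL_PHASES


def wait_for_pod(namespace: str, pod_name: str, poll_seconds: int = 30) -> str:
    # Imported here so the pure helper above works without a cluster.
    from kubernetes import client, config

    config.load_kube_config()  # or load_incluster_config() when run inside AKS
    v1 = client.CoreV1Api()
    while True:
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        phase = pod.status.phase
        if is_terminal(phase):
            return phase  # "Succeeded" or "Failed"
        time.sleep(poll_seconds)


if __name__ == "__main__":
    # Placeholder pod name; use the actual name the operator generates.
    print(wait_for_pod("airflow", "termination-issue-aks-airflow"))
```

   If this reports `Succeeded` while the webserver still shows the task as 
running, the problem is on the Airflow side rather than in the pod itself.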
   
   **How to reproduce it**:
   
   To mimic a long-running job, I wrote a small DAG that uses a sleep call.
   
   Here is the script the pod runs - 
   
   ```python
   import logging
   import time
   from datetime import datetime


   def long_task():
       time.sleep(300)


   if __name__ == '__main__':
       # Without basicConfig, logging.info() is suppressed at the default
       # WARNING level and nothing would appear in the pod logs.
       logging.basicConfig(level=logging.INFO)
       loop_step_count = 12

       for i in range(loop_step_count):
           logging.info(f'Loop count {i}, current timestamp {datetime.utcnow()}')
           long_task()
   ```
   
   and this is how I defined my KubernetesPodOperator:
   
   ```python
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

   default_args = {
       'owner': 'airflow',
       'description': 'Pipeline for testing airflow aks termination issue',
       'depends_on_past': False,
       'start_date': datetime(2020, 2, 5),
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 5,
       'retry_delay': timedelta(minutes=1),
   }

   with DAG('Termination_Issue_AKS_Airflow',
            default_args=default_args,
            catchup=False) as dag:

       step = KubernetesPodOperator(
           namespace='airflow',
           name="Termination_Issue_Aks_Airflow",
           task_id="Termination_Issue_Aks_Airflow",
           image="shihabcsedu09/termination_issue_aks_airflow:latest",
           image_pull_policy='Always',
           get_logs=True,
           log_events_on_failure=True,
           is_delete_operator_pod=True,
           node_selector={'agentpool': 'airflowtasks'},
           termination_grace_period=60,
           startup_timeout_seconds=900,
       )

       step
   ```
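   To see what state the scheduler has actually recorded for the task 
instance, Airflow 2.0's stable REST API can be queried directly. A sketch 
assuming the API is enabled with basic auth; the host, credentials, and run id 
below are placeholders:

```python
# Sketch: read a task instance's recorded state via the stable REST API.
# Assumes basic_auth is enabled in airflow.cfg; all values are placeholders.
import base64
import json
import urllib.request


def ti_endpoint(dag_id: str, run_id: str, task_id: str) -> str:
    """Build the stable REST API path for one task instance."""
    return f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}"


def task_state(base_url: str, user: str, password: str,
               dag_id: str, run_id: str, task_id: str) -> str:
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(
        base_url + ti_endpoint(dag_id, run_id, task_id),
        headers={"Authorization": f"Basic {token}"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)["state"]


if __name__ == "__main__":
    # Placeholder deployment values; adjust for your environment.
    print(task_state("http://localhost:8080", "admin", "admin",
                     "Termination_Issue_AKS_Airflow",
                     "manual__2021-03-01T00:00:00+00:00",
                     "Termination_Issue_Aks_Airflow"))
```

   If the API still reports `running` after the pod has completed, that 
matches what the web UI shows.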
   
   The image for this DAG step is on Docker Hub and is public 
([Link](https://hub.docker.com/repository/docker/shihabcsedu09/termination_issue_aks_airflow)).
 The related Dockerfile looks like this: 
   
   ```dockerfile
   FROM python:3.7

   COPY termination_issue_aks_airflow.py .

   CMD [ "python", "./termination_issue_aks_airflow.py" ]
   ```
   
   and this is how I am publishing it.
   
   ```shell
   docker build --no-cache \
     -t termination_issue_aks_airflow \
     -f Dockerfile .

   docker tag termination_issue_aks_airflow shihabcsedu09/termination_issue_aks_airflow:latest

   docker push shihabcsedu09/termination_issue_aks_airflow:latest
   ```
   
   **Important to know:** 
   
   In my Kubernetes cluster there are two node pools: 
   
   1. **default** node pool - In this pool I deployed my Airflow scheduler 
and Airflow webserver pods.
   2. **airflowtasks** node pool - In this pool the tasks of my DAGs run. As 
you can see, I used `node_selector={'agentpool': 'airflowtasks'}` to define in 
which node pool the DAG's tasks will run. 
   
   
   

