rpfernandezjr opened a new issue #15580:
URL: https://github.com/apache/airflow/issues/15580


   **Apache Airflow version**:  2.0.2
   
   **Kubernetes version**: 1.17.17
   
   - **Cloud provider or hardware configuration**: GKE
   - **OS** (e.g. from /etc/os-release):  CentOS Linux release 7.8.2003 (Core)
   - **kernel**: 4.19.150+ x86
   
   **What happened**:
   A DAG is kicked off via the Airflow webserver, and the scheduler launches a new worker pod for the task. The task is flagged as `success`; however, the worker pod that ran it stays in a `CrashLoopBackOff` state and is never removed.
   
   
   **What you expected to happen**:
   Once the task is flagged as `success`, I expect the worker pod to get cleaned up.
   
   Here's the entry from the database where the task was flagged as successful 
after the initial pod run.
   ```
     task_id   | try_number | pid |  state  |          start_date           |           end_date
   ------------+------------+-----+---------+-------------------------------+-------------------------------
    start_task |          1 |   1 | success | 2021-04-28 20:55:37.117977+00 | 2021-04-28 20:55:37.424436+00
   ```
   
   Here's a snippet of the scheduler logs; it continues to log this in a loop:
   ```
   [2021-04-28 21:19:00,140] {kubernetes_executor.py:335} DEBUG - Syncing KubernetesExecutor
   [2021-04-28 21:19:00,140] {kubernetes_executor.py:261} DEBUG - KubeJobWatcher alive, continuing
   [2021-04-28 21:19:00,141] {dag_processing.py:385} DEBUG - Received message of type DagParsingStat
   [2021-04-28 21:19:00,142] {dag_processing.py:385} DEBUG - Received message of type DagParsingStat
   [2021-04-28 21:19:00,152] {scheduler_job.py:1399} DEBUG - Next timed event is in 1.834328
   [2021-04-28 21:19:00,152] {scheduler_job.py:1401} DEBUG - Ran scheduling loop in 0.05 seconds
   [2021-04-28 21:19:01,154] {scheduler_job.py:1591} DEBUG - Running SchedulerJob._create_dagruns_for_dags with retries. Try 1 of 3
   [2021-04-28 21:19:01,165] {scheduler_job.py:1573} DEBUG - Running SchedulerJob._get_dagmodels_and_create_dagruns with retries. Try 1 of 3
   [2021-04-28 21:19:01,187] {scheduler_job.py:940} DEBUG - No tasks to consider for execution.
   [2021-04-28 21:19:01,189] {base_executor.py:150} DEBUG - 1 running task instances
   [2021-04-28 21:19:01,189] {base_executor.py:151} DEBUG - 0 in queue
   [2021-04-28 21:19:01,189] {base_executor.py:152} DEBUG - 31 open slots
   [2021-04-28 21:19:01,190] {base_executor.py:161} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
   [2021-04-28 21:19:01,190] {kubernetes_executor.py:510} DEBUG - self.running: {TaskInstanceKey(dag_id='raf-k8-dag', task_id='start_task', execution_date=datetime.datetime(2021, 4, 28, 20, 55, 32, 53209, tzinfo=Timezone('UTC')), try_number=1)}
   ```
   
   Here is the pod that was created, `dagfk8dagstarttask...`; it stays in a crashing state.
   ```
   NAME                                                 READY   STATUS             RESTARTS   AGE
   airflow-0                                            2/2     Running            0          17h
   dagfk8dagstarttask.372040981c7a4a2d8e3df6f01e656f50  0/1     CrashLoopBackOff   9          24m
   ```
   
   We currently have these two settings in our `airflow.cfg` config:
   ```
   delete_worker_pods = True
   delete_worker_pods_on_failure = False
   ```
   I've tried flipping `delete_worker_pods_on_failure` between True and False and get the same results in my runs.
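As a sanity check on my understanding, here's a minimal sketch of how I'd expect these two flags to interact. This reflects my reading of the docs, not the actual executor source, and `should_delete_pod` is a hypothetical helper, not an Airflow function:

```python
# Hypothetical sketch of how the two settings are commonly understood to
# interact; this mirrors my reading of the docs, not the executor source.
def should_delete_pod(task_succeeded: bool,
                      delete_worker_pods: bool,
                      delete_worker_pods_on_failure: bool) -> bool:
    """Decide whether a finished worker pod should be cleaned up."""
    if not delete_worker_pods:
        # Master switch off: pods are always kept around for inspection.
        return False
    if task_succeeded:
        # Successful pods are deleted whenever the master switch is on.
        return True
    # Failed pods are kept unless the on-failure flag is also set.
    return delete_worker_pods_on_failure

# With our settings, the successful pod above should have been deleted:
print(should_delete_pod(True, True, False))  # True
```

If that reading is right, the success path here shouldn't depend on `delete_worker_pods_on_failure` at all, which matches what I see when flipping it.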
   
   
   Here's the description of the worker pod that is crashing:
   ```
   Name:                 dagk8dagstarttask.372040981c7a4a2d8e3df6f01e656f50
   Namespace:            default
   Priority:             500
   Priority Class Name:  mid-priority
   Node:                 gke-udp-xxxxx-gke-node-pool-1a26adcb-92ca/10.100.0.45
   Start Time:           Wed, 28 Apr 2021 15:55:33 -0500
   Labels:               airflow-worker=372
                         airflow_version=2.0.2
                         dag_id=raf-k8-dag
                         execution_date=2021-04-28T20_55_32.053209_plus_00_00
                         kubernetes_executor=True
                         task_id=start_task
                         try_number=1
   Annotations:          dag_id: raf-k8-dag
                         execution_date: 2021-04-28T20:55:32.053209+00:00
                          kubernetes.io/limit-ranger: LimitRanger plugin set: cpu, memory request for container base; cpu limit for container base
                         task_id: start_task
                         try_number: 1
   Status:               Running
   IP:                   10.102.1.96
   IPs:
     IP:  10.102.1.96
   Containers:
     base:
        Container ID:  docker://7d0fc4d8a1e799ce215a08a4216682f8a0fb33d735cb81c22ae4bd4410f3b78f
       Image:         gcr.io/xxxxxxxx/default/airflow:latest
        Image ID:      docker-pullable://gcr.io/xxxxxxxxxxx/default/airflow@sha256:66fe0f4e1185698c93ebfba05d837f1fc764cd3ec70492a6a058a01efa558598
       Port:          <none>
       Host Port:     <none>
       Args:
         airflow
         tasks
         run
         raf-k8-dag
         start_task
         2021-04-28T20:55:32.053209+00:00
         --local
         --pool
         default_pool
         --subdir
         /opt/airflow/dags/k8-test.py
       State:          Waiting
         Reason:       CrashLoopBackOff
       Last State:     Terminated
         Reason:       Completed
         Exit Code:    0
         Started:      Wed, 28 Apr 2021 16:01:34 -0500
         Finished:     Wed, 28 Apr 2021 16:01:37 -0500
       Ready:          False
       Restart Count:  6
       Limits:
         cpu:  750m
       Requests:
         cpu:     150m
         memory:  512Mi
       Environment:
         AIRFLOW_IS_K8S_EXECUTOR_POD:  True
       Mounts:
         /var/run/secrets/kubernetes.io/serviceaccount from default-token-hd2mp 
(ro)
   Conditions:
     Type              Status
     Initialized       True
     Ready             False
     ContainersReady   False
     PodScheduled      True
   Volumes:
     default-token-hd2mp:
       Type:        Secret (a volume populated by a Secret)
       SecretName:  default-token-hd2mp
       Optional:    false
   QoS Class:       Burstable
   Node-Selectors:  <none>
   Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                    node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
   Events:
     Type     Reason     Age                   From               Message
     ----     ------     ----                  ----               -------
      Normal   Scheduled  8m5s                  default-scheduler  Successfully assigned default/dagk8dagstarttask.372040981c7a4a2d8e3df6f01e656f50 to gke-udp-xxxxx-gke-node-pool-1a26adcb-92ca
      Normal   Started    7m8s (x4 over 8m3s)   kubelet            Started container base
      Normal   Pulling    6m21s (x5 over 8m4s)  kubelet            Pulling image "gcr.io/xxxxxxxxxxx/default/airflow:latest"
      Normal   Pulled     6m21s (x5 over 8m3s)  kubelet            Successfully pulled image "gcr.io/xxxxxxxxxxx/default/airflow:latest"
      Normal   Created    6m21s (x5 over 8m3s)  kubelet            Created container base
      Warning  BackOff    3m (x22 over 7m54s)   kubelet            Back-off restarting failed container
   ```
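One thing that stands out to me (this is an assumption on my part, not something I've confirmed): the container's last state is `Terminated`/`Completed` with exit code 0, yet the kubelet keeps restarting it. That pattern usually means the pod's `restartPolicy` is `Always` (the Kubernetes default) rather than the `Never` a run-once worker pod should have. A hypothetical helper to check this against the output of `kubectl get pod <name> -o json`:

```python
# Hypothetical helper (not part of Airflow): given a pod manifest as a
# dict, e.g. json.loads of `kubectl get pod <name> -o json`, report the
# restartPolicy that would explain a container exiting 0 yet restarting.
def diagnose_restart_policy(pod: dict) -> str:
    # "Always" is the Kubernetes default when restartPolicy is omitted.
    policy = pod.get("spec", {}).get("restartPolicy", "Always")
    if policy != "Never":
        return f"restartPolicy={policy!r}: kubelet will restart the container"
    return "restartPolicy='Never': container runs exactly once"

print(diagnose_restart_policy({"spec": {"restartPolicy": "Always"}}))
```

If the worker pods really are coming up with `restartPolicy: Always`, that would explain both the restarts and the CrashLoopBackOff despite a clean exit.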
   
   Here's the DAG that I'm running, too; it's pretty basic:
   ```python
    import logging

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
   
   default_args = {
       'owner': 'airflow',
   }
   
   log = logging.getLogger(__name__)
   
   with DAG(
           dag_id='raf-k8-dag',
           default_args=default_args,
           schedule_interval=None,
           start_date=days_ago(1),
           tags=['raf-dag-tag'],
   ) as dag:
   
       def print_me():
           msg = "start-task ran ok"
           print(msg)
           log.info(msg)
           return 0
   
       start_task = PythonOperator(
           task_id="start_task",
           python_callable=print_me
       )
   
       start_task
   ```
   
   **How to reproduce it**:
   Run the DAG listed above.
   
   
   
   **Anything else we need to know**:
    All signs point to the code (a basic print/log statement) are being 
executed when the worker runs, however we don't see the output of the worker 
logs via the airflow webserver. 
   
   This seems very similar to issue https://github.com/apache/airflow/issues/13917, which was closed without a resolution.
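To illustrate what I think is happening (a hypothetical sketch, not Airflow source): the metadata DB already reads `success`, but judging by the scheduler log above, the executor keeps the task key in `self.running` until the pod reaches a terminal phase, which a `CrashLoopBackOff` pod never reports:

```python
# Hypothetical illustration (not Airflow source) of the mismatch in the
# scheduler logs: the metadata DB already reads 'success', but the executor
# drops a task key from self.running only once the watcher sees the pod
# reach a terminal phase - which a CrashLoopBackOff pod never does.
TERMINAL_POD_PHASES = {"Succeeded", "Failed"}

def executor_still_tracking(pod_phase: str) -> bool:
    """True while the executor would keep the task key in self.running."""
    return pod_phase not in TERMINAL_POD_PHASES

# The stuck pod reports phase 'Running', so the key is never released:
print(executor_still_tracking("Running"))  # True
```

That would account for the `self.running: {TaskInstanceKey(...)}` line repeating forever even though the task itself finished cleanly.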
   
   

