PaulW edited a comment on issue #4636: [AIRFLOW-3737] Kubernetes executor 
cannot handle long dag/task names
URL: https://github.com/apache/airflow/pull/4636#issuecomment-469023138
 
 
   So I hit the same issue as the last time we tried to trim/hash labels & not 
store the raw dag_id & task_id as an annotation, in that the following 
exception is raised upon the pod deletion, when the `_change_state()` function 
is called at the end:
   
   ```[2019-03-01 18:02:04,081] {{kubernetes_executor.py:679}} INFO - Changing 
state of (('kubernetesexecutorworkerlaunchtest', 'workerpodsubdagtest', 
datetime.datetime(2019, 3, 1, 17, 45, tzinfo=tzlocal()), 1), None, 
'kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2',
 '102168655') to None
   [2019-03-01 18:02:04,093] {{kubernetes_executor.py:692}} INFO - Deleted pod: 
('kubernetesexecutorworkerlaunchtest', 'workerpodsubdagtest', 
datetime.datetime(2019, 3, 1, 17, 45, tzinfo=tzlocal()), 1)
   [2019-03-01 18:02:04,093] {{kubernetes_executor.py:695}} DEBUG - Could not 
find key: ('kubernetesexecutorworkerlaunchtest', 'workerpodsubdagtest', 
datetime.datetime(2019, 3, 1, 17, 45, tzinfo=tzlocal()), 1)
   [2019-03-01 18:02:04,094] {{kubernetes_executor.py:293}} INFO - Event: 
kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2
 had an event of type MODIFIED
   [2019-03-01 18:02:04,094] {{kubernetes_executor.py:330}} INFO - Event: 
kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2
 Succeeded
   [2019-03-01 18:02:04,097] {{kubernetes_executor.py:293}} INFO - Event: 
kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2
 had an event of type DELETED
   [2019-03-01 18:02:04,097] {{kubernetes_executor.py:330}} INFO - Event: 
kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2
 Succeeded
   [2019-03-01 18:02:04,185] {{dag_processing.py:587}} INFO - Terminating 
manager process: 639
   [2019-03-01 18:02:04,185] {{dag_processing.py:592}} INFO - Waiting up to 5s 
for manager process to exit...
   [2019-03-01 18:02:04,207] {{settings.py:201}} DEBUG - Disposing DB 
connection pool (PID 4983)
   [2019-03-01 18:02:04,256] {{settings.py:201}} DEBUG - Disposing DB 
connection pool (PID 5010)
   [2019-03-01 18:02:04,320] {{jobs.py:1513}} INFO - Exited execute loop
   [2019-03-01 18:02:04,438] {{cli_action_loggers.py:81}} DEBUG - Calling 
callbacks: []
   [2019-03-01 18:02:04,445] {{settings.py:201}} DEBUG - Disposing DB 
connection pool (PID 1)
   
   Traceback (most recent call last):
     File "/usr/bin/airflow", line 32, in <module>
       args.func(args)
     File "/usr/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in 
wrapper
       return f(*args, **kwargs)
     File "/usr/lib/python3.6/site-packages/airflow/bin/cli.py", line 991, in 
scheduler
       job.run()
     File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 202, in run
       self._execute()
     File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1510, in 
_execute
       self._execute_helper()
     File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1596, in 
_execute_helper
       self.executor.heartbeat()
     File 
"/usr/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 
150, in heartbeat
       self.sync()
     File 
"/usr/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 680, in sync
       self._change_state(key, state, pod_id)
     File 
"/usr/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 703, in _change_state
       execution_date=ex_time
     File "/usr/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 
3046, in one
       raise orm_exc.NoResultFound("No row was found for one()")
   sqlalchemy.orm.exc.NoResultFound: No row was found for one()
   ```
   
   The process seems to be:
   
   `KubernetesExecutor.sync()` -> `AirflowKubernetesScheduler.sync()` -> 
`AirflowKubernetesScheduler.process_watcher_task()`
   
   `process_watcher_task()` then calls `_labels_to_key()` which extracts the 
labels from the pod & stores them to `key`.  These labels are the 
truncated/modified ones.
   
   `KubernetesExecutor.sync()` resumes, and calls 
`KubernetesExecutor._change_state()` and passes `key` through to it.
   
   `_change_state` then deletes the pod which ran the dag/task, and using the 
`dag_id` and `task_id` values returned previously from `_labels_to_key()` and 
stored within `key`, then tries to query the db.
   
   ```         (dag_id, task_id, ex_time, try_number) = key
            with create_session() as session:
                item = session.query(TaskInstance).filter_by(
                    dag_id=dag_id,
                    task_id=task_id,
                    execution_date=ex_time
                ).one()
                if state:
                    item.state = state
                    session.add(item)
   ```
   
   Obviously though this fails, as the `dag_id` and `task_id` stored in the key 
which is used here is wrong, and doesn't match anything in the DB directly.  
This was why we ended up storing the raw values as annotations & referenced 
them instead.
   
   I'm currently testing these changes now which remove the need for 
annotations & keep everything in the labels.  The only file edited now is the 
`contrib/executors/kubernetes_executor.py`.
   
   Instead of storing annotations, the `dag_id` and `task_id` are modified to 
remove any chars which are not supported in label values, and if the value is 
then greater than 63 chars, it is trimmed & a unique hash created from the 
'raw' unmodified value is added to the end.  The `_labels_to_key` function 
then, instead of returning the `dag_id` and `task_id` directly from the label 
(which would result in the condition above) instead performs a search of 
TaskInstances, which filters by the `execution_time` value stored as a 3rd 
label, loops through the results & matches the task by comparing the 'safe' 
`dag_id` and `task_id`.  This works fine locally (so far) however, I've some 
concerns this isn't really 100% foolproof, and that if the updated 'safe' label 
value differs from the original in any way, should be hashed regardless of 
length in that respect to avoid mismatched tasks if 2 dag or task names differs 
purely by a char which is filtered out.
   
   I'll upload the changes once I've done some more testing.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to