PaulW commented on issue #4636: [AIRFLOW-3737] Kubernetes executor cannot handle long dag/task names
URL: https://github.com/apache/airflow/pull/4636#issuecomment-469023138

So I hit the same issue as last time we tried to trim/hash the labels without storing the raw `dag_id` and `task_id` as annotations: the following exception is raised after the pod is deleted, when `_change_state()` is called at the end:

```
[2019-03-01 18:02:04,081] {{kubernetes_executor.py:679}} INFO - Changing state of (('kubernetesexecutorworkerlaunchtest', 'workerpodsubdagtest', datetime.datetime(2019, 3, 1, 17, 45, tzinfo=tzlocal()), 1), None, 'kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2', '102168655') to None
[2019-03-01 18:02:04,093] {{kubernetes_executor.py:692}} INFO - Deleted pod: ('kubernetesexecutorworkerlaunchtest', 'workerpodsubdagtest', datetime.datetime(2019, 3, 1, 17, 45, tzinfo=tzlocal()), 1)
[2019-03-01 18:02:04,093] {{kubernetes_executor.py:695}} DEBUG - Could not find key: ('kubernetesexecutorworkerlaunchtest', 'workerpodsubdagtest', datetime.datetime(2019, 3, 1, 17, 45, tzinfo=tzlocal()), 1)
[2019-03-01 18:02:04,094] {{kubernetes_executor.py:293}} INFO - Event: kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2 had an event of type MODIFIED
[2019-03-01 18:02:04,094] {{kubernetes_executor.py:330}} INFO - Event: kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2 Succeeded
[2019-03-01 18:02:04,097] {{kubernetes_executor.py:293}} INFO - Event: kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2 had an event of type DELETED
[2019-03-01 18:02:04,097] {{kubernetes_executor.py:330}} INFO - Event: kubernetesexecutorworkerlaunchtestworkerpodsubdagtest-c77976ffb1e54b6788612ac949fb31e2 Succeeded
[2019-03-01 18:02:04,185] {{dag_processing.py:587}} INFO - Terminating manager process: 639
[2019-03-01 18:02:04,185] {{dag_processing.py:592}} INFO - Waiting up to 5s for manager process to exit...
[2019-03-01 18:02:04,207] {{settings.py:201}} DEBUG - Disposing DB connection pool (PID 4983)
[2019-03-01 18:02:04,256] {{settings.py:201}} DEBUG - Disposing DB connection pool (PID 5010)
[2019-03-01 18:02:04,320] {{jobs.py:1513}} INFO - Exited execute loop
[2019-03-01 18:02:04,438] {{cli_action_loggers.py:81}} DEBUG - Calling callbacks: []
[2019-03-01 18:02:04,445] {{settings.py:201}} DEBUG - Disposing DB connection pool (PID 1)
Traceback (most recent call last):
  File "/usr/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/lib/python3.6/site-packages/airflow/bin/cli.py", line 991, in scheduler
    job.run()
  File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 202, in run
    self._execute()
  File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1510, in _execute
    self._execute_helper()
  File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1596, in _execute_helper
    self.executor.heartbeat()
  File "/usr/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 150, in heartbeat
    self.sync()
  File "/usr/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 680, in sync
    self._change_state(key, state, pod_id)
  File "/usr/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 703, in _change_state
    execution_date=ex_time
  File "/usr/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3046, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()
```

The process seems to be:

`KubernetesExecutor.sync()` -> `AirflowKubernetesScheduler.sync()` -> `AirflowKubernetesScheduler.process_watcher_task()`

`process_watcher_task()` then calls `_labels_to_key()`, which extracts the labels from the pod and stores them in `key`.
These labels are the truncated/modified ones. `KubernetesExecutor.sync()` then resumes and calls `KubernetesExecutor._change_state()`, passing `key` through to it. `_change_state()` deletes the pod that ran the DAG/task, then queries the DB using the `dag_id` and `task_id` values that `_labels_to_key()` returned earlier and that are stored in `key`:

```python
(dag_id, task_id, ex_time, try_number) = key
with create_session() as session:
    # .one() raises NoResultFound if no TaskInstance matches these values
    item = session.query(TaskInstance).filter_by(
        dag_id=dag_id,
        task_id=task_id,
        execution_date=ex_time
    ).one()
    if state:
        item.state = state
        session.add(item)
```

This obviously fails, because the `dag_id` and `task_id` stored in `key` are the modified label values, so they don't match anything in the DB. This is why we ended up storing the raw values as annotations and referencing those instead.

I'm currently testing changes that remove the need for annotations and keep everything in the labels. The only file edited now is `contrib/executors/kubernetes_executor.py`.

Instead of storing annotations, any characters not supported in label values are removed from the `dag_id` and `task_id`; if the value is then longer than 63 characters, it is trimmed and a hash created from the raw, unmodified value is appended to keep it unique.

Rather than returning the `dag_id` and `task_id` directly from the labels (which would lead to the failure above), `_labels_to_key()` now queries TaskInstances filtered by the `execution_time` value stored as a third label, then loops through the results and matches the task by comparing the 'safe' `dag_id` and `task_id`.

This works fine locally (so far). However, I have some concerns that it isn't 100% foolproof, and that the 'safe' label value should be hashed whenever it differs from the original in any way, regardless of length.

I'll upload the changes once I've done some more testing.
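A minimal Python sketch of the sanitize/trim/hash and lookup steps described above, for illustration only. `make_safe_label_value()`, `labels_to_key()`, `TaskInstanceRow`, the `hash_if_modified` flag, the SHA-256 digest and its 9-character length are all hypothetical choices, not Airflow's API, and a plain list of rows stands in for the TaskInstance query filtered by execution date. The 63-character limit and the allowed characters (`[A-Za-z0-9_.-]`, alphanumeric at both ends) follow the Kubernetes label-value rules.

```python
import hashlib
import re
from datetime import datetime
from typing import Iterable, NamedTuple, Optional, Tuple

MAX_LABEL_LEN = 63  # Kubernetes limit on the length of a label value
HASH_LEN = 9  # assumption: digest length chosen for illustration

# Drop characters not allowed in a label value, then strip non-alphanumerics
# from both ends (a label value must start and end with [A-Za-z0-9]).
_INVALID_CHARS = re.compile(r"[^A-Za-z0-9_.\-]")
_EDGE_NON_ALNUM = re.compile(r"^[^A-Za-z0-9]+|[^A-Za-z0-9]+$")


def make_safe_label_value(raw: str, hash_if_modified: bool = True) -> str:
    """Hypothetical helper: turn a raw dag_id/task_id into a valid label value.

    The value is hashed when it is too long or, if hash_if_modified is set,
    whenever sanitizing changed it at all, so distinct raw ids are very
    unlikely to end up with the same label.
    """
    safe = _EDGE_NON_ALNUM.sub("", _INVALID_CHARS.sub("", raw))
    if len(safe) <= MAX_LABEL_LEN and not (hash_if_modified and safe != raw):
        return safe
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:HASH_LEN]
    # Trim so that prefix + "-" + digest fits within 63 characters.
    prefix = safe[: MAX_LABEL_LEN - HASH_LEN - 1]
    return f"{prefix}-{digest}" if prefix else digest


class TaskInstanceRow(NamedTuple):
    """Hypothetical stand-in for a TaskInstance row."""
    dag_id: str
    task_id: str
    execution_date: datetime


def labels_to_key(
    dag_label: str,
    task_label: str,
    execution_date: datetime,
    rows: Iterable[TaskInstanceRow],
) -> Optional[Tuple[str, str, datetime]]:
    """Hypothetical: recover the raw (dag_id, task_id, execution_date).

    Keeps only rows for the given execution date (standing in for the DB
    filter), then compares the safe form of each raw id against the labels.
    """
    for row in rows:
        if row.execution_date != execution_date:
            continue
        if (make_safe_label_value(row.dag_id) == dag_label
                and make_safe_label_value(row.task_id) == task_label):
            return row.dag_id, row.task_id, row.execution_date
    return None
```

With `hash_if_modified=False` (hashing only on length), `_my_dag` and `my_dag_` both sanitize to `my_dag`, the same value as a DAG actually named `my_dag`, so the loop in `labels_to_key()` can match the wrong task. Hashing whenever the value changes avoids that, at the cost of less readable labels.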
