Hello. I would like to discuss an issue that we face in Cloud Composer (and that others probably face too). We run Airflow components in separate GKE pods, and DAG files are synced from GCS (Google Cloud Storage) to each component separately - we do not use any NFS-type disk mounted to each component; instead, the DAG files are continuously synced to each pod (i.e. something like "gsutil rsync ..." in a loop).
Since all components run in such a distributed environment, DAG files can be out of sync between components, which leads to the following sequence:

1. a new DAG file is synced to the DAG processor
2. the new DAG is scheduled by the scheduler
3. a Celery worker starts executing the task (scheduled DAG) and fails because it can't parse the file - the DAG file has not yet been synced to that worker
4. the new DAG file is finally synced to the Celery worker

The parsing of the DAG file in the task runner happens here:
https://github.com/apache/airflow/blob/eabe6b8dd77204f7c0d117c9d9ad1f4166869671/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L634

So far we have been trying various hacks to address this issue in Cloud Composer.

*Question:* Would it make sense / is it possible to add some retry logic to the "parse" method of the task runner? A rough outline (see the sketch below):

- the DAG is parsed
*- if the DAG is not found -> sleep + retry parsing (loop)*
*- if the timeout is reached, exit with the message "Dag not found ..."*
- if the DAG is found, continue

Any timeout value > 0 has its own downside: tasks whose DAG files have genuinely disappeared will now take longer to fail. The timeout can be made configurable, with "0" as the default value, which keeps the implementation fully backward compatible. Airflow administrators who know they are affected by the issue described above can then increase the value, accepting the downside of a longer failure time.

Any thoughts?
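To make the idea concrete, here is a minimal, hypothetical sketch of such a retry wrapper. It is not tied to the actual task_runner.py internals: `parse_fn` stands in for whatever callable the task runner uses to load the DAG, and the `timeout`/`interval` parameters (and any config option backing them) are assumptions, not existing Airflow settings.

```python
import time


def parse_dag_with_retry(parse_fn, dag_id, timeout=0.0, interval=5.0):
    """Call ``parse_fn`` until it returns a DAG or ``timeout`` seconds elapse.

    ``parse_fn`` is a placeholder for whatever the task runner actually uses
    to load the DAG; ``timeout=0`` (the proposed default) preserves today's
    behaviour of a single parse attempt.
    """
    deadline = time.monotonic() + timeout
    while True:
        dag = parse_fn()
        if dag is not None:
            return dag
        if time.monotonic() >= deadline:
            # Same outcome as today, just potentially delayed by the retries.
            raise RuntimeError(f"Dag {dag_id!r} not found after {timeout}s of retrying")
        # The DAG file may simply not be synced to this worker yet: wait and retry.
        time.sleep(min(interval, max(0.0, deadline - time.monotonic())))
```

With timeout=0 the loop makes exactly one parse attempt and fails immediately, so existing deployments would see no change; deployments like Composer could set the timeout to roughly the expected GCS-to-worker sync latency.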
-- Eugene