Hello.

I would like to discuss the following issue that we face in Cloud Composer
(and that others probably face too).
We deploy Airflow components in separate GKE pods, and DAG files
are synced from GCS (Google Cloud Storage) to each component separately -
we do not mount any NFS-type disk to the components; instead,
the DAG files are continuously synced to each pod (i.e. something like
"gsutil rsync ..." running in a loop).

Since all components run in such a distributed environment, DAG files can
be out of sync between components, and this results in the following issue:
1. a new DAG file is synced to the DAG processor
2. the new DAG is scheduled by the scheduler
3. a Celery worker starts executing the task (of the newly scheduled DAG)
and fails (it can't parse the file) because the DAG file is not yet synced
to the worker
4. the new DAG file is synced to the Celery worker

The parsing of the DAG file in the task runner happens here:
https://github.com/apache/airflow/blob/eabe6b8dd77204f7c0d117c9d9ad1f4166869671/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L634

So far, we have been trying different hacks to address this issue in Cloud
Composer.

*Question:*
Would it make sense / is it possible to have some retry logic in the "parse"
method of the task runner? For example, roughly (see the sketch below):
- the DAG file is parsed
*- if the DAG is not found -> sleep + retry parsing (loop)*
*- if the timeout is reached, exit with the message "Dag not found ..."*
- if the DAG is found, continue
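
A minimal sketch of what I have in mind; parse_dag_file, DagNotFound and
the two settings below are placeholders, not the real task runner internals:

import time


class DagNotFound(Exception):
    """Placeholder for whatever the task runner raises today when the DAG
    cannot be parsed."""


def parse_with_retry(parse_dag_file, dag_parse_timeout=0.0, retry_interval=5.0):
    """Retry parsing until the DAG appears or the timeout is exhausted.

    With dag_parse_timeout == 0 (the proposed default) this is a single
    attempt, i.e. exactly the current behaviour.
    """
    deadline = time.monotonic() + dag_parse_timeout
    while True:
        try:
            return parse_dag_file()           # DAG is parsed -> continue
        except DagNotFound:
            if time.monotonic() >= deadline:  # timeout reached
                raise                         # surfaces as "Dag not found ..."
            time.sleep(retry_interval)        # sleep + retry parsing (loop)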

Having any timeout value > 0 has its own downside: failures of tasks whose
DAG files have really disappeared will now take more time to surface.

The timeout can be configurable, with "0" as the default value, which would
keep the implementation completely backward compatible. Airflow
administrators could then override this value, knowing that they have the
issue described above and accepting the downsides of increasing this
timeout.
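
If this becomes a regular Airflow config option, reading it could be as
simple as the following (the section and option names are invented for
illustration and do not exist today):

from airflow.configuration import conf

# 0.0 (the proposed default) preserves the current behaviour:
# a single parse attempt that fails immediately if the DAG is not found.
dag_parse_timeout = conf.getfloat(
    "workers", "dag_parse_retry_timeout", fallback=0.0
)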

Any thoughts?

-- 
Eugene
