hussein-awala commented on code in PR #28711:
URL: https://github.com/apache/airflow/pull/28711#discussion_r1061955624
##########
airflow/jobs/scheduler_job.py:
##########
@@ -151,6 +151,7 @@ def __init__(
         # How many seconds do we wait for tasks to heartbeat before mark them as zombies.
         self._zombie_threshold_secs = conf.getint("scheduler", "scheduler_zombie_task_threshold")
         self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
+        self._is_dag_processor_activated = conf.getint("scheduler", "dag_dir_list_interval") >= 0
Review Comment:
As I understand it, when `standalone_dag_processor` is set to True, the standalone processor is not created automatically; we just tell the scheduler that we don't want to create a DAG processor in a new thread. We then need to create the DAG processor in a separate pod/container/process using the Airflow CLI. If we don't run it, all the DAGs will be considered stale after `dag_stale_not_seen_duration` seconds, and they will be deleted from the metadata database.
With this PR, we can disable the DAG file processor agent created in the scheduler process and run the standalone DAG processor each time we need to process our DAG files, without any risk of the DAGs being deleted from the metadata database.
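To make the interaction between the two settings concrete, here is a minimal sketch of the decision described above. The function name and its standalone form are illustrative, not Airflow's actual code; only the two config options (`standalone_dag_processor`, `dag_dir_list_interval`) come from the PR.

```python
def should_start_embedded_processor(standalone_dag_processor: bool,
                                    dag_dir_list_interval: int) -> bool:
    """Sketch: should the scheduler start its own DAG file processor agent?"""
    if standalone_dag_processor:
        # Parsing is expected to happen in a separate pod/container/process
        # started via the Airflow CLI, so the scheduler starts nothing.
        return False
    # This PR adds a second switch: a negative dag_dir_list_interval also
    # disables the embedded processor agent.
    return dag_dir_list_interval >= 0
```

With `standalone_dag_processor=False` and a negative interval, neither processor runs automatically, which is exactly the on-demand mode the PR enables.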
In the CLI there is no check on `dag_dir_list_interval`, so the `DagFileProcessorManager` can be created normally, and `if elapsed_time_since_refresh > self.dag_dir_list_interval` will always be True, which is equivalent to providing 0 or a very small value. In addition, if we run the standalone DAG processor in a custom process (without using the Helm chart, e.g. in a CI pipeline), we can provide a different config value to control the interval between DAG directory listings.
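A small sketch of the refresh condition quoted above, showing why a negative interval makes it always True. The helper and its parameters are hypothetical; only the expression `elapsed_time_since_refresh > self.dag_dir_list_interval` is taken from the manager.

```python
import time
from typing import Optional

def needs_refresh(last_refresh_time: float,
                  dag_dir_list_interval: float,
                  now: Optional[float] = None) -> bool:
    """Sketch of the manager's DAG-dir refresh check."""
    current = now if now is not None else time.monotonic()
    elapsed_time_since_refresh = current - last_refresh_time
    # Elapsed time is never negative, so any negative interval makes this
    # condition hold on every loop iteration, just like an interval of 0.
    return elapsed_time_since_refresh > dag_dir_list_interval
```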
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]