potiuk commented on code in PR #30255:
URL: https://github.com/apache/airflow/pull/30255#discussion_r1156751543
##########
airflow/cli/commands/dag_processor_command.py:
##########
@@ -19,30 +19,40 @@
import logging
from datetime import timedelta
+from typing import Any
import daemon
from daemon.pidfile import TimeoutPIDLockFile
from airflow import settings
from airflow.configuration import conf
-from airflow.jobs.dag_processor_job import DagProcessorJob
+from airflow.jobs.base_job import BaseJob
+from airflow.jobs.dag_processor_job import DagProcessorJobRunner
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging
log = logging.getLogger(__name__)
+dag_processor_job_runner: DagProcessorJobRunner | None = None
-def _create_dag_processor_job(args) -> DagProcessorJob:
+
+def _create_dag_processor_job(args: Any) -> BaseJob:
"""Creates DagFileProcessorProcess instance."""
+ from airflow.dag_processing.manager import DagFileProcessorManager
+
processor_timeout_seconds: int = conf.getint("core",
"dag_file_processor_timeout")
processor_timeout = timedelta(seconds=processor_timeout_seconds)
- return DagProcessorJob(
+ global dag_processor_job_runner
+ processor = DagFileProcessorManager(
processor_timeout=processor_timeout,
dag_directory=args.subdir,
max_runs=args.num_runs,
dag_ids=[],
pickle_dags=args.do_pickle,
)
+ return BaseJob(
+ job_runner=processor.job_runner,
+ )
Review Comment:
Just to explain - with this kind of staged refactor, some things were added
just to make each of the steps smaller and they are really temporary - this is
the case here. I could have also merged it all in one big PR, but this one big
PR would be rather difficult to review in one go (and when I did it in the past
I was asked to split it into reviewable pieces). But I can see if I can remove
that one now, looking at the final stage, without adding too much to this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]