This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c858509d18 Fix DagProcessorJob integration for standalone
dag-processor (#30278)
c858509d18 is described below
commit c858509d186929965219f0d6dce6621dd8edf154
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Mar 24 18:31:37 2023 +0100
Fix DagProcessorJob integration for standalone dag-processor (#30278)
The DagProcessorJob integration implemented in #28799 was not
complete. It missed a few crucial changes:
* importing DagProcessorJob in airflow/models/__init__.py - not
importing it there caused `airflow jobs check` to fail, when
querying DagProcessorJob in the BaseJob query, because
the DagProcessorJob was not registered by the time the query
was run (so polymorphic ORM model retrieval was not aware of
the DagProcessorJob model).
* airflow jobs check command did not have DagProcessorJob
added as valid job type, so it was impossible to monitor for it
* also the processor manager did not set heartbeats periodically,
so the Job for the DagFileProcessor was considered as not alive
pretty quickly even if standalone dag-processor was running.
This PR fixes all three problems.
Fixes: #30251
---
airflow/cli/cli_config.py | 2 +-
airflow/dag_processing/manager.py | 5 +++++
airflow/jobs/dag_processor_job.py | 1 +
airflow/models/__init__.py | 1 +
4 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index f687a478d6..5502a68fb8 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -1014,7 +1014,7 @@ ARG_MIN_PENDING_MINUTES = Arg(
# jobs check
ARG_JOB_TYPE_FILTER = Arg(
("--job-type",),
- choices=("BackfillJob", "LocalTaskJob", "SchedulerJob", "TriggererJob"),
+ choices=("BackfillJob", "LocalTaskJob", "SchedulerJob", "TriggererJob",
"DagProcessorJob"),
action="store",
help="The type of job(s) that will be checked.",
)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index b8bd7332f6..2b603a0a38 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -46,6 +46,7 @@ from airflow.api_internal.internal_api_call import
internal_api_call
from airflow.callbacks.callback_requests import CallbackRequest,
SlaCallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
+from airflow.jobs.base_job import BaseJob
from airflow.models import errors
from airflow.models.dag import DagModel
from airflow.models.dagwarning import DagWarning
@@ -380,6 +381,7 @@ class DagFileProcessorManager(LoggingMixin):
pickle_dags: bool,
signal_conn: MultiprocessingConnection | None = None,
async_mode: bool = True,
+ job: BaseJob | None = None,
):
super().__init__()
# known files; this will be updated every `dag_dir_list_interval` and
stuff added/removed accordingly
@@ -393,6 +395,7 @@ class DagFileProcessorManager(LoggingMixin):
self._async_mode = async_mode
self._parsing_start_time: int | None = None
self._dag_directory = dag_directory
+ self._job = job
# Set the signal conn in to non-blocking mode, so that attempting to
# send when the buffer is full errors, rather than hangs for-ever
@@ -573,6 +576,8 @@ class DagFileProcessorManager(LoggingMixin):
while True:
loop_start_time = time.monotonic()
ready = multiprocessing.connection.wait(self.waitables.keys(),
timeout=poll_time)
+ if self._job:
+ self._job.heartbeat()
if self._direct_scheduler_conn is not None and
self._direct_scheduler_conn in ready:
agent_signal = self._direct_scheduler_conn.recv()
diff --git a/airflow/jobs/dag_processor_job.py
b/airflow/jobs/dag_processor_job.py
index 70690a78db..b12dc5b762 100644
--- a/airflow/jobs/dag_processor_job.py
+++ b/airflow/jobs/dag_processor_job.py
@@ -54,6 +54,7 @@ class DagProcessorJob(BaseJob):
processor_timeout=processor_timeout,
dag_ids=dag_ids,
pickle_dags=pickle_dags,
+ job=self,
)
super().__init__(*args, **kwargs)
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index 281ffcfd67..3dd2ddc9ff 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -61,6 +61,7 @@ def import_all_models():
import airflow.jobs.backfill_job
import airflow.jobs.base_job
+ import airflow.jobs.dag_processor_job
import airflow.jobs.local_task_job
import airflow.jobs.scheduler_job
import airflow.jobs.triggerer_job