o-nikolas commented on code in PR #52815:
URL: https://github.com/apache/airflow/pull/52815#discussion_r2462163036
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1315,6 +1315,11 @@ def _run_scheduler_loop(self) -> None:
                self._emit_running_ti_metrics,
            )
+            timers.call_regular_interval(
+                conf.getfloat("scheduler", "dagrun_metrics_interval", fallback=30.0),
Review Comment:
Does this config option actually exist? I don't see a corresponding update to `config_templates/config.yml` in this PR.
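For illustration only (nothing like this is in the PR, and the description, `version_added`, and default below are placeholders), a new option would normally be declared under the existing `scheduler` section of `config_templates/config.yml`, roughly along these lines:

```yaml
# Hypothetical sketch -- not part of this PR; field values are placeholders.
dagrun_metrics_interval:
  description: |
    How often (in seconds) the scheduler emits DAG run metrics.
  version_added: ~
  type: float
  example: ~
  default: "30.0"
```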
##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -7086,6 +7086,33 @@ def test_process_expired_deadlines_no_deadlines_found(self, mock_handle_miss, se
        # The handler should not be called, but no exceptions should be raised either.
        mock_handle_miss.assert_not_called()
+    def test_emit_running_dags_metric(self, dag_maker, monkeypatch):
+        """Test that the running_dags metric is emitted correctly."""
+        from airflow.utils.state import DagRunState
+
+        with dag_maker("metric_dag") as dag:
+            _ = dag
+        dag_maker.create_dagrun(run_id="run_1", state=DagRunState.RUNNING, logical_date=timezone.utcnow())
+        dag_maker.create_dagrun(
+            run_id="run_2", state=DagRunState.RUNNING, logical_date=timezone.utcnow() + timedelta(hours=1)
+        )
+
+        recorded: list[tuple[str, int]] = []
+
+        def _fake_gauge(metric: str, value: int, *_, **__):
+            recorded.append((metric, value))
+
+        monkeypatch.setattr("airflow.dag_processing.manager.Stats.gauge", _fake_gauge, raising=True)
+
+        with conf_vars({("metrics", "statsd_on"): "True"}):
+            # mock_executor = MagicMock(spec=BaseExecutor)
+            # scheduler_job = Job(executor=mock_executor)
Review Comment:
Leftover debugging comments?
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1024,6 +1024,13 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
        self._add_files_to_queue(to_queue, False)
        Stats.incr("dag_processing.file_path_queue_update_count")
+    def _is_metrics_enabled(self):
Review Comment:
Where is this used?
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2235,6 +2240,12 @@ def _emit_running_ti_metrics(self, session: Session = NEW_SESSION) -> None:
        self.previous_ti_running_metrics = ti_running_metrics
+    @provide_session
+    def _emit_running_dags_metric(self, session: Session = NEW_SESSION) -> None:
+        stmt = select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING)
+        running_dags = session.scalar(stmt)
+        Stats.gauge("executor.running_dags", running_dags)
Review Comment:
Sorry, I'm late to this PR. Why are we emitting this metric under `executor`
when executors have nothing to do with dag level scheduling? Shouldn't this be
under `scheduler`? The config you're using above even says
`scheduler.dagrun_metrics_interval`
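Purely as a sketch of that suggestion (reusing the `select`, `func`, `DagRun`, `DagRunState`, `Stats`, and `session` names already available in this module), the only change would be the metric namespace:

```python
# Sketch of the suggested rename: same query as in the diff above,
# only the gauge name moves from the executor to the scheduler namespace.
stmt = select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING)
running_dags = session.scalar(stmt)
Stats.gauge("scheduler.running_dags", running_dags)
```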
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]