This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new b6692ee6ef8 [v3-2-test] Prevent AlreadyRunningBackfill error caused by invalid date range request (#66874) (#67250)
b6692ee6ef8 is described below

commit b6692ee6ef80f2f7a35798c8bf9c8bd6b52818e6
Author: Yeonguk Choo <[email protected]>
AuthorDate: Fri May 22 00:44:27 2026 +0900

    [v3-2-test] Prevent AlreadyRunningBackfill error caused by invalid date range request (#66874) (#67250)
    
    * Fix OTel timer metrics using Gauge instead of Histogram (#64207) (#66865)
    
    * Fix OTel timer metrics using Gauge instead of Histogram
    
    * Use ExponentialBucketHistogramAggregation for timing metrics
    
    * Use public API import path for ExponentialBucketHistogramAggregation and fix histogram map isolation
    
    (cherry picked from commit b2dadd2b7623d0d99f6fea0521bf008b7b957cac)
    
    Co-authored-by: namratachaudhary <[email protected]>
    
    * [v3-2-test] Prevent AlreadyRunningBackfill error caused by invalid date range request (#66874)
    
    When a backfill is requested with from_date after to_date, the Backfill
    record was committed before _get_info_list() returned an empty list, leaving
    an orphaned record that blocked subsequent backfills with
    AlreadyRunningBackfill until the scheduler's 2-minute cleanup ran.
    
    Add an InvalidBackfillDateRange exception and validate from_date <= to_date
    at the top of _validate_backfill_params(), before any DB operations.
    
    (cherry picked from commit 153623856efb44a3f60ef9fdb67e22e05609e067)
    
    ---------
    
    Co-authored-by: Rahul Vats <[email protected]>
    Co-authored-by: namratachaudhary <[email protected]>
    Co-authored-by: Park Jiwon <[email protected]>
---
 airflow-core/newsfragments/64207.significant.rst   |  1 +
 .../core_api/routes/public/backfills.py            |  3 ++
 airflow-core/src/airflow/models/backfill.py        | 21 ++++++--
 airflow-core/tests/unit/models/test_backfill.py    | 18 +++++++
 .../observability/metrics/otel_logger.py           | 58 +++++++++++++++++++---
 .../observability/metrics/test_otel_logger.py      | 47 ++++++++++++------
 6 files changed, 125 insertions(+), 23 deletions(-)

diff --git a/airflow-core/newsfragments/64207.significant.rst 
b/airflow-core/newsfragments/64207.significant.rst
new file mode 100644
index 00000000000..3254fa20a54
--- /dev/null
+++ b/airflow-core/newsfragments/64207.significant.rst
@@ -0,0 +1 @@
+OTel timer and timing metrics now use Histogram instead of Gauge, preserving count, sum, and bucket distribution across recordings.
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
index 5d4e112df18..d6373f1d52a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -52,6 +52,7 @@ from airflow.models.backfill import (
     DagNoScheduleException,
     InvalidBackfillConf,
     InvalidBackfillDate,
+    InvalidBackfillDateRange,
     InvalidBackfillDirection,
     InvalidReprocessBehavior,
     _create_backfill,
@@ -265,6 +266,7 @@ def create_backfill(
         InvalidBackfillDirection,
         DagNoScheduleException,
         InvalidBackfillDate,
+        InvalidBackfillDateRange,
         InvalidBackfillConf,
     ) as e:
         raise RequestValidationError(str(e))
@@ -314,6 +316,7 @@ def create_backfill_dry_run(
         InvalidBackfillDirection,
         DagNoScheduleException,
         InvalidBackfillDate,
+        InvalidBackfillDateRange,
         InvalidBackfillConf,
     ) as e:
         raise RequestValidationError(str(e))
diff --git a/airflow-core/src/airflow/models/backfill.py 
b/airflow-core/src/airflow/models/backfill.py
index 24c102f1cfc..4f4dba38bf0 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -100,6 +100,14 @@ class InvalidBackfillDate(AirflowException):
     """
 
 
+class InvalidBackfillDateRange(AirflowException):
+    """
+    Raised when from_date is after to_date in a backfill request.
+
+    :meta private:
+    """
+
+
 class InvalidBackfillConf(AirflowException):
     """
     Raised when the provided ``dag_run_conf`` fails validation against the 
DAG's params.
@@ -259,6 +267,16 @@ def _validate_backfill_params(
     reprocess_behavior: ReprocessBehavior | None,
     dag_run_conf: dict | None = None,
 ) -> None:
+
+    if from_date > to_date:
+        raise InvalidBackfillDateRange(
+            f"from_date ({from_date.isoformat()}) must not be after to_date 
({to_date.isoformat()})."
+        )
+
+    current_time = timezone.utcnow()
+    if from_date >= current_time and to_date >= current_time:
+        raise InvalidBackfillDate("Backfill cannot be executed for future 
dates.")
+
     depends_on_past = any(x.depends_on_past for x in dag.tasks)
     if depends_on_past:
         if reverse is True:
@@ -270,9 +288,6 @@ def _validate_backfill_params(
                 "Dag has tasks for which depends_on_past=True. "
                 "You must set reprocess behavior to reprocess completed or 
reprocess failed."
             )
-    current_time = timezone.utcnow()
-    if from_date >= current_time and to_date >= current_time:
-        raise InvalidBackfillDate("Backfill cannot be executed for future 
dates.")
     if dag_run_conf is not None:
         try:
             dag.params.deep_merge(dag_run_conf).validate()
diff --git a/airflow-core/tests/unit/models/test_backfill.py 
b/airflow-core/tests/unit/models/test_backfill.py
index 2749e0e6776..67b2b727a76 100644
--- a/airflow-core/tests/unit/models/test_backfill.py
+++ b/airflow-core/tests/unit/models/test_backfill.py
@@ -33,6 +33,7 @@ from airflow.models.backfill import (
     BackfillDagRun,
     BackfillDagRunExceptionReason,
     InvalidBackfillConf,
+    InvalidBackfillDateRange,
     InvalidBackfillDirection,
     InvalidReprocessBehavior,
     ReprocessBehavior,
@@ -779,3 +780,20 @@ def test_get_latest_dag_run_row_partitioned(session: 
Session):
     dr = session.scalar(stmt)
     assert dr is not None
     assert dr.start_date == timezone.parse("2026-02-23")
+
+
+def test_create_backfill_from_date_after_to_date_raises(dag_maker, session):
+    with dag_maker(schedule="@daily") as dag:
+        PythonOperator(task_id="hi", python_callable=print)
+    session.commit()
+
+    with pytest.raises(InvalidBackfillDateRange, match="must not be after 
to_date"):
+        _create_backfill(
+            dag_id=dag.dag_id,
+            from_date=pendulum.parse("2026-05-13"),
+            to_date=pendulum.parse("2026-05-12"),
+            max_active_runs=2,
+            reverse=False,
+            triggering_user_name="pytest",
+            dag_run_conf={},
+        )
diff --git 
a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py 
b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index c8db3ee02f9..b7b8effe8dc 100644
--- 
a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ 
b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -30,6 +30,7 @@ from opentelemetry.sdk.metrics._internal.export import (
     ConsoleMetricExporter,
     PeriodicExportingMetricReader,
 )
+from opentelemetry.sdk.metrics.view import 
ExponentialBucketHistogramAggregation, View
 from opentelemetry.sdk.resources import SERVICE_NAME, Resource
 
 from ..common import get_otel_data_exporter
@@ -146,7 +147,8 @@ class _OtelTimer(Timer):
     """
     An implementation of Stats.Timer() which records the result in the OTel 
Metrics Map.
 
-    OpenTelemetry does not have a native timer, we will store the values as a 
Gauge.
+    OpenTelemetry does not have a native timer; values are stored as a 
Histogram so that
+    all observations (count, sum, bucket distribution) are preserved across 
multiple recordings.
 
     :param name: The name of the timer.
     :param tags: Tags to append to the timer.
@@ -160,9 +162,9 @@ class _OtelTimer(Timer):
 
     def stop(self, send: bool = True) -> None:
         super().stop(send)
-        if self.name and send and self.duration:
-            self.otel_logger.metrics_map.set_gauge_value(
-                full_name(prefix=self.otel_logger.prefix, name=self.name), 
self.duration, False, self.tags
+        if self.name and send and self.duration is not None:
+            self.otel_logger.metrics_map.record_histogram_value(
+                full_name(prefix=self.otel_logger.prefix, name=self.name), 
self.duration, self.tags
             )
 
 
@@ -278,11 +280,11 @@ class SafeOtelLogger:
         *,
         tags: Attributes = None,
     ) -> None:
-        """OTel does not have a native timer, stored as a Gauge whose value is 
elapsed ms."""
+        """Record a timing observation as a Histogram to preserve distribution 
information."""
         if self.metrics_validator.test(stat) and 
name_is_otel_safe(self.prefix, stat):
             if isinstance(dt, datetime.timedelta):
                 dt = dt.total_seconds() * 1000.0
-            self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, 
name=stat), float(dt), False, tags)
+            
self.metrics_map.record_histogram_value(full_name(prefix=self.prefix, 
name=stat), float(dt), tags)
 
     def timer(
         self,
@@ -314,15 +316,29 @@ class InternalGauge:
         self.gauge.set(new_value, attributes=self.attributes)
 
 
+class InternalHistogram:
+    """Stores a histogram instrument for timer/timing metrics."""
+
+    def __init__(self, meter, name: str):
+        otel_safe_name = _get_otel_safe_name(name)
+        self.histogram = meter.create_histogram(name=otel_safe_name, unit="ms")
+        log.debug("Created %s as type: %s", otel_safe_name, 
_type_as_str(self.histogram))
+
+    def record(self, value: float, tags: Attributes) -> None:
+        self.histogram.record(value, attributes=tags)
+
+
 class MetricsMap:
     """Stores Otel Instruments."""
 
     def __init__(self, meter):
         self.meter = meter
         self.map = {}
+        self.histograms: dict[str, InternalHistogram] = {}
 
     def clear(self) -> None:
         self.map.clear()
+        self.histograms.clear()
 
     def _create_counter(self, name):
         """Create a new counter or up_down_counter for the provided name."""
@@ -376,6 +392,21 @@ class MetricsMap:
 
         self.map[key].set_value(value, delta)
 
+    def record_histogram_value(self, name: str, value: float, tags: 
Attributes) -> None:
+        """
+        Record a timing observation in a Histogram instrument.
+
+        Unlike a Gauge, a Histogram accumulates all observations so that 
count, sum,
+        and bucket distribution are preserved across multiple recordings.
+
+        :param name: The name of the histogram to record.
+        :param value: The timing observation in milliseconds.
+        :param tags: Attributes to attach to the observation.
+        """
+        if name not in self.histograms:
+            self.histograms[name] = InternalHistogram(meter=self.meter, 
name=name)
+        self.histograms[name].record(value, tags)
+
 
 def flush_otel_metrics():
     provider = metrics.get_meter_provider()
@@ -400,6 +431,15 @@ def get_otel_logger(
     stat_name_handler: Callable[[str], str] | None = None,
     statsd_influxdb_enabled: bool = False,
 ) -> SafeOtelLogger:
+    """
+    Build and return a :class:`SafeOtelLogger` backed by a configured 
:class:`MeterProvider`.
+
+    Histogram instruments (used for ``timing()`` / ``timer()`` metrics) are 
aggregated with
+    
:class:`~opentelemetry.sdk.metrics.view.ExponentialBucketHistogramAggregation`
+    so that bucket boundaries adapt automatically to the observed data range.  
This avoids
+    the need to hand-tune explicit bucket boundaries for metrics that span 
very different
+    scales (milliseconds to hours).
+    """
     otel_env_config = load_metrics_env_config()
 
     effective_service_name: str = otel_env_config.service_name or service_name 
or "airflow"
@@ -453,6 +493,12 @@ def get_otel_logger(
         MeterProvider(
             resource=resource,
             metric_readers=readers,
+            views=[
+                View(
+                    instrument_type=metrics.Histogram,
+                    aggregation=ExponentialBucketHistogramAggregation(),
+                )
+            ],
             shutdown_on_exit=False,
         ),
     )
diff --git 
a/shared/observability/tests/observability/metrics/test_otel_logger.py 
b/shared/observability/tests/observability/metrics/test_otel_logger.py
index f7b348354d7..3c1369ec44b 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -25,6 +25,7 @@ from unittest import mock
 
 import pytest
 from opentelemetry.metrics import MeterProvider
+from opentelemetry.sdk.metrics.view import 
ExponentialBucketHistogramAggregation, View
 
 from airflow_shared.observability.common import get_otel_data_exporter
 from airflow_shared.observability.exceptions import InvalidStatsNameException
@@ -244,25 +245,28 @@ class TestOtelMetrics:
 
         self.stats.timing(name, dt=datetime.timedelta(seconds=123))
 
-        
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
-        expected_value = 123000.0
-        assert self.map[full_name(name)].value == expected_value
+        
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
 unit="ms")
+        
self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with(
+            123000.0, attributes=None
+        )
 
     def test_timing_new_metric_with_tags(self, name):
         tags = {"hello": "world"}
-        key = _generate_key_name(full_name(name), tags)
 
         self.stats.timing(name, dt=1, tags=tags)
 
-        
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
-        self.map[key].attributes == tags
+        
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
 unit="ms")
+        
self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with(
+            1.0, attributes=tags
+        )
 
     def test_timing_existing_metric(self, name):
         self.stats.timing(name, dt=1)
         self.stats.timing(name, dt=2)
 
-        
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
-        assert self.map[full_name(name)].value == 2
+        # histogram created only once, but both observations are recorded
+        
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
 unit="ms")
+        assert 
self.meter.get_meter().create_histogram.return_value.record.call_count == 2
 
     # For the four test_timer_foo tests below:
     #   time.perf_count() is called once to get the starting timestamp and 
again
@@ -277,7 +281,7 @@ class TestOtelMetrics:
         expected_duration = 3140.0
         assert timer.duration == expected_duration
         assert mock_time.call_count == 2
-        
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+        
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
 unit="ms")
 
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
     def test_timer_no_name_returns_float_but_does_not_store_value(self, 
mock_time, name):
@@ -288,7 +292,7 @@ class TestOtelMetrics:
         expected_duration = 3140.0
         assert timer.duration == expected_duration
         assert mock_time.call_count == 2
-        self.meter.get_meter().create_gauge.assert_not_called()
+        self.meter.get_meter().create_histogram.assert_not_called()
 
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
     def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
@@ -301,7 +305,7 @@ class TestOtelMetrics:
         expected_value = 3140.0
         assert timer.duration == expected_value
         assert mock_time.call_count == 2
-        self.meter.get_meter().create_gauge.assert_not_called()
+        self.meter.get_meter().create_histogram.assert_not_called()
 
     @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
     def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
@@ -314,7 +318,7 @@ class TestOtelMetrics:
         expected_value = 3140.0
         assert timer.duration == expected_value
         assert mock_time.call_count == 2
-        
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+        
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
 unit="ms")
 
     @pytest.mark.parametrize(
         (
@@ -415,6 +419,18 @@ class TestOtelMetrics:
                 == 
f"opentelemetry.exporter.otlp.proto.{expected_exporter_module}.metric_exporter"
             )
 
+    @mock.patch("airflow_shared.observability.metrics.otel_logger.metrics")
+    
@mock.patch("airflow_shared.observability.metrics.otel_logger.MeterProvider")
+    def test_get_otel_logger_uses_exponential_histogram_view(self, 
mock_provider, mock_metrics):
+        get_otel_logger(host="localhost", port=4318)
+
+        call_kwargs = mock_provider.call_args.kwargs
+        views = call_kwargs["views"]
+        assert len(views) == 1
+        view = views[0]
+        assert isinstance(view, View)
+        assert isinstance(view._aggregation, 
ExponentialBucketHistogramAggregation)
+
     def test_atexit_flush_on_process_exit(self):
         """
         Run a process that initializes a logger, creates a stat and then exits.
@@ -422,8 +438,11 @@ class TestOtelMetrics:
         The logger initialization registers an atexit hook.
         Test that the hook runs and flushes the created stat at shutdown.
         """
-        test_module_name = "tests.observability.metrics.test_otel_logger"
-        function_call_str = f"import {test_module_name} as m; 
m.mock_service_run()"
+        function_call_str = (
+            "from airflow_shared.observability.metrics.otel_logger import 
get_otel_logger; "
+            "logger = get_otel_logger(debug=True); "
+            "logger.incr('my_test_stat')"
+        )
 
         proc = subprocess.run(
             [sys.executable, "-c", function_call_str],

Reply via email to