uranusjr commented on code in PR #29093:
URL: https://github.com/apache/airflow/pull/29093#discussion_r1094097614


##########
airflow/config_templates/config.yml:
##########
@@ -867,6 +867,13 @@ metrics:
       type: string
       example: ~
       default: ~
+    statsd_influxdb_enabled:
+      description: |
+        To enable sending airflow metrics with StatsD-Influxdb tagging convention.

Review Comment:
   ```suggestion
           To enable sending Airflow metrics with StatsD-Influxdb tagging convention.
   ```
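In the InfluxDB-style StatsD tagging convention this option enables, tags are appended to the metric name as comma-separated `key=value` pairs. This matches the `stat += f",{k}={v}"` logic in `prepare_stat_with_tags` later in this PR. A minimal sketch; the helper name `tag_stat_name` is hypothetical and performs no validation:

```python
def tag_stat_name(stat: str, tags: dict[str, str]) -> str:
    """Append tags to a stat name as ",key=value" pairs (InfluxDB-style).

    Hypothetical helper for illustration; tags are appended in dict order
    and are not validated here.
    """
    return stat + "".join(f",{k}={v}" for k, v in tags.items())


print(tag_stat_name("dagrun.schedule_delay", {"dag_id": "example_dag"}))
# dagrun.schedule_delay,dag_id=example_dag
```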



##########
airflow/jobs/scheduler_job.py:
##########
@@ -1240,7 +1240,13 @@ def _update_state(dag: DAG, dag_run: DagRun):
                 # always happening immediately after the data interval.
                 expected_start_date = dag.get_run_data_interval(dag_run).end
                 schedule_delay = dag_run.start_date - expected_start_date
+                # Publish metrics twice with backward compatible name, and then with tags
                 Stats.timing(f"dagrun.schedule_delay.{dag.dag_id}", schedule_delay)
+                Stats.timing(
+                    "dagrun.schedule_delay",
+                    schedule_delay,
+                    tags={"dag_id": f"{dag.dag_id}"},

Review Comment:
   ```suggestion
                       tags={"dag_id": dag.dag_id},
   ```
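The backward-compatible pattern in this hunk emits the same measurement twice: once under the legacy name with the DAG ID embedded, and once under a fixed name with the DAG ID carried as a tag. Because `dag_id` is already a `str`, it can be passed as the tag value directly, as the suggestion does. A self-contained sketch; `RecordingStats` and `emit_schedule_delay` are hypothetical stand-ins, not Airflow's `Stats` API:

```python
from __future__ import annotations

import datetime


class RecordingStats:
    """Hypothetical stand-in for Airflow's Stats; records timing() calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, datetime.timedelta, dict[str, str] | None]] = []

    def timing(
        self,
        stat: str,
        dt: datetime.timedelta,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        self.calls.append((stat, dt, tags))


def emit_schedule_delay(stats: RecordingStats, dag_id: str, delay: datetime.timedelta) -> None:
    # Legacy name: DAG ID baked into the metric name.
    stats.timing(f"dagrun.schedule_delay.{dag_id}", delay)
    # Tagged name: fixed metric name, DAG ID passed as-is (no f-string needed).
    stats.timing("dagrun.schedule_delay", delay, tags={"dag_id": dag_id})


stats = RecordingStats()
emit_schedule_delay(stats, "example_dag", datetime.timedelta(seconds=3))
for call in stats.calls:
    print(call)
```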



##########
airflow/stats.py:
##########
@@ -196,7 +222,23 @@ def stat_name_default_handler(stat_name, max_length=250) -> str:
         raise InvalidStatsNameException(
             f"The stat_name ({stat_name}) has to be less than {max_length} characters."
         )
-    if not all((c in ALLOWED_CHARACTERS) for c in stat_name):
+    if not all((c in allowed_chars) for c in stat_name):
+        raise InvalidStatsNameException(
+            f"The stat name ({stat_name}) has to be composed of ASCII "
+            f"alphabets, numbers, or the underscore, dot, or dash characters."
+        )
+    return stat_name
+
+
+def stat_name_influxdb_handler(stat_name, max_length=250) -> str:
+    """InfluxDB-Statsd default name validator."""
+    if not isinstance(stat_name, str):
+        raise InvalidStatsNameException("The stat_name has to be a string")
+    if len(stat_name) > max_length:
+        raise InvalidStatsNameException(
+            f"The stat_name ({stat_name}) has to be less than {max_length} characters."
+        )
+    if not all((c in ALLOWED_CHARACTERS | set([",", "="])) for c in stat_name):

Review Comment:
   ```suggestion
       if not all((c in {*ALLOWED_CHARACTERS, ",", "="}) for c in stat_name):
   ```
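The suggested `{*ALLOWED_CHARACTERS, ",", "="}` builds the same set as `ALLOWED_CHARACTERS | set([",", "="])`, without the intermediate list and set. A self-contained sketch; `ALLOWED_CHARACTERS` here is reconstructed from the error message above (ASCII letters, digits, underscore, dot, dash) and may not match Airflow's exact definition, and the hypothetical `is_valid_influxdb_stat_name` returns a bool instead of raising:

```python
import string

# Assumption: mirrors the "ASCII alphabets, numbers, or the underscore,
# dot, or dash characters" wording in the error message above.
ALLOWED_CHARACTERS = set(string.ascii_letters + string.digits + "_.-")

# Both spellings produce the same set; the unpacking form is more direct.
INFLUXDB_ALLOWED = {*ALLOWED_CHARACTERS, ",", "="}
assert INFLUXDB_ALLOWED == ALLOWED_CHARACTERS | set([",", "="])


def is_valid_influxdb_stat_name(stat_name: str, max_length: int = 250) -> bool:
    """Return True if stat_name passes the length and character checks."""
    if len(stat_name) > max_length:
        return False
    return all(c in INFLUXDB_ALLOWED for c in stat_name)


print(is_valid_influxdb_stat_name("dagrun.schedule_delay,dag_id=my_dag"))  # True
print(is_valid_influxdb_stat_name("dagrun schedule_delay"))  # False (space)
```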



##########
airflow/stats.py:
##########
@@ -206,7 +248,13 @@ def stat_name_default_handler(stat_name, max_length=250) -> str:
 
 def get_current_handler_stat_name_func() -> Callable[[str], str]:
     """Get Stat Name Handler from airflow.cfg."""
-    return conf.getimport("metrics", "stat_name_handler") or stat_name_default_handler
+    handler = conf.getimport("metrics", "stat_name_handler")
+    if handler is None:
+        if conf.get("metrics", "statsd_influxdb_enabled", fallback=False):
+            handler = partial(stat_name_default_handler, allowed_chars=ALLOWED_CHARACTERS | set([",", "="]))

Review Comment:
   ```suggestion
               handler = partial(stat_name_default_handler, allowed_chars={*ALLOWED_CHARACTERS, ",", "="})
   ```
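A self-contained sketch of the selection logic in this hunk: a configured handler wins; otherwise the default validator is reused with a widened character set via `functools.partial`. It substitutes the standard-library `configparser` for Airflow's `conf`, uses a simplified stand-in validator, and reads the flag with `getboolean`, on the assumption that the option is meant as a real boolean (a plain string getter returns the non-empty, and therefore truthy, string `"False"`). `pick_handler` and the `custom` parameter are hypothetical:

```python
from __future__ import annotations

import configparser
import string
from functools import partial
from typing import Callable

# Assumption: same character set as described in the error message above.
ALLOWED_CHARACTERS = set(string.ascii_letters + string.digits + "_.-")


def stat_name_default_handler(
    stat_name: str, max_length: int = 250, allowed_chars: set[str] = ALLOWED_CHARACTERS
) -> str:
    # Simplified stand-in for the validator in the diff.
    if len(stat_name) > max_length or not all(c in allowed_chars for c in stat_name):
        raise ValueError(f"Invalid stat name: {stat_name}")
    return stat_name


def pick_handler(
    cfg: configparser.ConfigParser, custom: Callable[[str], str] | None = None
) -> Callable[[str], str]:
    # A user-configured handler takes precedence.
    if custom is not None:
        return custom
    # getboolean() parses "True"/"False"/"1"/"0"/... into a real bool.
    if cfg.getboolean("metrics", "statsd_influxdb_enabled", fallback=False):
        # Reuse the default validator, additionally allowing "," and "=".
        return partial(stat_name_default_handler, allowed_chars={*ALLOWED_CHARACTERS, ",", "="})
    return stat_name_default_handler


cfg = configparser.ConfigParser()
cfg.read_string("[metrics]\nstatsd_influxdb_enabled = True\n")
print(pick_handler(cfg)("dagrun.schedule_delay,dag_id=my_dag"))
```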



##########
airflow/models/dagrun.py:
##########
@@ -877,6 +877,11 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: lis
                 true_delay = first_start_date - data_interval_end
                 if true_delay.total_seconds() > 0:
                     Stats.timing(f"dagrun.{dag.dag_id}.first_task_scheduling_delay", true_delay)
+                    Stats.timing(
+                        "dagrun.first_task_scheduling_delay",
+                        true_delay,
+                        tags={"dag_id": f"{dag.dag_id}"},

Review Comment:
   ```suggestion
                           tags={"dag_id": dag.dag_id},
   ```
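The surrounding code only emits `first_task_scheduling_delay` when the delay is positive, i.e. when the first task started after the end of the data interval. A minimal sketch of that computation with plain `datetime` values; the function name and timestamps are hypothetical:

```python
from __future__ import annotations

import datetime


def first_task_scheduling_delay(
    first_start_date: datetime.datetime, data_interval_end: datetime.datetime
) -> datetime.timedelta | None:
    """Return the positive delay, or None if the first task did not start late."""
    true_delay = first_start_date - data_interval_end
    if true_delay.total_seconds() > 0:
        return true_delay
    return None


utc = datetime.timezone.utc
end = datetime.datetime(2023, 1, 1, 0, 0, tzinfo=utc)
print(first_task_scheduling_delay(datetime.datetime(2023, 1, 1, 0, 0, 42, tzinfo=utc), end))
# 0:00:42
```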



##########
airflow/stats.py:
##########
@@ -250,43 +298,107 @@ def test(self, stat):
             return True  # default is all metrics allowed
 
 
+def prepare_stat_with_tags(fn: T) -> T:
+    """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True."""
+
+    @wraps(fn)
+    def wrapper(self, stat=None, *args, tags=None, **kwargs):
+        if self.influxdb_tags_enabled:
+            if stat is not None and tags is not None:
+                for k, v in tags.items():
+                    if not set(",=").intersection(set(v + k)):
+                        stat += f",{k}={v}"
+                    else:
+                        log.error("Dropping invalid tag: %s=%s.", k, v)
+        return fn(self, stat, *args, tags=tags, **kwargs)
+
+    return cast(T, wrapper)
+
+
 class SafeStatsdLogger:
     """StatsD Logger."""
 
-    def __init__(self, statsd_client, allow_list_validator=AllowListValidator()):
+    def __init__(
+        self,
+        statsd_client,
+        allow_list_validator=AllowListValidator(),
+        aggregation_optimizer_enabled=False,
+        influxdb_tags_enabled=False,
+    ):
         self.statsd = statsd_client
         self.allow_list_validator = allow_list_validator
+        self.aggregation_optimizer_enabled = aggregation_optimizer_enabled
+        self.influxdb_tags_enabled = influxdb_tags_enabled
 
+    @prepare_stat_with_tags
     @validate_stat
-    def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
+    def incr(
+        self,
+        stat,
+        count=1,
+        rate=1,
+        *,
+        tags: dict[str, str] | None = None,
+    ):

Review Comment:
   Add types to other arguments while we’re changing these? (Same for other functions)
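One way the typing suggestion could look: every argument annotated, with `tags` still keyword-only. This is a hypothetical, trimmed-down logger that only records calls; the `int`/`float` choices for `count` and `rate` are assumptions, not taken from the PR:

```python
from __future__ import annotations


class TypedLoggerSketch:
    """Hypothetical logger illustrating fully annotated metric methods."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, int, float, dict[str, str] | None]] = []

    def incr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        # Record the call instead of sending it to a StatsD server.
        self.sent.append((stat, count, rate, tags))


logger = TypedLoggerSketch()
logger.incr("ti.finish", tags={"dag_id": "example_dag"})

# The bare "*" makes tags keyword-only, so passing it positionally fails.
try:
    logger.incr("ti.finish", 1, 1, {"dag_id": "example_dag"})
except TypeError as exc:
    print("rejected:", exc)
```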



##########
airflow/stats.py:
##########
@@ -250,43 +298,107 @@ def test(self, stat):
             return True  # default is all metrics allowed
 
 
+def prepare_stat_with_tags(fn: T) -> T:
+    """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True."""
+
+    @wraps(fn)
+    def wrapper(self, stat=None, *args, tags=None, **kwargs):
+        if self.influxdb_tags_enabled:
+            if stat is not None and tags is not None:
+                for k, v in tags.items():
+                    if not set(",=").intersection(set(v + k)):

Review Comment:
   Probably better to use the same `all((x not in y) for x in v + k)` approach; it’s much more readable than using a set here.
   
   (Or we can introduce a util function that wraps the set operation to make it easier for readers to understand.)


