amoghrajesh commented on code in PR #56187:
URL: https://github.com/apache/airflow/pull/56187#discussion_r2567452621


##########
shared/observability/src/airflow_shared/observability/metrics/datadog_logger.py:
##########
@@ -19,11 +19,11 @@
 
 import datetime
 import logging
+from collections.abc import Callable
 from typing import TYPE_CHECKING
 
-from airflow.configuration import conf
-from airflow.metrics.protocols import Timer
-from airflow.metrics.validators import (
+from airflow._shared.observability.metrics.protocols import Timer
+from airflow._shared.observability.metrics.validators import (

Review Comment:
   Same comment throughout the PR



##########
shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py:
##########
@@ -19,10 +19,10 @@
 
 from typing import TYPE_CHECKING, Any, Protocol
 
-from airflow.metrics.protocols import Timer
+from airflow._shared.observability.metrics.protocols import Timer
 
 if TYPE_CHECKING:
-    from airflow.metrics.protocols import DeltaType
+    from airflow._shared.observability.metrics.protocols import DeltaType

Review Comment:
   We should use relative imports here so that the code works in every 
library it is symlinked into. For an example, see 
https://github.com/apache/airflow/pull/54449/files#diff-9c616c02bb4f9b87a735f0f215dee6d4325b64e337780ab41ee2c25fc2c7a8bfR17-R51: 
declare these in `__init__.py` as needed and import them from there.
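
   A minimal sketch of why relative imports matter here. All package names 
(`core_pkg`, `sdk_pkg`, the tiny `observability` tree) are hypothetical 
stand-ins built in a temp directory, and the source is copied rather than 
symlinked so the sketch also runs where symlinks are unavailable. Because the 
shared module uses `from .protocols import Timer`, it resolves correctly under 
whichever parent package it is exposed from:

```python
import importlib
import shutil
import sys
import tempfile
from pathlib import Path

root = Path(tempfile.mkdtemp())

# Hypothetical shared source tree, standing in for the real shared library.
src = root / "shared_src" / "observability"
(src / "metrics").mkdir(parents=True)
(src / "__init__.py").write_text("")
# Re-export in __init__.py, as the review comment suggests.
(src / "metrics" / "__init__.py").write_text("from .protocols import Timer\n")
(src / "metrics" / "protocols.py").write_text("class Timer:\n    pass\n")
(src / "metrics" / "base_stats_logger.py").write_text(
    "from .protocols import Timer\n\n"  # relative, not an absolute package path
    "def make_timer():\n"
    "    return Timer()\n"
)

# Expose the same source under two unrelated parent packages
# (copied here; symlinked in the layout the review describes).
for parent in ("core_pkg", "sdk_pkg"):
    shared = root / parent / "_shared"
    shared.mkdir(parents=True)
    (root / parent / "__init__.py").write_text("")
    (shared / "__init__.py").write_text("")
    shutil.copytree(src, shared / "observability")

sys.path.insert(0, str(root))

core_mod = importlib.import_module(
    "core_pkg._shared.observability.metrics.base_stats_logger"
)
sdk_mod = importlib.import_module(
    "sdk_pkg._shared.observability.metrics.base_stats_logger"
)

# Each copy resolves Timer relative to its own location.
core_timer_module = type(core_mod.make_timer()).__module__
sdk_timer_module = type(sdk_mod.make_timer()).__module__
print(core_timer_module)
print(sdk_timer_module)
```

   An absolute import such as `from core_pkg._shared...` inside the shared 
file would tie it to a single parent and fail in any other distribution that 
does not ship that package.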



##########
shared/observability/src/airflow_shared/observability/traces/otel_tracer.py:
##########
@@ -26,22 +26,20 @@
 from opentelemetry import trace
 from opentelemetry.context import attach, create_key
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
-from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
+from opentelemetry.sdk.resources import SERVICE_NAME, Resource
 from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer as OpenTelemetryTracer, TracerProvider
 from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor
 from opentelemetry.sdk.trace.id_generator import IdGenerator
 from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer
 from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
 from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
 
-from airflow._shared.timezones import timezone
-from airflow.configuration import conf
-from airflow.traces.utils import (
+from airflow._shared.observability.traces.utils import (
+    datetime_to_nano,
     parse_traceparent,
     parse_tracestate,
 )
-from airflow.utils.dates import datetime_to_nano
-from airflow.utils.net import get_hostname
+from airflow._shared.timezones import timezone

Review Comment:
   We do not want cross-dependencies between shared libraries either. Since 
you only use `utcnow`, you can copy that utility into this module:
   ```python
   def utcnow() -> dt.datetime:
       """Get the current date and time in UTC."""
       return dt.datetime.now(tz=utc)
   ```
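
   The snippet above relies on `dt` and `utc` being defined elsewhere. A 
self-contained variant, assuming the standard library's `datetime.timezone.utc` 
is an acceptable replacement for whatever `utc` object the timezones library 
provides (that substitution is an assumption, not something the PR confirms):

```python
import datetime as dt


def utcnow() -> dt.datetime:
    """Get the current date and time in UTC (timezone-aware)."""
    # Assumption: stdlib UTC stands in for the shared library's `utc` object.
    return dt.datetime.now(tz=dt.timezone.utc)


now = utcnow()
print(now.isoformat())
```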



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -116,7 +116,11 @@
 from airflow.sdk.execution_time.sentry import Sentry
 from airflow.sdk.execution_time.xcom import XCom
 from airflow.sdk.timezone import coerce_datetime
-from airflow.stats import Stats
+
+try:
+    from airflow.observability.stats import Stats
+except ImportError:
+    from airflow.stats import Stats  # type: ignore[attr-defined,no-redef]

Review Comment:
   Same comment as above about importing from core airflow



##########
shared/observability/pyproject.toml:
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[project]
+name = "apache-airflow-shared-observability"
+description = "Shared observability code for Airflow distributions"
+version = "0.0"
+classifiers = [
+    "Private :: Do Not Upload",
+]
+
+dependencies = [
+    "pendulum>=3.1.0",
+    "methodtools>=0.4.7",
+    "sqlalchemy>=1.4.36",

Review Comment:
   Why are these needed?



##########
airflow-core/src/airflow/observability/stats.py:
##########
@@ -22,11 +22,11 @@
 from collections.abc import Callable

Review Comment:
   The Task SDK might need a similar factory method/module.



##########
shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py:
##########
@@ -155,13 +158,25 @@ def timer(
         return Timer()
 
 
-def get_statsd_logger(cls) -> SafeStatsdLogger:
+def get_statsd_logger(
+    cls,
+    stats_class: Callable[[str], str] | None = None,
+    host: str | None = None,
+    port: int | None = None,
+    prefix: str | None = None,
+    ipv6: bool = False,
+    influxdb_tags_enabled: bool = False,
+    statsd_disabled_tags: str | None = None,
+    metrics_allow_list: str | None = None,
+    metrics_block_list: str | None = None,
+    stat_name_handler: Callable[[str], str] | None = None,
+    statsd_influxdb_enabled: bool = False,
+) -> SafeStatsdLogger:
     """Return logger for StatsD."""
     # no need to check for the scheduler/statsd_on -> this method is only called when it is set
     # and previously it would crash with None is callable if it was called without it.
     from statsd import StatsClient
 
-    stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None)
     if stats_class:
         if not issubclass(stats_class, StatsClient):
             raise AirflowConfigException(

Review Comment:
   I mean `conf.getimport("metrics", "statsd_custom_client_path", fallback=None)`.



##########
shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py:
##########
@@ -22,10 +22,9 @@
 from functools import wraps
 from typing import TYPE_CHECKING, TypeVar, cast
 
-from airflow.configuration import conf
-from airflow.exceptions import AirflowConfigException
-from airflow.metrics.protocols import Timer
-from airflow.metrics.validators import (
+from airflow._shared.configuration.exceptions import AirflowConfigException

Review Comment:
   The same applies to this one: it needs to be independent of any other 
library. We have to think of a proposal for this too.



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -36,7 +36,7 @@
     DagScheduleAssetReference,
     DagScheduleAssetUriReference,
 )
-from airflow.stats import Stats
+from airflow.observability.stats import Stats

Review Comment:
   We cannot do this; it defeats the purpose of client-server separation. The 
Task SDK should either define `Stats` itself or something along those lines, 
but it cannot come from Airflow core.
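
   One possible shape for this, as a rough sketch only. Every name here 
(`StatsLogger`, `NoStatsLogger`, `InMemoryStatsLogger`, `_Stats`, 
`initialize`) is hypothetical and not taken from the PR or from Airflow. The 
idea is that the Task SDK owns a small `Stats` facade and a backend is plugged 
in at startup, so nothing is imported from core:

```python
from __future__ import annotations

from typing import Protocol


class StatsLogger(Protocol):
    """Hypothetical minimal interface a metrics backend must satisfy."""

    def incr(self, stat: str, count: int = 1) -> None: ...


class NoStatsLogger:
    """Default backend: silently drops every metric."""

    def incr(self, stat: str, count: int = 1) -> None:
        return None


class InMemoryStatsLogger:
    """Illustrative backend that keeps counters in a dict."""

    def __init__(self) -> None:
        self.counters: dict[str, int] = {}

    def incr(self, stat: str, count: int = 1) -> None:
        self.counters[stat] = self.counters.get(stat, 0) + count


class _Stats:
    """Facade owned by the client side; delegates to the configured backend."""

    def __init__(self) -> None:
        self._backend: StatsLogger = NoStatsLogger()

    def initialize(self, backend: StatsLogger) -> None:
        self._backend = backend

    def incr(self, stat: str, count: int = 1) -> None:
        self._backend.incr(stat, count)


Stats = _Stats()

# Usage: before initialization metrics are dropped, afterwards they are recorded.
Stats.incr("ti.start")
backend = InMemoryStatsLogger()
Stats.initialize(backend)
Stats.incr("ti.start")
Stats.incr("ti.start", 2)
print(backend.counters)
```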



##########
shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py:
##########
@@ -155,13 +158,25 @@ def timer(
         return Timer()
 
 
-def get_statsd_logger(cls) -> SafeStatsdLogger:
+def get_statsd_logger(
+    cls,
+    stats_class: Callable[[str], str] | None = None,
+    host: str | None = None,
+    port: int | None = None,
+    prefix: str | None = None,
+    ipv6: bool = False,
+    influxdb_tags_enabled: bool = False,
+    statsd_disabled_tags: str | None = None,
+    metrics_allow_list: str | None = None,
+    metrics_block_list: str | None = None,
+    stat_name_handler: Callable[[str], str] | None = None,
+    statsd_influxdb_enabled: bool = False,
+) -> SafeStatsdLogger:
     """Return logger for StatsD."""
     # no need to check for the scheduler/statsd_on -> this method is only called when it is set
     # and previously it would crash with None is callable if it was called without it.
     from statsd import StatsClient
 
-    stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None)
     if stats_class:
         if not issubclass(stats_class, StatsClient):
             raise AirflowConfigException(

Review Comment:
   Wait, since we inject this and don't really read `conf` here, can we just 
get rid of this check, or move it to the factory where we actually read `conf`?
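
   A rough sketch of that split, under stated assumptions. `StatsClient` and 
`ConfigError` below are local stand-ins for `statsd.StatsClient` and 
`AirflowConfigException` so the example runs without either dependency. 
`build_statsd_logger`, the settings keys, and the `localhost`/`8125` fallbacks 
are hypothetical. The shared function only receives injected values, and the 
subclass check lives in the factory that reads the configuration:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class StatsClient:
    """Stand-in for statsd.StatsClient so the sketch runs without statsd."""

    def __init__(self, host: str, port: int, prefix: str | None = None) -> None:
        self.host, self.port, self.prefix = host, port, prefix


class ConfigError(Exception):
    """Stand-in for AirflowConfigException."""


# Shared-library side: receives injected values and never reads configuration.
@dataclass
class SafeStatsdLogger:
    client: StatsClient


def get_statsd_logger(
    *,
    stats_class: type[StatsClient] | None = None,
    host: str | None = None,
    port: int | None = None,
    prefix: str | None = None,
) -> SafeStatsdLogger:
    client_cls = stats_class or StatsClient
    # Fallback host/port are illustrative assumptions.
    return SafeStatsdLogger(client_cls(host or "localhost", port or 8125, prefix))


# Distribution side (hypothetical factory): reads settings, validates, injects.
def build_statsd_logger(settings: dict[str, Any]) -> SafeStatsdLogger:
    stats_class = settings.get("statsd_custom_client")
    if stats_class is not None:
        is_client = isinstance(stats_class, type) and issubclass(stats_class, StatsClient)
        if not is_client:
            raise ConfigError("Custom StatsD client must extend StatsClient")
    return get_statsd_logger(
        stats_class=stats_class,
        host=settings.get("statsd_host"),
        port=settings.get("statsd_port"),
        prefix=settings.get("statsd_prefix"),
    )


logger = build_statsd_logger(
    {"statsd_host": "localhost", "statsd_port": 8125, "statsd_prefix": "airflow"}
)
print(logger.client.host, logger.client.port, logger.client.prefix)
```

   With this arrangement the shared module needs neither `conf` nor the 
configuration exception type; only the factory does.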



##########
task-sdk/src/airflow/sdk/io/fs.py:
##########
@@ -26,7 +26,11 @@
 
 from airflow.providers_manager import ProvidersManager
 from airflow.sdk.module_loading import import_string
-from airflow.stats import Stats
+
+try:
+    from airflow.observability.stats import Stats
+except ImportError:
+    from airflow.stats import Stats  # type: ignore[attr-defined,no-redef]

Review Comment:
   Same as above
   



##########
airflow-core/src/airflow/jobs/dag_processor_job_runner.py:
##########
@@ -21,7 +21,7 @@
 
 from airflow.jobs.base_job_runner import BaseJobRunner
 from airflow.jobs.job import Job, perform_heartbeat
-from airflow.stats import Stats
+from airflow.observability.stats import Stats

Review Comment:
   I think moving stats into observability is too big a change with almost 
no real value. We should not be moving it around. Keep it as `stats`, with the 
needed factory methods staying there.



##########
airflow-core/src/airflow/observability/metrics/datadog_logger.py:
##########
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations

Review Comment:
   Where did this module live earlier? If it was not in core Airflow, it 
should not be added there now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
