amoghrajesh commented on code in PR #56187: URL: https://github.com/apache/airflow/pull/56187#discussion_r2567452621
########## shared/observability/src/airflow_shared/observability/metrics/datadog_logger.py: ########## @@ -19,11 +19,11 @@ import datetime import logging +from collections.abc import Callable from typing import TYPE_CHECKING -from airflow.configuration import conf -from airflow.metrics.protocols import Timer -from airflow.metrics.validators import ( +from airflow._shared.observability.metrics.protocols import Timer +from airflow._shared.observability.metrics.validators import ( Review Comment: Same comment applies throughout the PR. ########## shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py: ########## @@ -19,10 +19,10 @@ from typing import TYPE_CHECKING, Any, Protocol -from airflow.metrics.protocols import Timer +from airflow._shared.observability.metrics.protocols import Timer if TYPE_CHECKING: - from airflow.metrics.protocols import DeltaType + from airflow._shared.observability.metrics.protocols import DeltaType Review Comment: We should be using relative imports here to ensure it works fine in all the libraries it's going to be symlinked into. For example, see https://github.com/apache/airflow/pull/54449/files#diff-9c616c02bb4f9b87a735f0f215dee6d4325b64e337780ab41ee2c25fc2c7a8bfR17-R51: declare these in `__init__.py` as needed and import them from there.
########## shared/observability/src/airflow_shared/observability/traces/otel_tracer.py: ########## @@ -26,22 +26,20 @@ from opentelemetry import trace from opentelemetry.context import attach, create_key from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource +from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer as OpenTelemetryTracer, TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor from opentelemetry.sdk.trace.id_generator import IdGenerator from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID -from airflow._shared.timezones import timezone -from airflow.configuration import conf -from airflow.traces.utils import ( +from airflow._shared.observability.traces.utils import ( + datetime_to_nano, parse_traceparent, parse_tracestate, ) -from airflow.utils.dates import datetime_to_nano -from airflow.utils.net import get_hostname +from airflow._shared.timezones import timezone Review Comment: We do not want to have cross shared library dependence as well. 
Since you only use `utcnow`, you can just copy that utility into this module: ```python def utcnow() -> dt.datetime: """Get the current date and time in UTC.""" return dt.datetime.now(tz=utc) ``` ########## task-sdk/src/airflow/sdk/execution_time/task_runner.py: ########## @@ -116,7 +116,11 @@ from airflow.sdk.execution_time.sentry import Sentry from airflow.sdk.execution_time.xcom import XCom from airflow.sdk.timezone import coerce_datetime -from airflow.stats import Stats + +try: + from airflow.observability.stats import Stats +except ImportError: + from airflow.stats import Stats # type: ignore[attr-defined,no-redef] Review Comment: Same comment as above about importing from core Airflow. ########## shared/observability/pyproject.toml: ########## @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[project] +name = "apache-airflow-shared-observability" +description = "Shared observability code for Airflow distributions" +version = "0.0" +classifiers = [ + "Private :: Do Not Upload", +] + +dependencies = [ + "pendulum>=3.1.0", + "methodtools>=0.4.7", + "sqlalchemy>=1.4.36", Review Comment: Why are these needed?
########## airflow-core/src/airflow/observability/stats.py: ########## @@ -22,11 +22,11 @@ from collections.abc import Callable Review Comment: The Task SDK might need a similar factory method/module. ########## shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py: ########## @@ -155,13 +158,25 @@ def timer( return Timer() -def get_statsd_logger(cls) -> SafeStatsdLogger: +def get_statsd_logger( + cls, + stats_class: Callable[[str], str] | None = None, + host: str | None = None, + port: int | None = None, + prefix: str | None = None, + ipv6: bool = False, + influxdb_tags_enabled: bool = False, + statsd_disabled_tags: str | None = None, + metrics_allow_list: str | None = None, + metrics_block_list: str | None = None, + stat_name_handler: Callable[[str], str] | None = None, + statsd_influxdb_enabled: bool = False, +) -> SafeStatsdLogger: """Return logger for StatsD.""" # no need to check for the scheduler/statsd_on -> this method is only called when it is set # and previously it would crash with None is callable if it was called without it. from statsd import StatsClient - stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None) if stats_class: if not issubclass(stats_class, StatsClient): raise AirflowConfigException( Review Comment: I mean `conf.getimport("metrics", "statsd_custom_client_path", fallback=None)`. ########## shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py: ########## @@ -22,10 +22,9 @@ from functools import wraps from typing import TYPE_CHECKING, TypeVar, cast -from airflow.configuration import conf -from airflow.exceptions import AirflowConfigException -from airflow.metrics.protocols import Timer -from airflow.metrics.validators import ( +from airflow._shared.configuration.exceptions import AirflowConfigException Review Comment: Same goes for this one: it needs to be independent of any other shared library. We will have to come up with a proposal for this too.
########## airflow-core/src/airflow/assets/manager.py: ########## @@ -36,7 +36,7 @@ DagScheduleAssetReference, DagScheduleAssetUriReference, ) -from airflow.stats import Stats +from airflow.observability.stats import Stats Review Comment: We cannot do this; it defeats the purpose of client-server separation. The Task SDK should either define `Stats` or something along those lines, but it cannot come from Airflow core. ########## shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py: ########## @@ -155,13 +158,25 @@ def timer( return Timer() -def get_statsd_logger(cls) -> SafeStatsdLogger: +def get_statsd_logger( + cls, + stats_class: Callable[[str], str] | None = None, + host: str | None = None, + port: int | None = None, + prefix: str | None = None, + ipv6: bool = False, + influxdb_tags_enabled: bool = False, + statsd_disabled_tags: str | None = None, + metrics_allow_list: str | None = None, + metrics_block_list: str | None = None, + stat_name_handler: Callable[[str], str] | None = None, + statsd_influxdb_enabled: bool = False, +) -> SafeStatsdLogger: """Return logger for StatsD.""" # no need to check for the scheduler/statsd_on -> this method is only called when it is set # and previously it would crash with None is callable if it was called without it. from statsd import StatsClient - stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None) if stats_class: if not issubclass(stats_class, StatsClient): raise AirflowConfigException( Review Comment: Wait, since we inject this and don't really read `conf` here, can we just get rid of this check, or move it to the factory where we actually read `conf`?
########## task-sdk/src/airflow/sdk/io/fs.py: ########## @@ -26,7 +26,11 @@ from airflow.providers_manager import ProvidersManager from airflow.sdk.module_loading import import_string -from airflow.stats import Stats + +try: + from airflow.observability.stats import Stats +except ImportError: + from airflow.stats import Stats # type: ignore[attr-defined,no-redef] Review Comment: Same as above ########## airflow-core/src/airflow/jobs/dag_processor_job_runner.py: ########## @@ -21,7 +21,7 @@ from airflow.jobs.base_job_runner import BaseJobRunner from airflow.jobs.job import Job, perform_heartbeat -from airflow.stats import Stats +from airflow.observability.stats import Stats Review Comment: I think moving stats into observability is too big of a change with almost no real value. We should not be moving that around. Keep it as stats itself with needed factory methods staying there ########## airflow-core/src/airflow/observability/metrics/datadog_logger.py: ########## @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations Review Comment: Where did this module belong earlier? 
If it was not in core Airflow, it should not go there now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
