jason810496 commented on code in PR #56150: URL: https://github.com/apache/airflow/pull/56150#discussion_r2734710312
########## airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst: ########## @@ -79,10 +79,11 @@ Add the Collector details to your configuration file e.g. ``airflow.cfg`` .. note:: - To support the OpenTelemetry exporter standard, the ``metrics`` configurations are transparently overridden by use of standard OpenTelemetry SDK environment variables. Review Comment: Maybe we could add an indented `.. deprecated:: 3.2.0` block for the Airflow-level config above and mention the best practice. https://github.com/apache/airflow/blob/762d8cc0d644941910f312073aef597d19751a05/airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst#L69-L78 ########## airflow-core/docs/administration-and-deployment/logging-monitoring/traces.rst: ########## @@ -45,9 +45,11 @@ Add the following lines to your configuration file e.g. ``airflow.cfg`` .. note:: - To support the OpenTelemetry exporter standard, the ``traces`` configurations are transparently superseded by use of standard OpenTelemetry SDK environment variables. 
+ The following config keys have been deprecated and will be removed in the future + otel_host, otel_port, otel_debugging_on, otel_service, otel_ssl_active Review Comment: ```suggestion ``otel_host``, ``otel_port``, ``otel_debugging_on``, ``otel_service``, ``otel_ssl_active`` ``` ########## airflow-core/tests/unit/observability/traces/test_otel_tracer.py: ########## @@ -82,121 +84,176 @@ def test_debug_trace_metaclass(self): assert isinstance(DebugTrace.factory(), EmptyTrace) @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") - @patch("airflow.observability.traces.otel_tracer.conf") - def test_tracer(self, conf_a, exporter): - # necessary to speed up the span to be emitted - with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}): - log = logging.getLogger("TestOtelTrace.test_tracer") - log.setLevel(logging.DEBUG) - # hijacking airflow conf with pre-defined - # values - conf_a.get.return_value = "abc" - conf_a.getint.return_value = 123 - # this will enable debug to set - which outputs the result to console - conf_a.getboolean.return_value = True - - # mocking console exporter with in mem exporter for better assertion - in_mem_exporter = InMemorySpanExporter() - exporter.return_value = in_mem_exporter + @patch("airflow._shared.observability.otel_env_config.OtelEnvConfig") + @env_vars( + { + "OTEL_SERVICE_NAME": "my_test_service", + # necessary to speed up the span to be emitted + "OTEL_BSP_SCHEDULE_DELAY": "1", + } + ) + def test_tracer(self, otel_env_conf, exporter): + log = logging.getLogger("TestOtelTrace.test_tracer") + log.setLevel(logging.DEBUG) - tracer = otel_tracer.get_otel_tracer(Trace) - assert conf_a.get.called - assert conf_a.getint.called - assert conf_a.getboolean.called - with tracer.start_span(span_name="span1") as s1: - with tracer.start_span(span_name="span2") as s2: - s2.set_attribute("attr2", "val2") - span2 = json.loads(s2.to_json()) - span1 = json.loads(s1.to_json()) - # assert the two span data - assert span1["name"] == "span1" - assert 
span2["name"] == "span2" - trace_id = span1["context"]["trace_id"] - s1_span_id = span1["context"]["span_id"] - assert span2["context"]["trace_id"] == trace_id - assert span2["parent_id"] == s1_span_id - assert span2["attributes"]["attr2"] == "val2" - assert span2["resource"]["attributes"]["service.name"] == "abc" + # mocking console exporter with in mem exporter for better assertion + in_mem_exporter = InMemorySpanExporter() + exporter.return_value = in_mem_exporter + + tracer = otel_tracer.get_otel_tracer(Trace) + assert otel_env_conf.called + otel_env_conf.assert_called_once() + with tracer.start_span(span_name="span1") as s1: + with tracer.start_span(span_name="span2") as s2: + s2.set_attribute("attr2", "val2") + span2 = json.loads(s2.to_json()) + span1 = json.loads(s1.to_json()) + # assert the two span data + assert span1["name"] == "span1" + assert span2["name"] == "span2" + trace_id = span1["context"]["trace_id"] + s1_span_id = span1["context"]["span_id"] + assert span2["context"]["trace_id"] == trace_id + assert span2["parent_id"] == s1_span_id + assert span2["attributes"]["attr2"] == "val2" + assert span2["resource"]["attributes"]["service.name"] == "my_test_service" @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") - @patch("airflow.observability.traces.otel_tracer.conf") - def test_dag_tracer(self, conf_a, exporter): - # necessary to speed up the span to be emitted - with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}): - log = logging.getLogger("TestOtelTrace.test_dag_tracer") - log.setLevel(logging.DEBUG) - conf_a.get.return_value = "abc" - conf_a.getint.return_value = 123 - # this will enable debug to set - which outputs the result to console - conf_a.getboolean.return_value = True - - # mocking console exporter with in mem exporter for better assertion - in_mem_exporter = InMemorySpanExporter() - exporter.return_value = in_mem_exporter - - now = datetime.now() + @env_vars( + { + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318", + # 
necessary to speed up the span to be emitted + "OTEL_BSP_SCHEDULE_DELAY": "1", + } + ) + def test_dag_tracer(self, exporter): + log = logging.getLogger("TestOtelTrace.test_dag_tracer") + log.setLevel(logging.DEBUG) - tracer = otel_tracer.get_otel_tracer(Trace) - with tracer.start_root_span(span_name="span1", start_time=now) as s1: - with tracer.start_span(span_name="span2") as s2: - s2.set_attribute("attr2", "val2") - span2 = json.loads(s2.to_json()) - span1 = json.loads(s1.to_json()) - - # The otel sdk, accepts an int for the start_time, and converts it to an iso string, - # using `util.ns_to_iso_str()`. - nano_time = datetime_to_nano(now) - assert span1["start_time"] == util.ns_to_iso_str(nano_time) - # Same trace_id - assert span1["context"]["trace_id"] == span2["context"]["trace_id"] - assert span1["context"]["span_id"] == span2["parent_id"] + # mocking console exporter with in mem exporter for better assertion + in_mem_exporter = InMemorySpanExporter() + exporter.return_value = in_mem_exporter - @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") - @patch("airflow.observability.traces.otel_tracer.conf") - def test_context_propagation(self, conf_a, exporter): - # necessary to speed up the span to be emitted - with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}): - log = logging.getLogger("TestOtelTrace.test_context_propagation") - log.setLevel(logging.DEBUG) - conf_a.get.return_value = "abc" - conf_a.getint.return_value = 123 - # this will enable debug to set - which outputs the result to console - conf_a.getboolean.return_value = True - - # mocking console exporter with in mem exporter for better assertion - in_mem_exporter = InMemorySpanExporter() - exporter.return_value = in_mem_exporter - - # Method that represents another service which is - # - getting the carrier - # - extracting the context - # - using the context to create a new span - # The new span should be associated with the span from the injected context carrier. 
- def _task_func(otel_tr, carrier): - parent_context = otel_tr.extract(carrier) - - with otel_tr.start_child_span(span_name="sub_span", parent_context=parent_context) as span: - span.set_attribute("attr2", "val2") - json_span = json.loads(span.to_json()) - return json_span + now = datetime.now() - tracer = otel_tracer.get_otel_tracer(Trace) + tracer = otel_tracer.get_otel_tracer(Trace) + with tracer.start_root_span(span_name="span1", start_time=now) as s1: + with tracer.start_span(span_name="span2") as s2: + s2.set_attribute("attr2", "val2") + span2 = json.loads(s2.to_json()) + span1 = json.loads(s1.to_json()) + + # The otel sdk, accepts an int for the start_time, and converts it to an iso string, + # using `util.ns_to_iso_str()`. + nano_time = datetime_to_nano(now) + assert span1["start_time"] == util.ns_to_iso_str(nano_time) + # Same trace_id + assert span1["context"]["trace_id"] == span2["context"]["trace_id"] + assert span1["context"]["span_id"] == span2["parent_id"] + + @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") + @env_vars( + { + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318", + # necessary to speed up the span to be emitted + "OTEL_BSP_SCHEDULE_DELAY": "1", + } + ) + def test_context_propagation(self, exporter): + log = logging.getLogger("TestOtelTrace.test_context_propagation") + log.setLevel(logging.DEBUG) + + # mocking console exporter with in mem exporter for better assertion + in_mem_exporter = InMemorySpanExporter() + exporter.return_value = in_mem_exporter + + # Method that represents another service which is + # - getting the carrier + # - extracting the context + # - using the context to create a new span + # The new span should be associated with the span from the injected context carrier. 
+ def _task_func(otel_tr, carrier): + parent_context = otel_tr.extract(carrier) + + with otel_tr.start_child_span(span_name="sub_span", parent_context=parent_context) as span: + span.set_attribute("attr2", "val2") + json_span = json.loads(span.to_json()) + return json_span - root_span = tracer.start_root_span(span_name="root_span", start_as_current=False) - # The context is available, it can be injected into the carrier. - context_carrier = tracer.inject() + tracer = otel_tracer.get_otel_tracer(Trace) - # Some function that uses the carrier to create a new span. - json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier) + root_span = tracer.start_root_span(span_name="root_span", start_as_current=False) + # The context is available, it can be injected into the carrier. + context_carrier = tracer.inject() + + # Some function that uses the carrier to create a new span. + json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier) + + json_span1 = json.loads(root_span.to_json()) + # Manually end the span. + root_span.end() + + # Verify that span1 is a root span. + assert json_span1["parent_id"] is None + # Check span2 parent_id to verify that it's a child of span1. + assert json_span2["parent_id"] == json_span1["context"]["span_id"] + # The trace_id and the span_id are randomly generated by the otel sdk. + # Both spans should belong to the same trace. + assert json_span1["context"]["trace_id"] == json_span2["context"]["trace_id"] + + @pytest.mark.parametrize( + ("provided_env_vars", "expected_endpoint", "expected_exporter_module"), + [ + pytest.param( + { + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234", + "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc", + "AIRFLOW__TRACES__OTEL_HOST": "breeze-otel-collector", + "AIRFLOW__TRACES__OTEL_PORT": "4318", + }, + "localhost:1234", + "grpc", + id="env_vars_with_grpc", Review Comment: Anyway, we could discuss the precedence between the Airflow‑level configuration and the OpenTelemetry environment variables. 
Regardless of what the precedence is, IMHO, we should raise a warning if both Airflow-level config and OpenTelemetry env are set, and we should clearly document the precedence between them. ########## airflow-core/docs/administration-and-deployment/logging-monitoring/traces.rst: ########## @@ -45,9 +45,11 @@ Add the following lines to your configuration file e.g. ``airflow.cfg`` .. note:: - To support the OpenTelemetry exporter standard, the ``traces`` configurations are transparently superseded by use of standard OpenTelemetry SDK environment variables. Review Comment: Same as `traces` https://github.com/apache/airflow/blob/762d8cc0d644941910f312073aef597d19751a05/airflow-core/docs/administration-and-deployment/logging-monitoring/traces.rst#L36-L44 ########## airflow-core/tests/unit/observability/traces/test_otel_tracer.py: ########## @@ -82,121 +84,176 @@ def test_debug_trace_metaclass(self): assert isinstance(DebugTrace.factory(), EmptyTrace) @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") - @patch("airflow.observability.traces.otel_tracer.conf") - def test_tracer(self, conf_a, exporter): - # necessary to speed up the span to be emitted - with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}): - log = logging.getLogger("TestOtelTrace.test_tracer") - log.setLevel(logging.DEBUG) - # hijacking airflow conf with pre-defined - # values - conf_a.get.return_value = "abc" - conf_a.getint.return_value = 123 - # this will enable debug to set - which outputs the result to console - conf_a.getboolean.return_value = True - - # mocking console exporter with in mem exporter for better assertion - in_mem_exporter = InMemorySpanExporter() - exporter.return_value = in_mem_exporter + @patch("airflow._shared.observability.otel_env_config.OtelEnvConfig") + @env_vars( + { + "OTEL_SERVICE_NAME": "my_test_service", + # necessary to speed up the span to be emitted + "OTEL_BSP_SCHEDULE_DELAY": "1", + } + ) + def test_tracer(self, otel_env_conf, exporter): + log = 
logging.getLogger("TestOtelTrace.test_tracer") + log.setLevel(logging.DEBUG) - tracer = otel_tracer.get_otel_tracer(Trace) - assert conf_a.get.called - assert conf_a.getint.called - assert conf_a.getboolean.called - with tracer.start_span(span_name="span1") as s1: - with tracer.start_span(span_name="span2") as s2: - s2.set_attribute("attr2", "val2") - span2 = json.loads(s2.to_json()) - span1 = json.loads(s1.to_json()) - # assert the two span data - assert span1["name"] == "span1" - assert span2["name"] == "span2" - trace_id = span1["context"]["trace_id"] - s1_span_id = span1["context"]["span_id"] - assert span2["context"]["trace_id"] == trace_id - assert span2["parent_id"] == s1_span_id - assert span2["attributes"]["attr2"] == "val2" - assert span2["resource"]["attributes"]["service.name"] == "abc" + # mocking console exporter with in mem exporter for better assertion + in_mem_exporter = InMemorySpanExporter() + exporter.return_value = in_mem_exporter + + tracer = otel_tracer.get_otel_tracer(Trace) + assert otel_env_conf.called + otel_env_conf.assert_called_once() + with tracer.start_span(span_name="span1") as s1: + with tracer.start_span(span_name="span2") as s2: + s2.set_attribute("attr2", "val2") + span2 = json.loads(s2.to_json()) + span1 = json.loads(s1.to_json()) + # assert the two span data + assert span1["name"] == "span1" + assert span2["name"] == "span2" + trace_id = span1["context"]["trace_id"] + s1_span_id = span1["context"]["span_id"] + assert span2["context"]["trace_id"] == trace_id + assert span2["parent_id"] == s1_span_id + assert span2["attributes"]["attr2"] == "val2" + assert span2["resource"]["attributes"]["service.name"] == "my_test_service" @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") - @patch("airflow.observability.traces.otel_tracer.conf") - def test_dag_tracer(self, conf_a, exporter): - # necessary to speed up the span to be emitted - with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}): - log = 
logging.getLogger("TestOtelTrace.test_dag_tracer") - log.setLevel(logging.DEBUG) - conf_a.get.return_value = "abc" - conf_a.getint.return_value = 123 - # this will enable debug to set - which outputs the result to console - conf_a.getboolean.return_value = True - - # mocking console exporter with in mem exporter for better assertion - in_mem_exporter = InMemorySpanExporter() - exporter.return_value = in_mem_exporter - - now = datetime.now() + @env_vars( + { + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318", + # necessary to speed up the span to be emitted + "OTEL_BSP_SCHEDULE_DELAY": "1", + } + ) + def test_dag_tracer(self, exporter): + log = logging.getLogger("TestOtelTrace.test_dag_tracer") + log.setLevel(logging.DEBUG) - tracer = otel_tracer.get_otel_tracer(Trace) - with tracer.start_root_span(span_name="span1", start_time=now) as s1: - with tracer.start_span(span_name="span2") as s2: - s2.set_attribute("attr2", "val2") - span2 = json.loads(s2.to_json()) - span1 = json.loads(s1.to_json()) - - # The otel sdk, accepts an int for the start_time, and converts it to an iso string, - # using `util.ns_to_iso_str()`. 
- nano_time = datetime_to_nano(now) - assert span1["start_time"] == util.ns_to_iso_str(nano_time) - # Same trace_id - assert span1["context"]["trace_id"] == span2["context"]["trace_id"] - assert span1["context"]["span_id"] == span2["parent_id"] + # mocking console exporter with in mem exporter for better assertion + in_mem_exporter = InMemorySpanExporter() + exporter.return_value = in_mem_exporter - @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") - @patch("airflow.observability.traces.otel_tracer.conf") - def test_context_propagation(self, conf_a, exporter): - # necessary to speed up the span to be emitted - with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}): - log = logging.getLogger("TestOtelTrace.test_context_propagation") - log.setLevel(logging.DEBUG) - conf_a.get.return_value = "abc" - conf_a.getint.return_value = 123 - # this will enable debug to set - which outputs the result to console - conf_a.getboolean.return_value = True - - # mocking console exporter with in mem exporter for better assertion - in_mem_exporter = InMemorySpanExporter() - exporter.return_value = in_mem_exporter - - # Method that represents another service which is - # - getting the carrier - # - extracting the context - # - using the context to create a new span - # The new span should be associated with the span from the injected context carrier. 
- def _task_func(otel_tr, carrier): - parent_context = otel_tr.extract(carrier) - - with otel_tr.start_child_span(span_name="sub_span", parent_context=parent_context) as span: - span.set_attribute("attr2", "val2") - json_span = json.loads(span.to_json()) - return json_span + now = datetime.now() - tracer = otel_tracer.get_otel_tracer(Trace) + tracer = otel_tracer.get_otel_tracer(Trace) + with tracer.start_root_span(span_name="span1", start_time=now) as s1: + with tracer.start_span(span_name="span2") as s2: + s2.set_attribute("attr2", "val2") + span2 = json.loads(s2.to_json()) + span1 = json.loads(s1.to_json()) + + # The otel sdk, accepts an int for the start_time, and converts it to an iso string, + # using `util.ns_to_iso_str()`. + nano_time = datetime_to_nano(now) + assert span1["start_time"] == util.ns_to_iso_str(nano_time) + # Same trace_id + assert span1["context"]["trace_id"] == span2["context"]["trace_id"] + assert span1["context"]["span_id"] == span2["parent_id"] + + @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") + @env_vars( + { + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318", + # necessary to speed up the span to be emitted + "OTEL_BSP_SCHEDULE_DELAY": "1", + } + ) + def test_context_propagation(self, exporter): + log = logging.getLogger("TestOtelTrace.test_context_propagation") + log.setLevel(logging.DEBUG) + + # mocking console exporter with in mem exporter for better assertion + in_mem_exporter = InMemorySpanExporter() + exporter.return_value = in_mem_exporter + + # Method that represents another service which is + # - getting the carrier + # - extracting the context + # - using the context to create a new span + # The new span should be associated with the span from the injected context carrier. 
+ def _task_func(otel_tr, carrier): + parent_context = otel_tr.extract(carrier) + + with otel_tr.start_child_span(span_name="sub_span", parent_context=parent_context) as span: + span.set_attribute("attr2", "val2") + json_span = json.loads(span.to_json()) + return json_span - root_span = tracer.start_root_span(span_name="root_span", start_as_current=False) - # The context is available, it can be injected into the carrier. - context_carrier = tracer.inject() + tracer = otel_tracer.get_otel_tracer(Trace) - # Some function that uses the carrier to create a new span. - json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier) + root_span = tracer.start_root_span(span_name="root_span", start_as_current=False) + # The context is available, it can be injected into the carrier. + context_carrier = tracer.inject() + + # Some function that uses the carrier to create a new span. + json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier) + + json_span1 = json.loads(root_span.to_json()) + # Manually end the span. + root_span.end() + + # Verify that span1 is a root span. + assert json_span1["parent_id"] is None + # Check span2 parent_id to verify that it's a child of span1. + assert json_span2["parent_id"] == json_span1["context"]["span_id"] + # The trace_id and the span_id are randomly generated by the otel sdk. + # Both spans should belong to the same trace. + assert json_span1["context"]["trace_id"] == json_span2["context"]["trace_id"] + + @pytest.mark.parametrize( + ("provided_env_vars", "expected_endpoint", "expected_exporter_module"), + [ + pytest.param( + { + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234", + "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc", + "AIRFLOW__TRACES__OTEL_HOST": "breeze-otel-collector", + "AIRFLOW__TRACES__OTEL_PORT": "4318", + }, + "localhost:1234", + "grpc", + id="env_vars_with_grpc", Review Comment: I'm not sure would it be better to still respect the Airflow-level config before OpenTelemetry environment variables. 
> According to the OpenTelemetry specification, configuration is expected to happen through the standard OpenTelemetry environment variables rather than project-specific settings. Even though we have mentioned the OpenTelemetry best practice above, from a compatibility perspective we might still want to respect the project‑level settings, while emitting a warning or even an error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
