xBis7 commented on code in PR #56150:
URL: https://github.com/apache/airflow/pull/56150#discussion_r2746284864


##########
airflow-core/tests/unit/observability/traces/test_otel_tracer.py:
##########
@@ -82,121 +84,176 @@ def test_debug_trace_metaclass(self):
         assert isinstance(DebugTrace.factory(), EmptyTrace)
 
     @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
-    @patch("airflow.observability.traces.otel_tracer.conf")
-    def test_tracer(self, conf_a, exporter):
-        # necessary to speed up the span to be emitted
-        with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
-            log = logging.getLogger("TestOtelTrace.test_tracer")
-            log.setLevel(logging.DEBUG)
-            # hijacking airflow conf with pre-defined
-            # values
-            conf_a.get.return_value = "abc"
-            conf_a.getint.return_value = 123
-            # this will enable debug to set - which outputs the result to console
-            conf_a.getboolean.return_value = True
-
-            # mocking console exporter with in mem exporter for better assertion
-            in_mem_exporter = InMemorySpanExporter()
-            exporter.return_value = in_mem_exporter
+    @patch("airflow._shared.observability.otel_env_config.OtelEnvConfig")
+    @env_vars(
+        {
+            "OTEL_SERVICE_NAME": "my_test_service",
+            # necessary to speed up the span to be emitted
+            "OTEL_BSP_SCHEDULE_DELAY": "1",
+        }
+    )
+    def test_tracer(self, otel_env_conf, exporter):
+        log = logging.getLogger("TestOtelTrace.test_tracer")
+        log.setLevel(logging.DEBUG)
 
-            tracer = otel_tracer.get_otel_tracer(Trace)
-            assert conf_a.get.called
-            assert conf_a.getint.called
-            assert conf_a.getboolean.called
-            with tracer.start_span(span_name="span1") as s1:
-                with tracer.start_span(span_name="span2") as s2:
-                    s2.set_attribute("attr2", "val2")
-                    span2 = json.loads(s2.to_json())
-                span1 = json.loads(s1.to_json())
-            # assert the two span data
-            assert span1["name"] == "span1"
-            assert span2["name"] == "span2"
-            trace_id = span1["context"]["trace_id"]
-            s1_span_id = span1["context"]["span_id"]
-            assert span2["context"]["trace_id"] == trace_id
-            assert span2["parent_id"] == s1_span_id
-            assert span2["attributes"]["attr2"] == "val2"
-            assert span2["resource"]["attributes"]["service.name"] == "abc"
+        # mocking console exporter with in mem exporter for better assertion
+        in_mem_exporter = InMemorySpanExporter()
+        exporter.return_value = in_mem_exporter
+
+        tracer = otel_tracer.get_otel_tracer(Trace)
+        assert otel_env_conf.called
+        otel_env_conf.assert_called_once()
+        with tracer.start_span(span_name="span1") as s1:
+            with tracer.start_span(span_name="span2") as s2:
+                s2.set_attribute("attr2", "val2")
+                span2 = json.loads(s2.to_json())
+            span1 = json.loads(s1.to_json())
+        # assert the two span data
+        assert span1["name"] == "span1"
+        assert span2["name"] == "span2"
+        trace_id = span1["context"]["trace_id"]
+        s1_span_id = span1["context"]["span_id"]
+        assert span2["context"]["trace_id"] == trace_id
+        assert span2["parent_id"] == s1_span_id
+        assert span2["attributes"]["attr2"] == "val2"
+        assert span2["resource"]["attributes"]["service.name"] == "my_test_service"
 
     @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
-    @patch("airflow.observability.traces.otel_tracer.conf")
-    def test_dag_tracer(self, conf_a, exporter):
-        # necessary to speed up the span to be emitted
-        with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
-            log = logging.getLogger("TestOtelTrace.test_dag_tracer")
-            log.setLevel(logging.DEBUG)
-            conf_a.get.return_value = "abc"
-            conf_a.getint.return_value = 123
-            # this will enable debug to set - which outputs the result to console
-            conf_a.getboolean.return_value = True
-
-            # mocking console exporter with in mem exporter for better assertion
-            in_mem_exporter = InMemorySpanExporter()
-            exporter.return_value = in_mem_exporter
-
-            now = datetime.now()
+    @env_vars(
+        {
+            "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+            # necessary to speed up the span to be emitted
+            "OTEL_BSP_SCHEDULE_DELAY": "1",
+        }
+    )
+    def test_dag_tracer(self, exporter):
+        log = logging.getLogger("TestOtelTrace.test_dag_tracer")
+        log.setLevel(logging.DEBUG)
 
-            tracer = otel_tracer.get_otel_tracer(Trace)
-            with tracer.start_root_span(span_name="span1", start_time=now) as s1:
-                with tracer.start_span(span_name="span2") as s2:
-                    s2.set_attribute("attr2", "val2")
-                    span2 = json.loads(s2.to_json())
-                span1 = json.loads(s1.to_json())
-
-            # The otel sdk, accepts an int for the start_time, and converts it to an iso string,
-            # using `util.ns_to_iso_str()`.
-            nano_time = datetime_to_nano(now)
-            assert span1["start_time"] == util.ns_to_iso_str(nano_time)
-            # Same trace_id
-            assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
-            assert span1["context"]["span_id"] == span2["parent_id"]
+        # mocking console exporter with in mem exporter for better assertion
+        in_mem_exporter = InMemorySpanExporter()
+        exporter.return_value = in_mem_exporter
 
-    @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
-    @patch("airflow.observability.traces.otel_tracer.conf")
-    def test_context_propagation(self, conf_a, exporter):
-        # necessary to speed up the span to be emitted
-        with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
-            log = logging.getLogger("TestOtelTrace.test_context_propagation")
-            log.setLevel(logging.DEBUG)
-            conf_a.get.return_value = "abc"
-            conf_a.getint.return_value = 123
-            # this will enable debug to set - which outputs the result to console
-            conf_a.getboolean.return_value = True
-
-            # mocking console exporter with in mem exporter for better assertion
-            in_mem_exporter = InMemorySpanExporter()
-            exporter.return_value = in_mem_exporter
-
-            # Method that represents another service which is
-            #  - getting the carrier
-            #  - extracting the context
-            #  - using the context to create a new span
-            # The new span should be associated with the span from the injected context carrier.
-            def _task_func(otel_tr, carrier):
-                parent_context = otel_tr.extract(carrier)
-
-                with otel_tr.start_child_span(span_name="sub_span", parent_context=parent_context) as span:
-                    span.set_attribute("attr2", "val2")
-                    json_span = json.loads(span.to_json())
-                return json_span
+        now = datetime.now()
 
-            tracer = otel_tracer.get_otel_tracer(Trace)
+        tracer = otel_tracer.get_otel_tracer(Trace)
+        with tracer.start_root_span(span_name="span1", start_time=now) as s1:
+            with tracer.start_span(span_name="span2") as s2:
+                s2.set_attribute("attr2", "val2")
+                span2 = json.loads(s2.to_json())
+            span1 = json.loads(s1.to_json())
+
+        # The otel sdk, accepts an int for the start_time, and converts it to an iso string,
+        # using `util.ns_to_iso_str()`.
+        nano_time = datetime_to_nano(now)
+        assert span1["start_time"] == util.ns_to_iso_str(nano_time)
+        # Same trace_id
+        assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
+        assert span1["context"]["span_id"] == span2["parent_id"]
+
+    @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
+    @env_vars(
+        {
+            "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+            # necessary to speed up the span to be emitted
+            "OTEL_BSP_SCHEDULE_DELAY": "1",
+        }
+    )
+    def test_context_propagation(self, exporter):
+        log = logging.getLogger("TestOtelTrace.test_context_propagation")
+        log.setLevel(logging.DEBUG)
+
+        # mocking console exporter with in mem exporter for better assertion
+        in_mem_exporter = InMemorySpanExporter()
+        exporter.return_value = in_mem_exporter
+
+        # Method that represents another service which is
+        #  - getting the carrier
+        #  - extracting the context
+        #  - using the context to create a new span
+        # The new span should be associated with the span from the injected context carrier.
+        def _task_func(otel_tr, carrier):
+            parent_context = otel_tr.extract(carrier)
+
+            with otel_tr.start_child_span(span_name="sub_span", parent_context=parent_context) as span:
+                span.set_attribute("attr2", "val2")
+                json_span = json.loads(span.to_json())
+            return json_span
 
-            root_span = tracer.start_root_span(span_name="root_span", start_as_current=False)
-            # The context is available, it can be injected into the carrier.
-            context_carrier = tracer.inject()
+        tracer = otel_tracer.get_otel_tracer(Trace)
 
-            # Some function that uses the carrier to create a new span.
-            json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+        root_span = tracer.start_root_span(span_name="root_span", start_as_current=False)
+        # The context is available, it can be injected into the carrier.
+        context_carrier = tracer.inject()
+
+        # Some function that uses the carrier to create a new span.
+        json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+
+        json_span1 = json.loads(root_span.to_json())
+        # Manually end the span.
+        root_span.end()
+
+        # Verify that span1 is a root span.
+        assert json_span1["parent_id"] is None
+        # Check span2 parent_id to verify that it's a child of span1.
+        assert json_span2["parent_id"] == json_span1["context"]["span_id"]
+        # The trace_id and the span_id are randomly generated by the otel sdk.
+        # Both spans should belong to the same trace.
+        assert json_span1["context"]["trace_id"] == json_span2["context"]["trace_id"]
+
+    @pytest.mark.parametrize(
+        ("provided_env_vars", "expected_endpoint", "expected_exporter_module"),
+        [
+            pytest.param(
+                {
+                    "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+                    "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc",
+                    "AIRFLOW__TRACES__OTEL_HOST": "breeze-otel-collector",
+                    "AIRFLOW__TRACES__OTEL_PORT": "4318",
+                },
+                "localhost:1234",
+                "grpc",
+                id="env_vars_with_grpc",

Review Comment:
   I stepped through the code with the debugger: to change this, we would have to change how the configuration parser reads config values.
   
   Let's take for example, `conf.get()`.
   
   The parser has a prioritized sequence of lookup options for config values.
   
   It iterates through that sequence here:
   
   
https://github.com/apache/airflow/blob/337aee896a9102c1d5c002b9708d1a49da9e98ca/shared/configuration/src/airflow_shared/configuration/parser.py#L1008
   
   which is
   
   
https://github.com/apache/airflow/blob/337aee896a9102c1d5c002b9708d1a49da9e98ca/shared/configuration/src/airflow_shared/configuration/parser.py#L165-L177
   
   Any option that doesn't find the config key returns `VALUE_NOT_FOUND_SENTINEL`. For example, if the key isn't set in the environment, the environment lookup returns `VALUE_NOT_FOUND_SENTINEL`.
   
   Every one of these options has to return `VALUE_NOT_FOUND_SENTINEL` for the loop to finish without a value; only then is the `fallback` parameter checked.
   
   
https://github.com/apache/airflow/blob/337aee896a9102c1d5c002b9708d1a49da9e98ca/shared/configuration/src/airflow_shared/configuration/parser.py#L1008-L1023
   
   Note that the last lookup option reads the default value from the config:
   
   
https://github.com/apache/airflow/blob/337aee896a9102c1d5c002b9708d1a49da9e98ca/shared/configuration/src/airflow_shared/configuration/parser.py#L176
   
   For that lookup to return `VALUE_NOT_FOUND_SENTINEL`, the option must not be defined in the Airflow config template at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to