xBis7 commented on code in PR #56150:
URL: https://github.com/apache/airflow/pull/56150#discussion_r2737125856
##########
airflow-core/tests/unit/observability/traces/test_otel_tracer.py:
##########
@@ -82,121 +84,176 @@ def test_debug_trace_metaclass(self):
assert isinstance(DebugTrace.factory(), EmptyTrace)
@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
- @patch("airflow.observability.traces.otel_tracer.conf")
- def test_tracer(self, conf_a, exporter):
- # necessary to speed up the span to be emitted
- with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
- log = logging.getLogger("TestOtelTrace.test_tracer")
- log.setLevel(logging.DEBUG)
- # hijacking airflow conf with pre-defined
- # values
- conf_a.get.return_value = "abc"
- conf_a.getint.return_value = 123
- # this will enable debug to set - which outputs the result to
console
- conf_a.getboolean.return_value = True
-
- # mocking console exporter with in mem exporter for better
assertion
- in_mem_exporter = InMemorySpanExporter()
- exporter.return_value = in_mem_exporter
+ @patch("airflow._shared.observability.otel_env_config.OtelEnvConfig")
+ @env_vars(
+ {
+ "OTEL_SERVICE_NAME": "my_test_service",
+ # necessary to speed up the span to be emitted
+ "OTEL_BSP_SCHEDULE_DELAY": "1",
+ }
+ )
+ def test_tracer(self, otel_env_conf, exporter):
+ log = logging.getLogger("TestOtelTrace.test_tracer")
+ log.setLevel(logging.DEBUG)
- tracer = otel_tracer.get_otel_tracer(Trace)
- assert conf_a.get.called
- assert conf_a.getint.called
- assert conf_a.getboolean.called
- with tracer.start_span(span_name="span1") as s1:
- with tracer.start_span(span_name="span2") as s2:
- s2.set_attribute("attr2", "val2")
- span2 = json.loads(s2.to_json())
- span1 = json.loads(s1.to_json())
- # assert the two span data
- assert span1["name"] == "span1"
- assert span2["name"] == "span2"
- trace_id = span1["context"]["trace_id"]
- s1_span_id = span1["context"]["span_id"]
- assert span2["context"]["trace_id"] == trace_id
- assert span2["parent_id"] == s1_span_id
- assert span2["attributes"]["attr2"] == "val2"
- assert span2["resource"]["attributes"]["service.name"] == "abc"
+ # mocking console exporter with in mem exporter for better assertion
+ in_mem_exporter = InMemorySpanExporter()
+ exporter.return_value = in_mem_exporter
+
+ tracer = otel_tracer.get_otel_tracer(Trace)
+ assert otel_env_conf.called
+ otel_env_conf.assert_called_once()
+ with tracer.start_span(span_name="span1") as s1:
+ with tracer.start_span(span_name="span2") as s2:
+ s2.set_attribute("attr2", "val2")
+ span2 = json.loads(s2.to_json())
+ span1 = json.loads(s1.to_json())
+ # assert the two span data
+ assert span1["name"] == "span1"
+ assert span2["name"] == "span2"
+ trace_id = span1["context"]["trace_id"]
+ s1_span_id = span1["context"]["span_id"]
+ assert span2["context"]["trace_id"] == trace_id
+ assert span2["parent_id"] == s1_span_id
+ assert span2["attributes"]["attr2"] == "val2"
+ assert span2["resource"]["attributes"]["service.name"] ==
"my_test_service"
@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
- @patch("airflow.observability.traces.otel_tracer.conf")
- def test_dag_tracer(self, conf_a, exporter):
- # necessary to speed up the span to be emitted
- with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
- log = logging.getLogger("TestOtelTrace.test_dag_tracer")
- log.setLevel(logging.DEBUG)
- conf_a.get.return_value = "abc"
- conf_a.getint.return_value = 123
- # this will enable debug to set - which outputs the result to
console
- conf_a.getboolean.return_value = True
-
- # mocking console exporter with in mem exporter for better
assertion
- in_mem_exporter = InMemorySpanExporter()
- exporter.return_value = in_mem_exporter
-
- now = datetime.now()
+ @env_vars(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+ # necessary to speed up the span to be emitted
+ "OTEL_BSP_SCHEDULE_DELAY": "1",
+ }
+ )
+ def test_dag_tracer(self, exporter):
+ log = logging.getLogger("TestOtelTrace.test_dag_tracer")
+ log.setLevel(logging.DEBUG)
- tracer = otel_tracer.get_otel_tracer(Trace)
- with tracer.start_root_span(span_name="span1", start_time=now) as
s1:
- with tracer.start_span(span_name="span2") as s2:
- s2.set_attribute("attr2", "val2")
- span2 = json.loads(s2.to_json())
- span1 = json.loads(s1.to_json())
-
- # The otel sdk, accepts an int for the start_time, and converts it
to an iso string,
- # using `util.ns_to_iso_str()`.
- nano_time = datetime_to_nano(now)
- assert span1["start_time"] == util.ns_to_iso_str(nano_time)
- # Same trace_id
- assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
- assert span1["context"]["span_id"] == span2["parent_id"]
+ # mocking console exporter with in mem exporter for better assertion
+ in_mem_exporter = InMemorySpanExporter()
+ exporter.return_value = in_mem_exporter
- @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
- @patch("airflow.observability.traces.otel_tracer.conf")
- def test_context_propagation(self, conf_a, exporter):
- # necessary to speed up the span to be emitted
- with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
- log = logging.getLogger("TestOtelTrace.test_context_propagation")
- log.setLevel(logging.DEBUG)
- conf_a.get.return_value = "abc"
- conf_a.getint.return_value = 123
- # this will enable debug to set - which outputs the result to
console
- conf_a.getboolean.return_value = True
-
- # mocking console exporter with in mem exporter for better
assertion
- in_mem_exporter = InMemorySpanExporter()
- exporter.return_value = in_mem_exporter
-
- # Method that represents another service which is
- # - getting the carrier
- # - extracting the context
- # - using the context to create a new span
- # The new span should be associated with the span from the
injected context carrier.
- def _task_func(otel_tr, carrier):
- parent_context = otel_tr.extract(carrier)
-
- with otel_tr.start_child_span(span_name="sub_span",
parent_context=parent_context) as span:
- span.set_attribute("attr2", "val2")
- json_span = json.loads(span.to_json())
- return json_span
+ now = datetime.now()
- tracer = otel_tracer.get_otel_tracer(Trace)
+ tracer = otel_tracer.get_otel_tracer(Trace)
+ with tracer.start_root_span(span_name="span1", start_time=now) as s1:
+ with tracer.start_span(span_name="span2") as s2:
+ s2.set_attribute("attr2", "val2")
+ span2 = json.loads(s2.to_json())
+ span1 = json.loads(s1.to_json())
+
+ # The otel sdk, accepts an int for the start_time, and converts it to
an iso string,
+ # using `util.ns_to_iso_str()`.
+ nano_time = datetime_to_nano(now)
+ assert span1["start_time"] == util.ns_to_iso_str(nano_time)
+ # Same trace_id
+ assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
+ assert span1["context"]["span_id"] == span2["parent_id"]
+
+ @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
+ @env_vars(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+ # necessary to speed up the span to be emitted
+ "OTEL_BSP_SCHEDULE_DELAY": "1",
+ }
+ )
+ def test_context_propagation(self, exporter):
+ log = logging.getLogger("TestOtelTrace.test_context_propagation")
+ log.setLevel(logging.DEBUG)
+
+ # mocking console exporter with in mem exporter for better assertion
+ in_mem_exporter = InMemorySpanExporter()
+ exporter.return_value = in_mem_exporter
+
+ # Method that represents another service which is
+ # - getting the carrier
+ # - extracting the context
+ # - using the context to create a new span
+ # The new span should be associated with the span from the injected
context carrier.
+ def _task_func(otel_tr, carrier):
+ parent_context = otel_tr.extract(carrier)
+
+ with otel_tr.start_child_span(span_name="sub_span",
parent_context=parent_context) as span:
+ span.set_attribute("attr2", "val2")
+ json_span = json.loads(span.to_json())
+ return json_span
- root_span = tracer.start_root_span(span_name="root_span",
start_as_current=False)
- # The context is available, it can be injected into the carrier.
- context_carrier = tracer.inject()
+ tracer = otel_tracer.get_otel_tracer(Trace)
- # Some function that uses the carrier to create a new span.
- json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+ root_span = tracer.start_root_span(span_name="root_span",
start_as_current=False)
+ # The context is available, it can be injected into the carrier.
+ context_carrier = tracer.inject()
+
+ # Some function that uses the carrier to create a new span.
+ json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+
+ json_span1 = json.loads(root_span.to_json())
+ # Manually end the span.
+ root_span.end()
+
+ # Verify that span1 is a root span.
+ assert json_span1["parent_id"] is None
+ # Check span2 parent_id to verify that it's a child of span1.
+ assert json_span2["parent_id"] == json_span1["context"]["span_id"]
+ # The trace_id and the span_id are randomly generated by the otel sdk.
+ # Both spans should belong to the same trace.
+ assert json_span1["context"]["trace_id"] ==
json_span2["context"]["trace_id"]
+
+ @pytest.mark.parametrize(
+ ("provided_env_vars", "expected_endpoint", "expected_exporter_module"),
+ [
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc",
+ "AIRFLOW__TRACES__OTEL_HOST": "breeze-otel-collector",
+ "AIRFLOW__TRACES__OTEL_PORT": "4318",
+ },
+ "localhost:1234",
+ "grpc",
+ id="env_vars_with_grpc",
Review Comment:
When we read from the Airflow config, there is always a value, even if the
user hasn't set one, because the lookup falls back to the default.
There is no way to tell whether the user actually set the Airflow config option
unless we explicitly set the fallback to None.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]