jason810496 commented on code in PR #56150:
URL: https://github.com/apache/airflow/pull/56150#discussion_r2745043758
##########
airflow-core/tests/unit/observability/traces/test_otel_tracer.py:
##########
@@ -82,121 +84,176 @@ def test_debug_trace_metaclass(self):
assert isinstance(DebugTrace.factory(), EmptyTrace)
@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
- @patch("airflow.observability.traces.otel_tracer.conf")
- def test_tracer(self, conf_a, exporter):
- # necessary to speed up the span to be emitted
- with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
- log = logging.getLogger("TestOtelTrace.test_tracer")
- log.setLevel(logging.DEBUG)
- # hijacking airflow conf with pre-defined
- # values
- conf_a.get.return_value = "abc"
- conf_a.getint.return_value = 123
- # this will enable debug to set - which outputs the result to
console
- conf_a.getboolean.return_value = True
-
- # mocking console exporter with in mem exporter for better
assertion
- in_mem_exporter = InMemorySpanExporter()
- exporter.return_value = in_mem_exporter
+ @patch("airflow._shared.observability.otel_env_config.OtelEnvConfig")
+ @env_vars(
+ {
+ "OTEL_SERVICE_NAME": "my_test_service",
+ # necessary to speed up the span to be emitted
+ "OTEL_BSP_SCHEDULE_DELAY": "1",
+ }
+ )
+ def test_tracer(self, otel_env_conf, exporter):
+ log = logging.getLogger("TestOtelTrace.test_tracer")
+ log.setLevel(logging.DEBUG)
- tracer = otel_tracer.get_otel_tracer(Trace)
- assert conf_a.get.called
- assert conf_a.getint.called
- assert conf_a.getboolean.called
- with tracer.start_span(span_name="span1") as s1:
- with tracer.start_span(span_name="span2") as s2:
- s2.set_attribute("attr2", "val2")
- span2 = json.loads(s2.to_json())
- span1 = json.loads(s1.to_json())
- # assert the two span data
- assert span1["name"] == "span1"
- assert span2["name"] == "span2"
- trace_id = span1["context"]["trace_id"]
- s1_span_id = span1["context"]["span_id"]
- assert span2["context"]["trace_id"] == trace_id
- assert span2["parent_id"] == s1_span_id
- assert span2["attributes"]["attr2"] == "val2"
- assert span2["resource"]["attributes"]["service.name"] == "abc"
+ # mocking console exporter with in mem exporter for better assertion
+ in_mem_exporter = InMemorySpanExporter()
+ exporter.return_value = in_mem_exporter
+
+ tracer = otel_tracer.get_otel_tracer(Trace)
+ assert otel_env_conf.called
+ otel_env_conf.assert_called_once()
+ with tracer.start_span(span_name="span1") as s1:
+ with tracer.start_span(span_name="span2") as s2:
+ s2.set_attribute("attr2", "val2")
+ span2 = json.loads(s2.to_json())
+ span1 = json.loads(s1.to_json())
+ # assert the two span data
+ assert span1["name"] == "span1"
+ assert span2["name"] == "span2"
+ trace_id = span1["context"]["trace_id"]
+ s1_span_id = span1["context"]["span_id"]
+ assert span2["context"]["trace_id"] == trace_id
+ assert span2["parent_id"] == s1_span_id
+ assert span2["attributes"]["attr2"] == "val2"
+ assert span2["resource"]["attributes"]["service.name"] ==
"my_test_service"
@patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
- @patch("airflow.observability.traces.otel_tracer.conf")
- def test_dag_tracer(self, conf_a, exporter):
- # necessary to speed up the span to be emitted
- with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
- log = logging.getLogger("TestOtelTrace.test_dag_tracer")
- log.setLevel(logging.DEBUG)
- conf_a.get.return_value = "abc"
- conf_a.getint.return_value = 123
- # this will enable debug to set - which outputs the result to
console
- conf_a.getboolean.return_value = True
-
- # mocking console exporter with in mem exporter for better
assertion
- in_mem_exporter = InMemorySpanExporter()
- exporter.return_value = in_mem_exporter
-
- now = datetime.now()
+ @env_vars(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+ # necessary to speed up the span to be emitted
+ "OTEL_BSP_SCHEDULE_DELAY": "1",
+ }
+ )
+ def test_dag_tracer(self, exporter):
+ log = logging.getLogger("TestOtelTrace.test_dag_tracer")
+ log.setLevel(logging.DEBUG)
- tracer = otel_tracer.get_otel_tracer(Trace)
- with tracer.start_root_span(span_name="span1", start_time=now) as
s1:
- with tracer.start_span(span_name="span2") as s2:
- s2.set_attribute("attr2", "val2")
- span2 = json.loads(s2.to_json())
- span1 = json.loads(s1.to_json())
-
- # The otel sdk, accepts an int for the start_time, and converts it
to an iso string,
- # using `util.ns_to_iso_str()`.
- nano_time = datetime_to_nano(now)
- assert span1["start_time"] == util.ns_to_iso_str(nano_time)
- # Same trace_id
- assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
- assert span1["context"]["span_id"] == span2["parent_id"]
+ # mocking console exporter with in mem exporter for better assertion
+ in_mem_exporter = InMemorySpanExporter()
+ exporter.return_value = in_mem_exporter
- @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
- @patch("airflow.observability.traces.otel_tracer.conf")
- def test_context_propagation(self, conf_a, exporter):
- # necessary to speed up the span to be emitted
- with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}):
- log = logging.getLogger("TestOtelTrace.test_context_propagation")
- log.setLevel(logging.DEBUG)
- conf_a.get.return_value = "abc"
- conf_a.getint.return_value = 123
- # this will enable debug to set - which outputs the result to
console
- conf_a.getboolean.return_value = True
-
- # mocking console exporter with in mem exporter for better
assertion
- in_mem_exporter = InMemorySpanExporter()
- exporter.return_value = in_mem_exporter
-
- # Method that represents another service which is
- # - getting the carrier
- # - extracting the context
- # - using the context to create a new span
- # The new span should be associated with the span from the
injected context carrier.
- def _task_func(otel_tr, carrier):
- parent_context = otel_tr.extract(carrier)
-
- with otel_tr.start_child_span(span_name="sub_span",
parent_context=parent_context) as span:
- span.set_attribute("attr2", "val2")
- json_span = json.loads(span.to_json())
- return json_span
+ now = datetime.now()
- tracer = otel_tracer.get_otel_tracer(Trace)
+ tracer = otel_tracer.get_otel_tracer(Trace)
+ with tracer.start_root_span(span_name="span1", start_time=now) as s1:
+ with tracer.start_span(span_name="span2") as s2:
+ s2.set_attribute("attr2", "val2")
+ span2 = json.loads(s2.to_json())
+ span1 = json.loads(s1.to_json())
+
+ # The otel sdk, accepts an int for the start_time, and converts it to
an iso string,
+ # using `util.ns_to_iso_str()`.
+ nano_time = datetime_to_nano(now)
+ assert span1["start_time"] == util.ns_to_iso_str(nano_time)
+ # Same trace_id
+ assert span1["context"]["trace_id"] == span2["context"]["trace_id"]
+ assert span1["context"]["span_id"] == span2["parent_id"]
+
+ @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter")
+ @env_vars(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318",
+ # necessary to speed up the span to be emitted
+ "OTEL_BSP_SCHEDULE_DELAY": "1",
+ }
+ )
+ def test_context_propagation(self, exporter):
+ log = logging.getLogger("TestOtelTrace.test_context_propagation")
+ log.setLevel(logging.DEBUG)
+
+ # mocking console exporter with in mem exporter for better assertion
+ in_mem_exporter = InMemorySpanExporter()
+ exporter.return_value = in_mem_exporter
+
+ # Method that represents another service which is
+ # - getting the carrier
+ # - extracting the context
+ # - using the context to create a new span
+ # The new span should be associated with the span from the injected
context carrier.
+ def _task_func(otel_tr, carrier):
+ parent_context = otel_tr.extract(carrier)
+
+ with otel_tr.start_child_span(span_name="sub_span",
parent_context=parent_context) as span:
+ span.set_attribute("attr2", "val2")
+ json_span = json.loads(span.to_json())
+ return json_span
- root_span = tracer.start_root_span(span_name="root_span",
start_as_current=False)
- # The context is available, it can be injected into the carrier.
- context_carrier = tracer.inject()
+ tracer = otel_tracer.get_otel_tracer(Trace)
- # Some function that uses the carrier to create a new span.
- json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+ root_span = tracer.start_root_span(span_name="root_span",
start_as_current=False)
+ # The context is available, it can be injected into the carrier.
+ context_carrier = tracer.inject()
+
+ # Some function that uses the carrier to create a new span.
+ json_span2 = _task_func(otel_tr=tracer, carrier=context_carrier)
+
+ json_span1 = json.loads(root_span.to_json())
+ # Manually end the span.
+ root_span.end()
+
+ # Verify that span1 is a root span.
+ assert json_span1["parent_id"] is None
+ # Check span2 parent_id to verify that it's a child of span1.
+ assert json_span2["parent_id"] == json_span1["context"]["span_id"]
+ # The trace_id and the span_id are randomly generated by the otel sdk.
+ # Both spans should belong to the same trace.
+ assert json_span1["context"]["trace_id"] ==
json_span2["context"]["trace_id"]
+
+ @pytest.mark.parametrize(
+ ("provided_env_vars", "expected_endpoint", "expected_exporter_module"),
+ [
+ pytest.param(
+ {
+ "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:1234",
+ "OTEL_EXPORTER_OTLP_PROTOCOL": "grpc",
+ "AIRFLOW__TRACES__OTEL_HOST": "breeze-otel-collector",
+ "AIRFLOW__TRACES__OTEL_PORT": "4318",
+ },
+ "localhost:1234",
+ "grpc",
+ id="env_vars_with_grpc",
Review Comment:
> We are deprecating the project specific settings. Why give them precedence
if they are going to be removed?
Fair point; let's keep the precedence as is (OTel env vars take priority over
Airflow-level config).
> There isn't a way to know if the user set the airflow config
Would it be possible to add a sentinel as the fallback in the following places?
Then, if the OTel env vars are set and we still get a non-sentinel object from
`conf` (i.e. the user also set the Airflow config), we can log a warning.
Sentinel pattern example:
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/_shared/configuration/parser.py#L53-L59
For core:
https://github.com/apache/airflow/blob/337aee896a9102c1d5c002b9708d1a49da9e98ca/airflow-core/src/airflow/observability/traces/otel_tracer.py#L29-L38
For SDK:
https://github.com/apache/airflow/blob/337aee896a9102c1d5c002b9708d1a49da9e98ca/task-sdk/src/airflow/sdk/observability/traces/otel_tracer.py#L29-L38
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]