ferruzzi commented on code in PR #37948: URL: https://github.com/apache/airflow/pull/37948#discussion_r1516903821
########## airflow/traces/utils.py: ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging + +from airflow.utils.hashlib_wrapper import md5 + +log = logging.getLogger(__name__) + + +def gen_trace_id(dag_run) -> str: Review Comment: Type hints in the signatures please. ########## airflow/traces/utils.py: ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import logging + +from airflow.utils.hashlib_wrapper import md5 + +log = logging.getLogger(__name__) + + +def gen_trace_id(dag_run) -> str: + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest() + return hash_hex + + +def gen_span_id_from_ti_key(ti_key) -> str: + """Generate span id from TI key.""" + dag_id = ti_key.dag_id + run_id = ti_key.run_id + task_id = ti_key.task_id + try_num = ti_key.try_number # key always has next number, not current + hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_dag_span_id(dag_run): + """Generate dag's root span id using dag_run.""" + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_span_id(ti): + """Generate span id from the task instance.""" + dag_run = ti.dag_run + dag_id = dag_run.dag_id + run_id = dag_run.run_id + task_id = ti.task_id + """in terms of ti when this is called, the try_number is already set to next, hence the subtraction""" + try_num = ti.try_number - 1 + hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def parse_traceparent(traceparent_str: str | None = None) -> dict: + """Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01.""" + if traceparent_str is None: + return {} + tokens = traceparent_str.split("-") + if len(tokens) != 4: + raise ValueError("The traceparent string does not have the correct format.") + return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]} Review Comment: Purely a suggestion, feel 
free to ignore it: ``` if traceparent_str is None: return {} try: version, trace_id, parent_id, flags = traceparent_str.split("-") return {"version": version, "trace_id": trace_id, "parent_id": parent_id, "flags": flags} except ValueError: # Wrong number of values raise ValueError("The traceparent string does not have the correct format.") ``` Maybe that's a little easier to read? Maybe not. I don't know. Feel free to just resolve this one. ########## airflow/traces/tracer.py: ########## @@ -0,0 +1,256 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import logging +import socket +import typing +from typing import ( + TYPE_CHECKING, + Callable, +) + +from airflow.configuration import conf +from airflow.typing_compat import Protocol + +log = logging.getLogger(__name__) + + +def gen_context(trace_id, span_id): + """Generate span context from trace_id and span_id.""" + from airflow.traces.otel_tracer import gen_context as otel_gen_context + + return otel_gen_context(trace_id, span_id) + + +def gen_links_from_kv_list(list): + """Generate links from kv list of {trace_id:int, span_id:int}.""" + from airflow.traces.otel_tracer import gen_links_from_kv_list + + return gen_links_from_kv_list(list) + + +def span(func): + """Decorate a function with span.""" + + def wrapper(*args, **kwargs): + func_name = func.__name__ + qual_name = func.__qualname__ + module_name = func.__module__ + if "." in qual_name: + component = f"{qual_name.rsplit('.', 1)[0]}" + else: + component = module_name + with Trace.start_span(span_name=func_name, component=component): + return func(*args, **kwargs) + + return wrapper + + +class DummyContext: + """If no Tracer is configured, DummyContext is used as a fallback.""" + + def __init__(self): + self.trace_id = 1 + + +class DummySpan: + """If no Tracer is configured, DummySpan is used as a fallback.""" + + def __enter__(self): + """Enter.""" + return self + + def __exit__(self, *args, **kwargs): + """Exit.""" + pass + + def __call__(self, obj): + """Call.""" + return obj + + def get_span_context(self): + """Get span context.""" + return DUMMY_CTX + + def set_attribute(self, key, value) -> None: + """Set an attribute to the span.""" + pass + + def set_attributes(self, attributes) -> None: + """Set multiple attributes at once.""" + pass + + def add_event( + self, + name: str, + attributes=None, Review Comment: Here and below: Type hint for attributes? 
########## airflow/traces/tracer.py: ########## @@ -0,0 +1,256 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import socket +import typing +from typing import ( + TYPE_CHECKING, + Callable, +) + +from airflow.configuration import conf +from airflow.typing_compat import Protocol + +log = logging.getLogger(__name__) + + +def gen_context(trace_id, span_id): + """Generate span context from trace_id and span_id.""" + from airflow.traces.otel_tracer import gen_context as otel_gen_context + + return otel_gen_context(trace_id, span_id) + + +def gen_links_from_kv_list(list): + """Generate links from kv list of {trace_id:int, span_id:int}.""" + from airflow.traces.otel_tracer import gen_links_from_kv_list + + return gen_links_from_kv_list(list) + + +def span(func): + """Decorate a function with span.""" + + def wrapper(*args, **kwargs): + func_name = func.__name__ + qual_name = func.__qualname__ + module_name = func.__module__ + if "." 
in qual_name: + component = f"{qual_name.rsplit('.', 1)[0]}" + else: + component = module_name + with Trace.start_span(span_name=func_name, component=component): + return func(*args, **kwargs) + + return wrapper + + +class DummyContext: + """If no Tracer is configured, DummyContext is used as a fallback.""" + + def __init__(self): + self.trace_id = 1 + + +class DummySpan: + """If no Tracer is configured, DummySpan is used as a fallback.""" + + def __enter__(self): + """Enter.""" + return self + + def __exit__(self, *args, **kwargs): + """Exit.""" + pass + + def __call__(self, obj): + """Call.""" + return obj + + def get_span_context(self): + """Get span context.""" + return DUMMY_CTX + + def set_attribute(self, key, value) -> None: + """Set an attribute to the span.""" + pass + + def set_attributes(self, attributes) -> None: + """Set multiple attributes at once.""" + pass + + def add_event( + self, + name: str, + attributes=None, + timestamp: int | None = None, + ) -> None: + """Add event to span.""" + pass + + def add_link( + self, + context: typing.Any, + attributes=None, + ) -> None: + """Add link to the span.""" + pass + + def end(self, end_time=None, *args, **kwargs) -> None: + """End.""" + pass + + +DUMMY_SPAN = DummySpan() +DUMMY_CTX = DummyContext() + + +class Tracer(Protocol): + """This class is only used for TypeChecking (for IDEs, mypy, etc).""" + + instance: Tracer | DummyTrace | None = None + + @classmethod + def get_tracer(cls, component): + """Get a tracer.""" + raise NotImplementedError() + + @classmethod + def start_span( + cls, + span_name: str, + component: str | None = None, + parent_sc=None, + span_id=None, + links=None, + start_time=None, + ): + """Start a span.""" + raise NotImplementedError() + + @classmethod + def use_span(cls, span): + """Use a span as current.""" + raise NotImplementedError() + + @classmethod + def get_current_span(self): + raise NotImplementedError() + + @classmethod + def start_span_from_dagrun(cls, dagrun, 
span_name=None, service_name=None, component=None, links=None): Review Comment: Here and elsewhere: Just my personal preference, feel free to resolve and ignore. If you add a comma after the last arg, the static checks will force it to one per line like `start_span()` has them above. ########## airflow/traces/tracer.py: ########## @@ -0,0 +1,256 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import logging +import socket +import typing +from typing import ( + TYPE_CHECKING, + Callable, +) + +from airflow.configuration import conf +from airflow.typing_compat import Protocol + +log = logging.getLogger(__name__) + + +def gen_context(trace_id, span_id): + """Generate span context from trace_id and span_id.""" + from airflow.traces.otel_tracer import gen_context as otel_gen_context + + return otel_gen_context(trace_id, span_id) + + +def gen_links_from_kv_list(list): + """Generate links from kv list of {trace_id:int, span_id:int}.""" + from airflow.traces.otel_tracer import gen_links_from_kv_list + + return gen_links_from_kv_list(list) + + +def span(func): + """Decorate a function with span.""" + + def wrapper(*args, **kwargs): + func_name = func.__name__ + qual_name = func.__qualname__ + module_name = func.__module__ + if "." in qual_name: + component = f"{qual_name.rsplit('.', 1)[0]}" + else: + component = module_name + with Trace.start_span(span_name=func_name, component=component): + return func(*args, **kwargs) + + return wrapper + + +class DummyContext: + """If no Tracer is configured, DummyContext is used as a fallback.""" + + def __init__(self): + self.trace_id = 1 + + +class DummySpan: + """If no Tracer is configured, DummySpan is used as a fallback.""" + + def __enter__(self): + """Enter.""" + return self + + def __exit__(self, *args, **kwargs): + """Exit.""" + pass + + def __call__(self, obj): + """Call.""" + return obj + + def get_span_context(self): + """Get span context.""" + return DUMMY_CTX + + def set_attribute(self, key, value) -> None: + """Set an attribute to the span.""" + pass + + def set_attributes(self, attributes) -> None: + """Set multiple attributes at once.""" + pass + + def add_event( + self, + name: str, + attributes=None, + timestamp: int | None = None, + ) -> None: + """Add event to span.""" + pass + + def add_link( + self, + context: typing.Any, Review Comment: Import this as `from 
typing import Any` above and just use Any here if you must, but preferably narrow this down if you can. ########## airflow/traces/utils.py: ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging + +from airflow.utils.hashlib_wrapper import md5 + +log = logging.getLogger(__name__) + + +def gen_trace_id(dag_run) -> str: + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest() + return hash_hex + + +def gen_span_id_from_ti_key(ti_key) -> str: + """Generate span id from TI key.""" + dag_id = ti_key.dag_id + run_id = ti_key.run_id + task_id = ti_key.task_id + try_num = ti_key.try_number # key always has next number, not current + hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_dag_span_id(dag_run): + """Generate dag's root span id using dag_run.""" + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = 
md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_span_id(ti): + """Generate span id from the task instance.""" + dag_run = ti.dag_run + dag_id = dag_run.dag_id + run_id = dag_run.run_id + task_id = ti.task_id + """in terms of ti when this is called, the try_number is already set to next, hence the subtraction""" + try_num = ti.try_number - 1 + hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def parse_traceparent(traceparent_str: str | None = None) -> dict: + """Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01.""" + if traceparent_str is None: + return {} + tokens = traceparent_str.split("-") + if len(tokens) != 4: + raise ValueError("The traceparent string does not have the correct format.") + return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]} + + +def parse_tracestate(tracestate_str: str | None = None) -> dict: + """Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE.""" + if tracestate_str is None: + return {} + tokens = tracestate_str.split(",") + result = {} + for pair in tokens: + key, value = pair.split("=") + result[key.strip()] = value.strip() + return result + + +def is_valid_trace_id(trace_id: str) -> bool: + """Check whether trace id is valid.""" + if trace_id is not None and len(trace_id) == 32 and trace_id != "0x00000000000000000000000000000000": + return True + else: + return False Review Comment: `0x00000000000000000000000000000000` lends itself to typo and easy mistakes. 
Consider: ``` if trace_id is not None and \ len(trace_id) == 32 and \ trace_id[:2] == "0x" and \ trace_id[2:] != "0" * 30: # Make sure the last 30 characters are not all "0" ``` or perhaps better: ``` if trace_id is not None and \ len(trace_id) == 32 and \ int(trace_id, 16) != 0: # Convert from Hex to decimal and make sure the decimal value is not 0 ``` Also, whichever way you go with that, you can just return the check itself: ``` return trace_id is not None and len(trace_id) == 32 and trace_id != "0x00000000000000000000000000000000" ``` ########## airflow/traces/utils.py: ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + Review Comment: These all look good and fairly straightforward, but unit tests would prevent someone from monkeying with them in the future. ########## airflow/traces/tracer.py: ########## @@ -0,0 +1,256 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import socket +import typing +from typing import ( + TYPE_CHECKING, + Callable, +) + +from airflow.configuration import conf +from airflow.typing_compat import Protocol + +log = logging.getLogger(__name__) + + +def gen_context(trace_id, span_id): + """Generate span context from trace_id and span_id.""" + from airflow.traces.otel_tracer import gen_context as otel_gen_context + + return otel_gen_context(trace_id, span_id) + + +def gen_links_from_kv_list(list): + """Generate links from kv list of {trace_id:int, span_id:int}.""" + from airflow.traces.otel_tracer import gen_links_from_kv_list + + return gen_links_from_kv_list(list) + + +def span(func): + """Decorate a function with span.""" + + def wrapper(*args, **kwargs): + func_name = func.__name__ + qual_name = func.__qualname__ + module_name = func.__module__ + if "." in qual_name: + component = f"{qual_name.rsplit('.', 1)[0]}" + else: + component = module_name + with Trace.start_span(span_name=func_name, component=component): + return func(*args, **kwargs) + + return wrapper + + +class DummyContext: Review Comment: Here and elsewhere: We aren't supposed to use "Dummy" anymore. Consider "Empty", "Base", "Safe", "Default", or "Placeholder" maybe. That should actually have been caught by the static checks for inclusive language.... 
########## airflow/traces/utils.py: ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging + +from airflow.utils.hashlib_wrapper import md5 + +log = logging.getLogger(__name__) + + +def gen_trace_id(dag_run) -> str: + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest() + return hash_hex + + +def gen_span_id_from_ti_key(ti_key) -> str: + """Generate span id from TI key.""" + dag_id = ti_key.dag_id + run_id = ti_key.run_id + task_id = ti_key.task_id + try_num = ti_key.try_number # key always has next number, not current + hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_dag_span_id(dag_run): + """Generate dag's root span id using dag_run.""" + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_span_id(ti): + """Generate span id from the task 
instance.""" + dag_run = ti.dag_run + dag_id = dag_run.dag_id + run_id = dag_run.run_id + task_id = ti.task_id + """in terms of ti when this is called, the try_number is already set to next, hence the subtraction""" Review Comment: ```suggestion # When this is called, the try_number is already set to next, hence the subtraction. ``` ########## airflow/traces/utils.py: ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging + +from airflow.utils.hashlib_wrapper import md5 + +log = logging.getLogger(__name__) + + +def gen_trace_id(dag_run) -> str: Review Comment: This appears to be the only method in this module missing a docstring? ########## airflow/traces/utils.py: ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging + +from airflow.utils.hashlib_wrapper import md5 + +log = logging.getLogger(__name__) + + +def gen_trace_id(dag_run) -> str: + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest() + return hash_hex + + +def gen_span_id_from_ti_key(ti_key) -> str: + """Generate span id from TI key.""" + dag_id = ti_key.dag_id + run_id = ti_key.run_id + task_id = ti_key.task_id + try_num = ti_key.try_number # key always has next number, not current + hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_dag_span_id(dag_run): + """Generate dag's root span id using dag_run.""" + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_span_id(ti): + """Generate span id from the task instance.""" + dag_run = ti.dag_run + dag_id = dag_run.dag_id + run_id = dag_run.run_id + task_id = ti.task_id + """in terms of ti when this is called, the try_number is already set to next, hence the subtraction""" + try_num = ti.try_number - 1 + hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def parse_traceparent(traceparent_str: str | None = None) -> 
dict: + """Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01.""" + if traceparent_str is None: + return {} + tokens = traceparent_str.split("-") + if len(tokens) != 4: + raise ValueError("The traceparent string does not have the correct format.") + return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]} + + +def parse_tracestate(tracestate_str: str | None = None) -> dict: + """Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE.""" + if tracestate_str is None: + return {} + tokens = tracestate_str.split(",") + result = {} + for pair in tokens: + key, value = pair.split("=") + result[key.strip()] = value.strip() + return result + + +def is_valid_trace_id(trace_id: str) -> bool: + """Check whether trace id is valid.""" + if trace_id is not None and len(trace_id) == 32 and trace_id != "0x00000000000000000000000000000000": + return True + else: + return False + + +def is_valid_span_id(span_id: str) -> bool: + """Check whether span id is valid.""" Review Comment: Same thoughts as above. ########## airflow/traces/utils.py: ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import logging + +from airflow.utils.hashlib_wrapper import md5 + +log = logging.getLogger(__name__) + + +def gen_trace_id(dag_run) -> str: + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest() + return hash_hex + + +def gen_span_id_from_ti_key(ti_key) -> str: + """Generate span id from TI key.""" + dag_id = ti_key.dag_id + run_id = ti_key.run_id + task_id = ti_key.task_id + try_num = ti_key.try_number # key always has next number, not current + hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_dag_span_id(dag_run): + """Generate dag's root span id using dag_run.""" + dag_id = dag_run.dag_id + run_id = dag_run.run_id + start_dt = dag_run.start_date + hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def gen_span_id(ti): + """Generate span id from the task instance.""" + dag_run = ti.dag_run + dag_id = dag_run.dag_id + run_id = dag_run.run_id + task_id = ti.task_id + """in terms of ti when this is called, the try_number is already set to next, hence the subtraction""" + try_num = ti.try_number - 1 + hash_seed = f"{dag_id}_{run_id}_{task_id}_{try_num}" + hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()[16:] + return hash_hex + + +def parse_traceparent(traceparent_str: str | None = None) -> dict: + """Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01.""" + if traceparent_str is None: + return {} + tokens = traceparent_str.split("-") + if len(tokens) != 4: + raise ValueError("The traceparent string does not have the correct format.") + return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]} + + +def parse_tracestate(tracestate_str: 
str | None = None) -> dict: + """Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE.""" + if tracestate_str is None: + return {} + tokens = tracestate_str.split(",") + result = {} + for pair in tokens: + key, value = pair.split("=") + result[key.strip()] = value.strip() + return result + + +def is_valid_trace_id(trace_id: str) -> bool: + """Check whether trace id is valid.""" + if trace_id is not None and len(trace_id) == 32 and trace_id != "0x00000000000000000000000000000000": + return True + else: + return False Review Comment: If you use the hex/decimal conversion, you may need more logic: the conversion will throw a ValueError if the value being passed in isn't a valid hexadecimal value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
