ashb commented on code in PR #49180: URL: https://github.com/apache/airflow/pull/49180#discussion_r2059925068
########## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ########## @@ -873,11 +875,11 @@ def process_executor_events( ti.pid, ) - if (active_ti_span := cls.active_spans.get(ti.key)) is not None: + if (active_ti_span := cls.active_spans.get("ti:" + ti.id)) is not None: cls.set_ti_span_attrs(span=active_ti_span, state=state, ti=ti) # End the span and remove it from the active_spans dict. active_ti_span.end(end_time=datetime_to_nano(ti.end_date)) - cls.active_spans.delete(ti.key) + cls.active_spans.delete("ti:" + ti.id) Review Comment: Elsewhere you have `"ti:" + str(ti.id)` -- we should be consistent. I don't mind which way we go, but lets be consistent please ########## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ########## @@ -1058,27 +1060,25 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) @provide_session def _end_active_spans(self, session: Session = NEW_SESSION): # No need to do a commit for every update. The annotation will commit all of them once at the end. - for key, span in self.active_spans.get_all().items(): - from airflow.models.taskinstance import TaskInstanceKey - - if isinstance(key, TaskInstanceKey): # ti span. - # Can't compare the key directly because the try_number or the map_index might not be the same. - ti: TaskInstance = session.scalars( - select(TaskInstance).where( - TaskInstance.dag_id == key.dag_id, - TaskInstance.task_id == key.task_id, - TaskInstance.run_id == key.run_id, - ) - ).one() - if ti.state in State.finished: - self.set_ti_span_attrs(span=span, state=ti.state, ti=ti) - span.end(end_time=datetime_to_nano(ti.end_date)) - ti.span_status = SpanStatus.ENDED - else: - span.end() - ti.span_status = SpanStatus.NEEDS_CONTINUANCE - else: - dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == key)).one() + for prefixed_key, span in self.active_spans.get_all().items(): + # Use partition to split on the first occurrence of ':'. 
+ prefix, sep, key = prefixed_key.partition(":") + + if prefix == "ti": + ti: TaskInstance | None = session.scalars( + select(TaskInstance).where(TaskInstance.id == key) + ).one_or_none() + + if ti is not None: + if ti.state in State.finished: + self.set_ti_span_attrs(span=span, state=ti.state, ti=ti) + span.end(end_time=datetime_to_nano(ti.end_date)) + ti.span_status = SpanStatus.ENDED + else: + span.end() + ti.span_status = SpanStatus.NEEDS_CONTINUANCE + elif prefix == "dr": + dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == str(key))).one() Review Comment: `run_id` is only unique within a single DAG, it's not globally unique. This will either need to be DagRun.id, or we'll need to store the dag ID too) ########## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ########## @@ -1058,27 +1060,25 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) @provide_session def _end_active_spans(self, session: Session = NEW_SESSION): # No need to do a commit for every update. The annotation will commit all of them once at the end. - for key, span in self.active_spans.get_all().items(): - from airflow.models.taskinstance import TaskInstanceKey - - if isinstance(key, TaskInstanceKey): # ti span. - # Can't compare the key directly because the try_number or the map_index might not be the same. 
- ti: TaskInstance = session.scalars( - select(TaskInstance).where( - TaskInstance.dag_id == key.dag_id, - TaskInstance.task_id == key.task_id, - TaskInstance.run_id == key.run_id, - ) - ).one() - if ti.state in State.finished: - self.set_ti_span_attrs(span=span, state=ti.state, ti=ti) - span.end(end_time=datetime_to_nano(ti.end_date)) - ti.span_status = SpanStatus.ENDED - else: - span.end() - ti.span_status = SpanStatus.NEEDS_CONTINUANCE - else: - dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == key)).one() + for prefixed_key, span in self.active_spans.get_all().items(): + # Use partition to split on the first occurrence of ':'. + prefix, sep, key = prefixed_key.partition(":") + + if prefix == "ti": + ti: TaskInstance | None = session.scalars( + select(TaskInstance).where(TaskInstance.id == key) + ).one_or_none() Review Comment: ```suggestion ti: TaskInstance | None = session.get(TaskInstance, key) ``` ########## airflow-core/src/airflow/executors/workloads.py: ########## @@ -69,7 +69,6 @@ class TaskInstance(BaseModel): parent_context_carrier: dict | None = None context_carrier: dict | None = None - queued_dttm: datetime | None = None Review Comment: This looks like an un-intended change/a bad rebase? ########## airflow-core/src/airflow/config_templates/config.yml: ########## @@ -1265,6 +1265,13 @@ traces: type: string example: ~ default: "False" + otel_debug_traces_on: + description: | + If True, then traces from Airflow internal methods are exported. Defaults to False. + version_added: 3.0.0 Review Comment: ```suggestion version_added: 3.1.0 ``` (Or maybe 3.0.1 -- we'll need to see) ########## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ########## @@ -1058,27 +1060,25 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) @provide_session def _end_active_spans(self, session: Session = NEW_SESSION): # No need to do a commit for every update. The annotation will commit all of them once at the end. 
- for key, span in self.active_spans.get_all().items(): - from airflow.models.taskinstance import TaskInstanceKey - - if isinstance(key, TaskInstanceKey): # ti span. - # Can't compare the key directly because the try_number or the map_index might not be the same. - ti: TaskInstance = session.scalars( - select(TaskInstance).where( - TaskInstance.dag_id == key.dag_id, - TaskInstance.task_id == key.task_id, - TaskInstance.run_id == key.run_id, - ) - ).one() - if ti.state in State.finished: - self.set_ti_span_attrs(span=span, state=ti.state, ti=ti) - span.end(end_time=datetime_to_nano(ti.end_date)) - ti.span_status = SpanStatus.ENDED - else: - span.end() - ti.span_status = SpanStatus.NEEDS_CONTINUANCE - else: - dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == key)).one() + for prefixed_key, span in self.active_spans.get_all().items(): + # Use partition to split on the first occurrence of ':'. + prefix, sep, key = prefixed_key.partition(":") Review Comment: I wonder if instead of prefixing the string we should store the keys as tuples: `("ti", str(ti.id))`, `("dr", dr.id)` etc? Not much in it but maybe it makes things slightly clearer? (Though we'd have to be more careful of the type of the id we put in -- cos `("ti", UUID(...))` wouldn't match `("ti", "the_uuid")` ########## airflow-core/src/airflow/traces/tracer.py: ########## @@ -53,7 +53,7 @@ def add_span(func): @wraps(func) def wrapper(*args, **kwargs): - with Trace.start_span(span_name=func_name, component=component): + with DebugTrace.start_span(span_name=func_name, component=component): Review Comment: Why is `add_span` always setting things as a DebugTrace? 
If it's only ever used for internal things I think we should rename this to `add_debug_span` to make it clearer ########## airflow-core/tests/unit/core/test_otel_tracer.py: ########## @@ -42,7 +42,8 @@ def name(): class TestOtelTrace: def test_get_otel_tracer_from_trace_metaclass(self): """Test that `Trace.some_method()`, uses an `OtelTrace` instance when otel is configured.""" - conf.add_section("traces") + if conf.getsection("traces") is None: + conf.add_section("traces") Review Comment: ```suggestion ``` Not needed anymore, `conf.set` now creates the section if needed. ########## airflow-core/tests/integration/otel/test_otel.py: ########## @@ -557,6 +557,7 @@ def print_ti_output_for_dag_run(dag_id: str, run_id: str): @pytest.mark.integration("redis") @pytest.mark.backend("postgres") +@pytest.mark.timeout("240") # 4 mins timeout for each test. Review Comment: 4m seems v long. Does it really take that long? ########## airflow-core/tests/integration/otel/test_otel.py: ########## @@ -1287,6 +1291,7 @@ def test_scheduler_exits_forcefully_in_the_middle_of_the_first_task( # Dag run should have succeeded. Test the spans in the output. check_spans_without_continuance(output=out, dag=dag, is_recreated=True, check_t1_sub_spans=False) + @pytest.mark.xfail(reason="Tests with a control file are flaky when running on the remote CI.") Review Comment: What is the plan with these long term? Having them left as xfail adds very little and is there any point keeping them? ########## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ########## @@ -1058,27 +1060,25 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) @provide_session def _end_active_spans(self, session: Session = NEW_SESSION): # No need to do a commit for every update. The annotation will commit all of them once at the end. 
- for key, span in self.active_spans.get_all().items(): - from airflow.models.taskinstance import TaskInstanceKey - - if isinstance(key, TaskInstanceKey): # ti span. - # Can't compare the key directly because the try_number or the map_index might not be the same. - ti: TaskInstance = session.scalars( - select(TaskInstance).where( - TaskInstance.dag_id == key.dag_id, - TaskInstance.task_id == key.task_id, - TaskInstance.run_id == key.run_id, - ) - ).one() - if ti.state in State.finished: - self.set_ti_span_attrs(span=span, state=ti.state, ti=ti) - span.end(end_time=datetime_to_nano(ti.end_date)) - ti.span_status = SpanStatus.ENDED - else: - span.end() - ti.span_status = SpanStatus.NEEDS_CONTINUANCE - else: - dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == key)).one() + for prefixed_key, span in self.active_spans.get_all().items(): + # Use partition to split on the first occurrence of ':'. + prefix, sep, key = prefixed_key.partition(":") + + if prefix == "ti": + ti: TaskInstance | None = session.scalars( + select(TaskInstance).where(TaskInstance.id == key) + ).one_or_none() + + if ti is not None: + if ti.state in State.finished: + self.set_ti_span_attrs(span=span, state=ti.state, ti=ti) + span.end(end_time=datetime_to_nano(ti.end_date)) + ti.span_status = SpanStatus.ENDED + else: + span.end() + ti.span_status = SpanStatus.NEEDS_CONTINUANCE + elif prefix == "dr": + dag_run: DagRun = session.scalars(select(DagRun).where(DagRun.run_id == str(key))).one() Review Comment: Whiiich it turns out was a bug in the existing impl. Oops. ########## pyproject.toml: ########## @@ -847,6 +847,9 @@ addopts = [ "--ignore-glob=**/tests/system/*", "--ignore-glob=tests/system/*", ] +markers = [ + "timeout: mark tests with a timeout (provided by pytest-timeout)", +] Review Comment: I'm surprised we needed to add this -- we have lots of tests with timeouts already... 🤔 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org