This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 00e3862067c Fix the defaults for OTEL dag_run and task_instance attrs 
(#48777)
00e3862067c is described below

commit 00e3862067ca3bf3935b38405cbcc63df1d571e0
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Apr 4 04:19:51 2025 -0700

    Fix the defaults for OTEL dag_run and task_instance attrs (#48777)
    
    Fix the defaults for OTEL dag_run and task_instance attrs
---
 airflow-core/docs/img/airflow_erd.sha256                         | 2 +-
 .../migrations/versions/0065_3_0_0_add_new_otel_span_fields.py   | 9 ++++++---
 airflow-core/src/airflow/models/dagrun.py                        | 2 +-
 airflow-core/src/airflow/models/taskinstance.py                  | 2 +-
 airflow-core/src/airflow/models/taskinstancehistory.py           | 2 +-
 airflow-core/tests/unit/api_fastapi/common/test_exceptions.py    | 6 +++---
 airflow-core/tests/unit/jobs/test_scheduler_job.py               | 1 +
 airflow-core/tests/unit/models/test_dag.py                       | 1 +
 airflow-core/tests/unit/models/test_dagrun.py                    | 5 +++--
 9 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/airflow-core/docs/img/airflow_erd.sha256 
b/airflow-core/docs/img/airflow_erd.sha256
index f13a304e3e0..6eac20952c8 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-d0ef3225afc0b4388e3c39548af9d8ef9cd213a644860b5d4043e3d1c3d21304
\ No newline at end of file
+5ee140103f80a15dc6cc7c808a49c415901a7c062d2cab9d26d49abd86e82aea
\ No newline at end of file
diff --git 
a/airflow-core/src/airflow/migrations/versions/0065_3_0_0_add_new_otel_span_fields.py
 
b/airflow-core/src/airflow/migrations/versions/0065_3_0_0_add_new_otel_span_fields.py
index 44d092ba956..5b9e9067dc7 100644
--- 
a/airflow-core/src/airflow/migrations/versions/0065_3_0_0_add_new_otel_span_fields.py
+++ 
b/airflow-core/src/airflow/migrations/versions/0065_3_0_0_add_new_otel_span_fields.py
@@ -44,16 +44,19 @@ def upgrade():
     """Apply add new otel span fields."""
     op.add_column("dag_run", sa.Column("scheduled_by_job_id", sa.Integer, 
nullable=True))
     op.add_column("dag_run", sa.Column("context_carrier", ExtendedJSON, 
nullable=True))
-    op.add_column("dag_run", sa.Column("span_status", sa.String(250), 
nullable=False, default="not_started"))
+    op.add_column(
+        "dag_run", sa.Column("span_status", sa.String(250), nullable=False, 
server_default="not_started")
+    )
 
     op.add_column("task_instance", sa.Column("context_carrier", ExtendedJSON, 
nullable=True))
     op.add_column(
-        "task_instance", sa.Column("span_status", sa.String(250), 
nullable=False, default="not_started")
+        "task_instance",
+        sa.Column("span_status", sa.String(250), nullable=False, 
server_default="not_started"),
     )
     op.add_column("task_instance_history", sa.Column("context_carrier", 
ExtendedJSON, nullable=True))
     op.add_column(
         "task_instance_history",
-        sa.Column("span_status", sa.String(250), nullable=False, 
default="not_started"),
+        sa.Column("span_status", sa.String(250), nullable=False, 
server_default="not_started"),
     )
 
 
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index 1217c6e4147..f05405977f6 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -190,7 +190,7 @@ class DagRun(Base, LoggingMixin):
     scheduled_by_job_id = Column(Integer)
     # Span context carrier, used for context propagation.
     context_carrier = Column(MutableDict.as_mutable(ExtendedJSON))
-    span_status = Column(String(250), default=SpanStatus.NOT_STARTED, 
nullable=False)
+    span_status = Column(String(250), server_default=SpanStatus.NOT_STARTED, 
nullable=False)
 
     # Remove this `if` after upgrading Sphinx-AutoAPI
     if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 8517827de36..479fedd9249 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1660,7 +1660,7 @@ class TaskInstance(Base, LoggingMixin):
     updated_at = Column(UtcDateTime, default=timezone.utcnow, 
onupdate=timezone.utcnow)
     _rendered_map_index = Column("rendered_map_index", String(250))
     context_carrier = Column(MutableDict.as_mutable(ExtendedJSON))
-    span_status = Column(String(250), default=SpanStatus.NOT_STARTED, 
nullable=False)
+    span_status = Column(String(250), server_default=SpanStatus.NOT_STARTED, 
nullable=False)
 
     external_executor_id = Column(StringID())
 
diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py 
b/airflow-core/src/airflow/models/taskinstancehistory.py
index 5a251c7223e..56b9e13d29c 100644
--- a/airflow-core/src/airflow/models/taskinstancehistory.py
+++ b/airflow-core/src/airflow/models/taskinstancehistory.py
@@ -94,7 +94,7 @@ class TaskInstanceHistory(Base):
     updated_at = Column(UtcDateTime, default=timezone.utcnow, 
onupdate=timezone.utcnow)
     rendered_map_index = Column(String(250))
     context_carrier = Column(MutableDict.as_mutable(ExtendedJSON))
-    span_status = Column(String(250), default=SpanStatus.NOT_STARTED, 
nullable=False)
+    span_status = Column(String(250), server_default=SpanStatus.NOT_STARTED, 
nullable=False)
 
     external_executor_id = Column(StringID())
     trigger_id = Column(Integer)
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py 
b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
index dbcf77e92e1..bc0a6dd9c4c 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
@@ -186,7 +186,7 @@ class TestUniqueConstraintErrorHandler:
                         status_code=status.HTTP_409_CONFLICT,
                         detail={
                             "reason": "Unique constraint violation",
-                            "statement": "INSERT INTO dag_run (dag_id, 
queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, 
run_type, triggered_by, conf, data_interval_start, data_interval_end, 
run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, 
backfill_id, bundle_version, scheduled_by_job_id, context_carrier, span_status) 
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT 
max(log_template.id) AS max_1 \nFROM log_template), [...]
+                            "statement": "INSERT INTO dag_run (dag_id, 
queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, 
run_type, triggered_by, conf, data_interval_start, data_interval_end, 
run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, 
backfill_id, bundle_version, scheduled_by_job_id, context_carrier) VALUES (?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 
\nFROM log_template), ?, ?, ?, ?, ?, ?)",
                             "orig_error": "UNIQUE constraint failed: 
dag_run.dag_id, dag_run.run_id",
                         },
                     ),
@@ -194,7 +194,7 @@ class TestUniqueConstraintErrorHandler:
                         status_code=status.HTTP_409_CONFLICT,
                         detail={
                             "reason": "Unique constraint violation",
-                            "statement": "INSERT INTO dag_run (dag_id, 
queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, 
run_type, triggered_by, conf, data_interval_start, data_interval_end, 
run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, 
backfill_id, bundle_version, scheduled_by_job_id, context_carrier, span_status) 
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT 
max(log_template.id) AS max_1 \nFROM [...]
+                            "statement": "INSERT INTO dag_run (dag_id, 
queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, 
run_type, triggered_by, conf, data_interval_start, data_interval_end, 
run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, 
backfill_id, bundle_version, scheduled_by_job_id, context_carrier) VALUES (%s, 
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT 
max(log_template.id) AS max_1 \nFROM log_template [...]
                             "orig_error": "(1062, \"Duplicate entry 
'test_dag_id-test_run_id' for key 'dag_run.dag_run_dag_id_run_id_key'\")",
                         },
                     ),
@@ -202,7 +202,7 @@ class TestUniqueConstraintErrorHandler:
                         status_code=status.HTTP_409_CONFLICT,
                         detail={
                             "reason": "Unique constraint violation",
-                            "statement": "INSERT INTO dag_run (dag_id, 
queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, 
run_type, triggered_by, conf, data_interval_start, data_interval_end, 
run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, 
backfill_id, bundle_version, scheduled_by_job_id, context_carrier, span_status) 
VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, 
%(end_date)s, %(state)s, %(run_id)s, %(crea [...]
+                            "statement": "INSERT INTO dag_run (dag_id, 
queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, 
run_type, triggered_by, conf, data_interval_start, data_interval_end, 
run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, 
backfill_id, bundle_version, scheduled_by_job_id, context_carrier) VALUES 
(%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, 
%(state)s, %(run_id)s, %(creating_job_id)s [...]
                             "orig_error": 'duplicate key value violates unique 
constraint "dag_run_dag_id_run_id_key"\nDETAIL:  Key (dag_id, 
run_id)=(test_dag_id, test_run_id) already exists.\n',
                         },
                     ),
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index c703037b0f0..e35b83c6e8b 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -3160,6 +3160,7 @@ class TestSchedulerJob:
                     run_after=next_info.run_after,
                     state=DagRunState.RUNNING,
                     triggered_by=DagRunTriggeredByType.TEST,
+                    session=session,
                 )
                 next_info = dag.next_dagrun_info(next_info.data_interval)
                 if next_info is None:
diff --git a/airflow-core/tests/unit/models/test_dag.py 
b/airflow-core/tests/unit/models/test_dag.py
index b2af9adcb89..b541f3048ee 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -1088,6 +1088,7 @@ class TestDag:
                 run_after=logical_date,
                 dag_version=dag_v,
                 triggered_by=DagRunTriggeredByType.TEST,
+                session=session,
             )
             ti_op1 = dr.get_task_instance(task_id=op1.task_id, session=session)
             ti_op1.set_state(state=TaskInstanceState.FAILED, session=session)
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index b7b2cfa9b24..06b43ade895 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -125,6 +125,7 @@ class TestDagRun:
             state=state,
             dag_version=dag_version or 
DagVersion.get_latest_version(dag.dag_id, session=session),
             triggered_by=DagRunTriggeredByType.TEST,
+            session=session,
         )
 
         if task_states is not None:
@@ -1080,7 +1081,7 @@ class TestDagRun:
         dag = DAG(dag_id="test_dagrun_stats", 
schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE)
         dag_task = EmptyOperator(task_id="dummy", dag=dag)
 
-        dag.sync_to_db()
+        dag.sync_to_db(session=session)
         SerializedDagModel.write_dag(dag, bundle_name="testing", 
session=session)
 
         initial_task_states = {
@@ -1088,7 +1089,7 @@ class TestDagRun:
         }
 
         dag_run = self.create_dag_run(dag=dag, 
task_states=initial_task_states, session=session)
-        dag_run.update_state()
+        dag_run.update_state(session=session)
         assert call(f"dagrun.{dag.dag_id}.first_task_scheduling_delay") not in 
stats_mock.mock_calls
 
     @pytest.mark.parametrize(

Reply via email to the mailing list.