This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 00e3862067c Fix the defaults for OTEL dag_run and task_instance attrs (#48777)
00e3862067c is described below
commit 00e3862067ca3bf3935b38405cbcc63df1d571e0
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Apr 4 04:19:51 2025 -0700
Fix the defaults for OTEL dag_run and task_instance attrs (#48777)
Fix the defaults for OTEL dag_run and task_instance attrs
---
airflow-core/docs/img/airflow_erd.sha256 | 2 +-
.../migrations/versions/0065_3_0_0_add_new_otel_span_fields.py | 9 ++++++---
airflow-core/src/airflow/models/dagrun.py | 2 +-
airflow-core/src/airflow/models/taskinstance.py | 2 +-
airflow-core/src/airflow/models/taskinstancehistory.py | 2 +-
airflow-core/tests/unit/api_fastapi/common/test_exceptions.py | 6 +++---
airflow-core/tests/unit/jobs/test_scheduler_job.py | 1 +
airflow-core/tests/unit/models/test_dag.py | 1 +
airflow-core/tests/unit/models/test_dagrun.py | 5 +++--
9 files changed, 18 insertions(+), 12 deletions(-)
diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256
index f13a304e3e0..6eac20952c8 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-d0ef3225afc0b4388e3c39548af9d8ef9cd213a644860b5d4043e3d1c3d21304
\ No newline at end of file
+5ee140103f80a15dc6cc7c808a49c415901a7c062d2cab9d26d49abd86e82aea
\ No newline at end of file
diff --git a/airflow-core/src/airflow/migrations/versions/0065_3_0_0_add_new_otel_span_fields.py b/airflow-core/src/airflow/migrations/versions/0065_3_0_0_add_new_otel_span_fields.py
index 44d092ba956..5b9e9067dc7 100644
--- a/airflow-core/src/airflow/migrations/versions/0065_3_0_0_add_new_otel_span_fields.py
+++ b/airflow-core/src/airflow/migrations/versions/0065_3_0_0_add_new_otel_span_fields.py
@@ -44,16 +44,19 @@ def upgrade():
     """Apply add new otel span fields."""
     op.add_column("dag_run", sa.Column("scheduled_by_job_id", sa.Integer, nullable=True))
     op.add_column("dag_run", sa.Column("context_carrier", ExtendedJSON, nullable=True))
-    op.add_column("dag_run", sa.Column("span_status", sa.String(250), nullable=False, default="not_started"))
+    op.add_column(
+        "dag_run", sa.Column("span_status", sa.String(250), nullable=False, server_default="not_started")
+    )
     op.add_column("task_instance", sa.Column("context_carrier", ExtendedJSON, nullable=True))
     op.add_column(
-        "task_instance", sa.Column("span_status", sa.String(250), nullable=False, default="not_started")
+        "task_instance",
+        sa.Column("span_status", sa.String(250), nullable=False, server_default="not_started"),
     )
     op.add_column("task_instance_history", sa.Column("context_carrier", ExtendedJSON, nullable=True))
     op.add_column(
         "task_instance_history",
-        sa.Column("span_status", sa.String(250), nullable=False, default="not_started"),
+        sa.Column("span_status", sa.String(250), nullable=False, server_default="not_started"),
     )
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 1217c6e4147..f05405977f6 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -190,7 +190,7 @@ class DagRun(Base, LoggingMixin):
     scheduled_by_job_id = Column(Integer)
     # Span context carrier, used for context propagation.
     context_carrier = Column(MutableDict.as_mutable(ExtendedJSON))
-    span_status = Column(String(250), default=SpanStatus.NOT_STARTED, nullable=False)
+    span_status = Column(String(250), server_default=SpanStatus.NOT_STARTED, nullable=False)
 
     # Remove this `if` after upgrading Sphinx-AutoAPI
     if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 8517827de36..479fedd9249 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1660,7 +1660,7 @@ class TaskInstance(Base, LoggingMixin):
     updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
     _rendered_map_index = Column("rendered_map_index", String(250))
     context_carrier = Column(MutableDict.as_mutable(ExtendedJSON))
-    span_status = Column(String(250), default=SpanStatus.NOT_STARTED, nullable=False)
+    span_status = Column(String(250), server_default=SpanStatus.NOT_STARTED, nullable=False)
     external_executor_id = Column(StringID())
diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py b/airflow-core/src/airflow/models/taskinstancehistory.py
index 5a251c7223e..56b9e13d29c 100644
--- a/airflow-core/src/airflow/models/taskinstancehistory.py
+++ b/airflow-core/src/airflow/models/taskinstancehistory.py
@@ -94,7 +94,7 @@ class TaskInstanceHistory(Base):
     updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
     rendered_map_index = Column(String(250))
     context_carrier = Column(MutableDict.as_mutable(ExtendedJSON))
-    span_status = Column(String(250), default=SpanStatus.NOT_STARTED, nullable=False)
+    span_status = Column(String(250), server_default=SpanStatus.NOT_STARTED, nullable=False)
     external_executor_id = Column(StringID())
     trigger_id = Column(Integer)
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
index dbcf77e92e1..bc0a6dd9c4c 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
@@ -186,7 +186,7 @@ class TestUniqueConstraintErrorHandler:
                 status_code=status.HTTP_409_CONFLICT,
                 detail={
                     "reason": "Unique constraint violation",
-                    "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, span_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), [...]
+                    "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?, ?)",
                     "orig_error": "UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id",
                 },
             ),
@@ -194,7 +194,7 @@ class TestUniqueConstraintErrorHandler:
                 status_code=status.HTTP_409_CONFLICT,
                 detail={
                     "reason": "Unique constraint violation",
-                    "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, span_status) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM [...]
+                    "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template [...]
                     "orig_error": "(1062, \"Duplicate entry 'test_dag_id-test_run_id' for key 'dag_run.dag_run_dag_id_run_id_key'\")",
                 },
             ),
@@ -202,7 +202,7 @@ class TestUniqueConstraintErrorHandler:
                 status_code=status.HTTP_409_CONFLICT,
                 detail={
                     "reason": "Unique constraint violation",
-                    "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, span_status) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(crea [...]
+                    "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s [...]
                     "orig_error": 'duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"\nDETAIL: Key (dag_id, run_id)=(test_dag_id, test_run_id) already exists.\n',
                 },
             ),
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index c703037b0f0..e35b83c6e8b 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -3160,6 +3160,7 @@ class TestSchedulerJob:
                 run_after=next_info.run_after,
                 state=DagRunState.RUNNING,
                 triggered_by=DagRunTriggeredByType.TEST,
+                session=session,
             )
             next_info = dag.next_dagrun_info(next_info.data_interval)
             if next_info is None:
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index b2af9adcb89..b541f3048ee 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -1088,6 +1088,7 @@ class TestDag:
             run_after=logical_date,
             dag_version=dag_v,
             triggered_by=DagRunTriggeredByType.TEST,
+            session=session,
         )
         ti_op1 = dr.get_task_instance(task_id=op1.task_id, session=session)
         ti_op1.set_state(state=TaskInstanceState.FAILED, session=session)
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index b7b2cfa9b24..06b43ade895 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -125,6 +125,7 @@ class TestDagRun:
             state=state,
             dag_version=dag_version or DagVersion.get_latest_version(dag.dag_id, session=session),
             triggered_by=DagRunTriggeredByType.TEST,
+            session=session,
         )
 
         if task_states is not None:
@@ -1080,7 +1081,7 @@ class TestDagRun:
         dag = DAG(dag_id="test_dagrun_stats", schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE)
         dag_task = EmptyOperator(task_id="dummy", dag=dag)
-        dag.sync_to_db()
+        dag.sync_to_db(session=session)
         SerializedDagModel.write_dag(dag, bundle_name="testing", session=session)
 
         initial_task_states = {
@@ -1088,7 +1089,7 @@ class TestDagRun:
         }
 
         dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session)
-        dag_run.update_state()
+        dag_run.update_state(session=session)
 
         assert call(f"dagrun.{dag.dag_id}.first_task_scheduling_delay") not in stats_mock.mock_calls
 
     @pytest.mark.parametrize(