This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 42e2b80  Rename `xcom.dagrun_id` to `xcom.dag_run_id` (#21806)
42e2b80 is described below

commit 42e2b801fe4cdb9e6fcff3c53b7d732bde59282b
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Mar 2 11:03:19 2022 -0800

    Rename `xcom.dagrun_id` to `xcom.dag_run_id` (#21806)
    
    We use snake case for tables and columns in the metadata database.  So
    references (in the database) to `dag_run.id` should be `dag_run_id`.
    Historically though, in many places in the codebase we used the expression
    "dag run id" to refer to the `run_id` attribute of the DagRun model.  So
    here we try to disambiguate this where necessary by using `run_id` to refer
    to `DagRun.run_id` and `dag_run_id` to refer to `DagRun.id`.
    
    One area where I did not make any changes is in the REST API, which
    exposes the parameter `dag_run_id` to refer to the `run_id` field in
    DagRun.  Since this param is user-facing, it's OK for it to be disconnected
    from the internal code references and certainly from database object
    naming.  We can consider renaming it to `run_id` in the next major API
    release.  Also did not change the code in `views.py`.
    
    Squashed commits:
    
    * rename IN_MEMORY_DAGRUN_ID
    * rename ARG_EXECUTION_DATE_OR_RUN_ID
    * rename DagrunIdDep
    * fake_dagrun_id
    * rename dagrun_id to dag_run_id
    * clarify docstrings / comments
    * rename dag_run_id to run_id in dag model
    * _set_dag_run_state param dag_run_id -> run_id
---
 airflow/api/common/mark_tasks.py                   | 24 +++++++++----------
 .../endpoints/task_instance_endpoint.py            |  2 +-
 airflow/cli/cli_parser.py                          | 14 +++++------
 airflow/cli/commands/task_command.py               |  4 ++--
 .../7b2661a43ba3_taskinstance_keyed_to_dagrun.py   |  2 +-
 ...c306b5b5ae4a_switch_xcom_table_to_use_run_id.py |  4 ++--
 airflow/models/dag.py                              | 12 +++++-----
 airflow/models/xcom.py                             | 20 ++++++++--------
 airflow/ti_deps/dependencies_deps.py               |  4 ++--
 .../{dagrun_id_dep.py => dagrun_backfill_dep.py}   | 10 ++++----
 airflow/www/views.py                               |  6 ++---
 tests/api/common/test_mark_tasks.py                | 28 +++++++++++-----------
 tests/api_connexion/endpoints/test_log_endpoint.py |  2 +-
 .../endpoints/test_task_instance_endpoint.py       |  4 ++--
 .../api_connexion/endpoints/test_xcom_endpoint.py  |  2 +-
 .../schemas/test_task_instance_schema.py           |  2 +-
 tests/api_connexion/schemas/test_xcom_schema.py    |  2 +-
 tests/models/test_dag.py                           |  2 +-
 tests/models/test_xcom.py                          |  8 +++----
 tests/providers/apache/hive/operators/test_hive.py |  4 ++--
 tests/ti_deps/deps/test_dagrun_id_dep.py           | 16 ++++++-------
 21 files changed, 86 insertions(+), 86 deletions(-)

diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 3eeb15b..a280d0f 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -80,7 +80,7 @@ def _create_dagruns(
 def set_state(
     *,
     tasks: Iterable[Operator],
-    dag_run_id: Optional[str] = None,
+    run_id: Optional[str] = None,
     execution_date: Optional[datetime] = None,
     upstream: bool = False,
     downstream: bool = False,
@@ -98,7 +98,7 @@ def set_state(
     on the schedule (but it will as for subdag dag runs if needed).
 
     :param tasks: the iterable of tasks from which to work. task.task.dag 
needs to be set
-    :param dag_run_id: the run_id of the dagrun to start looking from
+    :param run_id: the run_id of the dagrun to start looking from
     :param execution_date: the execution date from which to start 
looking(deprecated)
     :param upstream: Mark all parents (upstream tasks)
     :param downstream: Mark all siblings (downstream tasks) of task_id, 
including SubDags
@@ -113,7 +113,7 @@ def set_state(
     if not tasks:
         return []
 
-    if not exactly_one(execution_date, dag_run_id):
+    if not exactly_one(execution_date, run_id):
         raise ValueError("Exactly one of dag_run_id and execution_date must be 
set")
 
     if execution_date and not timezone.is_localized(execution_date):
@@ -127,11 +127,11 @@ def set_state(
         raise ValueError("Received tasks with no DAG")
 
     if execution_date:
-        dag_run_id = dag.get_dagrun(execution_date=execution_date).run_id
-    if not dag_run_id:
-        raise ValueError("Received tasks with no dag_run_id")
+        run_id = dag.get_dagrun(execution_date=execution_date).run_id
+    if not run_id:
+        raise ValueError("Received tasks with no run_id")
 
-    dag_run_ids = get_run_ids(dag, dag_run_id, future, past)
+    dag_run_ids = get_run_ids(dag, run_id, future, past)
 
     task_ids = list(find_task_relatives(tasks, downstream, upstream))
 
@@ -344,16 +344,16 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, 
past: bool, session: SASess
     return run_ids
 
 
-def _set_dag_run_state(dag_id: str, dag_run_id: str, state: DagRunState, 
session: SASession = NEW_SESSION):
+def _set_dag_run_state(dag_id: str, run_id: str, state: DagRunState, session: 
SASession = NEW_SESSION):
     """
     Helper method that set dag run state in the DB.
 
     :param dag_id: dag_id of target dag run
-    :param dag_run_id: dag run id of target dag run
+    :param run_id: run id of target dag run
     :param state: target state
     :param session: database session
     """
-    dag_run = session.query(DagRun).filter(DagRun.dag_id == dag_id, 
DagRun.run_id == dag_run_id).one()
+    dag_run = session.query(DagRun).filter(DagRun.dag_id == dag_id, 
DagRun.run_id == run_id).one()
     dag_run.state = state
     if state == State.RUNNING:
         dag_run.start_date = timezone.utcnow()
@@ -407,7 +407,7 @@ def set_dag_run_state_to_success(
     # Mark all task instances of the dag run to success.
     for task in dag.tasks:
         task.dag = dag
-    return set_state(tasks=dag.tasks, dag_run_id=run_id, state=State.SUCCESS, 
commit=commit, session=session)
+    return set_state(tasks=dag.tasks, run_id=run_id, state=State.SUCCESS, 
commit=commit, session=session)
 
 
 @provide_session
@@ -469,7 +469,7 @@ def set_dag_run_state_to_failed(
         task.dag = dag
         tasks.append(task)
 
-    return set_state(tasks=tasks, dag_run_id=run_id, state=State.FAILED, 
commit=commit, session=session)
+    return set_state(tasks=tasks, run_id=run_id, state=State.FAILED, 
commit=commit, session=session)
 
 
 @provide_session
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py 
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index af1b4e3..7611bbb 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -357,7 +357,7 @@ def post_set_task_instances_state(*, dag_id: str, session: 
Session = NEW_SESSION
 
     tis = dag.set_task_instance_state(
         task_id=task_id,
-        dag_run_id=run_id,
+        run_id=run_id,
         execution_date=execution_date,
         state=data["new_state"],
         upstream=data["include_upstream"],
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index e74edec..f621052 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -173,7 +173,7 @@ def string_list_type(val):
 ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
 ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
 ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the 
DAG", type=parsedate)
-ARG_EXECUTION_DATE_OR_DAGRUN_ID = Arg(
+ARG_EXECUTION_DATE_OR_RUN_ID = Arg(
     ('execution_date_or_run_id',), help="The execution_date of the DAG or 
run_id of the DAGRun"
 )
 ARG_TASK_REGEX = Arg(
@@ -1174,7 +1174,7 @@ TASKS_COMMANDS = (
         args=(
             ARG_DAG_ID,
             ARG_TASK_ID,
-            ARG_EXECUTION_DATE_OR_DAGRUN_ID,
+            ARG_EXECUTION_DATE_OR_RUN_ID,
             ARG_SUBDIR,
             ARG_VERBOSE,
             ARG_MAP_INDEX,
@@ -1189,7 +1189,7 @@ TASKS_COMMANDS = (
             "and then run by an executor."
         ),
         
func=lazy_load_command('airflow.cli.commands.task_command.task_failed_deps'),
-        args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE_OR_DAGRUN_ID, 
ARG_SUBDIR, ARG_MAP_INDEX),
+        args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE_OR_RUN_ID, 
ARG_SUBDIR, ARG_MAP_INDEX),
     ),
     ActionCommand(
         name='render',
@@ -1198,7 +1198,7 @@ TASKS_COMMANDS = (
         args=(
             ARG_DAG_ID,
             ARG_TASK_ID,
-            ARG_EXECUTION_DATE_OR_DAGRUN_ID,
+            ARG_EXECUTION_DATE_OR_RUN_ID,
             ARG_SUBDIR,
             ARG_VERBOSE,
             ARG_MAP_INDEX,
@@ -1211,7 +1211,7 @@ TASKS_COMMANDS = (
         args=(
             ARG_DAG_ID,
             ARG_TASK_ID,
-            ARG_EXECUTION_DATE_OR_DAGRUN_ID,
+            ARG_EXECUTION_DATE_OR_RUN_ID,
             ARG_SUBDIR,
             ARG_MARK_SUCCESS,
             ARG_FORCE,
@@ -1242,7 +1242,7 @@ TASKS_COMMANDS = (
         args=(
             ARG_DAG_ID,
             ARG_TASK_ID,
-            ARG_EXECUTION_DATE_OR_DAGRUN_ID,
+            ARG_EXECUTION_DATE_OR_RUN_ID,
             ARG_SUBDIR,
             ARG_DRY_RUN,
             ARG_TASK_PARAMS,
@@ -1255,7 +1255,7 @@ TASKS_COMMANDS = (
         name='states-for-dag-run',
         help="Get the status of all task instances in a dag run",
         
func=lazy_load_command('airflow.cli.commands.task_command.task_states_for_dag_run'),
-        args=(ARG_DAG_ID, ARG_EXECUTION_DATE_OR_DAGRUN_ID, ARG_OUTPUT, 
ARG_VERBOSE),
+        args=(ARG_DAG_ID, ARG_EXECUTION_DATE_OR_RUN_ID, ARG_OUTPUT, 
ARG_VERBOSE),
     ),
 )
 POOLS_COMMANDS = (
diff --git a/airflow/cli/commands/task_command.py 
b/airflow/cli/commands/task_command.py
index 11bbc9b..3c82740 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -39,7 +39,7 @@ from airflow.models import DagPickle, TaskInstance
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
-from airflow.models.xcom import IN_MEMORY_DAGRUN_ID
+from airflow.models.xcom import IN_MEMORY_RUN_ID
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
 from airflow.utils import cli as cli_utils
@@ -98,7 +98,7 @@ def _get_dag_run(
             ) from None
 
     if execution_date is not None:
-        return DagRun(dag.dag_id, run_id=IN_MEMORY_DAGRUN_ID, 
execution_date=execution_date)
+        return DagRun(dag.dag_id, run_id=IN_MEMORY_RUN_ID, 
execution_date=execution_date)
     return DagRun(dag.dag_id, run_id=exec_date_or_run_id, 
execution_date=timezone.utcnow())
 
 
diff --git 
a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py 
b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
index eee0bd2..924e15c 100644
--- a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
+++ b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
@@ -158,7 +158,7 @@ def upgrade():
                 mssql_where=sa.text("state='queued'"),
             )
     else:
-        # Make sure DagRun id columns are non-nullable
+        # Make sure DagRun PK columns are non-nullable
         with op.batch_alter_table('dag_run', schema=None) as batch_op:
             batch_op.alter_column('dag_id', existing_type=string_id_col_type, 
nullable=False)
             batch_op.alter_column('execution_date', existing_type=dt_type, 
nullable=False)
diff --git 
a/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py 
b/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py
index 4e38062..1167573 100644
--- 
a/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py
+++ 
b/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py
@@ -42,7 +42,7 @@ metadata = MetaData()
 
 def _get_new_xcom_columns() -> Sequence[Column]:
     return [
-        Column("dagrun_id", Integer(), nullable=False),
+        Column("dag_run_id", Integer(), nullable=False),
         Column("task_id", StringID(), nullable=False),
         Column("key", StringID(length=512), nullable=False),
         Column("value", LargeBinary),
@@ -117,7 +117,7 @@ def upgrade():
     op.rename_table("__airflow_tmp_xcom", "xcom")
 
     with op.batch_alter_table("xcom") as batch_op:
-        batch_op.create_primary_key("xcom_pkey", ["dagrun_id", "task_id", 
"key"])
+        batch_op.create_primary_key("xcom_pkey", ["dag_run_id", "task_id", 
"key"])
         batch_op.create_index("idx_xcom_key", ["key"])
         batch_op.create_index("idx_xcom_ti_id", ["dag_id", "task_id", 
"run_id"])
 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 95f2be6..23eb0ce 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1613,7 +1613,7 @@ class DAG(LoggingMixin):
         *,
         task_id: str,
         execution_date: Optional[datetime] = None,
-        dag_run_id: Optional[str] = None,
+        run_id: Optional[str] = None,
         state: TaskInstanceState,
         upstream: bool = False,
         downstream: bool = False,
@@ -1628,7 +1628,7 @@ class DAG(LoggingMixin):
 
         :param task_id: Task ID of the TaskInstance
         :param execution_date: Execution date of the TaskInstance
-        :param dag_run_id: The run_id of the TaskInstance
+        :param run_id: The run_id of the TaskInstance
         :param state: State to set the TaskInstance to
         :param upstream: Include all upstream tasks of the given task_id
         :param downstream: Include all downstream tasks of the given task_id
@@ -1638,12 +1638,12 @@ class DAG(LoggingMixin):
         """
         from airflow.api.common.mark_tasks import set_state
 
-        if not exactly_one(execution_date, dag_run_id):
-            raise ValueError("Exactly one of execution_date or dag_run_id must 
be provided")
+        if not exactly_one(execution_date, run_id):
+            raise ValueError("Exactly one of execution_date or run_id must be 
provided")
 
         if execution_date is None:
             dag_run = (
-                session.query(DagRun).filter(DagRun.run_id == dag_run_id, 
DagRun.dag_id == self.dag_id).one()
+                session.query(DagRun).filter(DagRun.run_id == run_id, 
DagRun.dag_id == self.dag_id).one()
             )  # Raises an error if not found
             resolve_execution_date = dag_run.execution_date
         else:
@@ -1655,7 +1655,7 @@ class DAG(LoggingMixin):
         altered = set_state(
             tasks=[task],
             execution_date=execution_date,
-            dag_run_id=dag_run_id,
+            run_id=run_id,
             upstream=upstream,
             downstream=downstream,
             future=future,
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 3f58c1d..32190d2 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -48,7 +48,7 @@ XCOM_RETURN_KEY = 'return_value'
 
 # Stand-in value for 'airflow task test' generating a temporary in-memory DAG
 # run without storing it in the database.
-IN_MEMORY_DAGRUN_ID = "__airflow_in_memory_dagrun__"
+IN_MEMORY_RUN_ID = "__airflow_in_memory_dagrun__"
 
 if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstanceKey
@@ -59,7 +59,7 @@ class BaseXCom(Base, LoggingMixin):
 
     __tablename__ = "xcom"
 
-    dagrun_id = Column(Integer(), nullable=False, primary_key=True)
+    dag_run_id = Column(Integer(), nullable=False, primary_key=True)
     task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False, 
primary_key=True)
     key = Column(String(512, **COLLATION_ARGS), nullable=False, 
primary_key=True)
 
@@ -163,18 +163,18 @@ class BaseXCom(Base, LoggingMixin):
             message = "Passing 'execution_date' to 'XCom.set()' is deprecated. 
Use 'run_id' instead."
             warnings.warn(message, DeprecationWarning, stacklevel=3)
             try:
-                dagrun_id, run_id = (
+                dag_run_id, run_id = (
                     session.query(DagRun.id, DagRun.run_id)
                     .filter(DagRun.dag_id == dag_id, DagRun.execution_date == 
execution_date)
                     .one()
                 )
             except NoResultFound:
                 raise ValueError(f"DAG run not found on DAG {dag_id!r} at 
{execution_date}") from None
-        elif run_id == IN_MEMORY_DAGRUN_ID:
-            dagrun_id = -1
+        elif run_id == IN_MEMORY_RUN_ID:
+            dag_run_id = -1
         else:
-            dagrun_id = session.query(DagRun.id).filter_by(dag_id=dag_id, 
run_id=run_id).scalar()
-            if dagrun_id is None:
+            dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, 
run_id=run_id).scalar()
+            if dag_run_id is None:
                 raise ValueError(f"DAG run not found on DAG {dag_id!r} with ID 
{run_id!r}")
 
         value = cls.serialize_value(
@@ -182,7 +182,7 @@ class BaseXCom(Base, LoggingMixin):
             key=key,
             task_id=task_id,
             dag_id=dag_id,
-            run_id=dagrun_id,
+            run_id=run_id,
         )
 
         # Remove duplicate XComs and insert a new one.
@@ -193,7 +193,7 @@ class BaseXCom(Base, LoggingMixin):
             cls.dag_id == dag_id,
         ).delete()
         new = cast(Any, cls)(  # Work around Mypy complaining model not 
defining '__init__'.
-            dagrun_id=dagrun_id,
+            dag_run_id=dag_run_id,
             key=key,
             value=value,
             run_id=run_id,
@@ -410,7 +410,7 @@ class BaseXCom(Base, LoggingMixin):
             if execution_date is not None:
                 query = query.filter(DagRun.execution_date <= execution_date)
             else:
-                # This returns an empty query result for IN_MEMORY_DAGRUN_ID,
+                # This returns an empty query result for IN_MEMORY_RUN_ID,
                 # but that is impossible to implement. Sorry?
                 dr = session.query(DagRun.execution_date).filter(DagRun.run_id 
== run_id).subquery()
                 query = query.filter(cls.execution_date <= dr.c.execution_date)
diff --git a/airflow/ti_deps/dependencies_deps.py 
b/airflow/ti_deps/dependencies_deps.py
index 7062995..cfacb11 100644
--- a/airflow/ti_deps/dependencies_deps.py
+++ b/airflow/ti_deps/dependencies_deps.py
@@ -23,8 +23,8 @@ from airflow.ti_deps.dependencies_states import (
 )
 from airflow.ti_deps.deps.dag_ti_slots_available_dep import 
DagTISlotsAvailableDep
 from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
+from airflow.ti_deps.deps.dagrun_backfill_dep import DagRunNotBackfillDep
 from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
-from airflow.ti_deps.deps.dagrun_id_dep import DagrunIdDep
 from airflow.ti_deps.deps.exec_date_after_start_date_dep import 
ExecDateAfterStartDateDep
 from airflow.ti_deps.deps.pool_slots_available_dep import PoolSlotsAvailableDep
 from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
@@ -85,7 +85,7 @@ SCHEDULER_QUEUED_DEPS = {
     TaskConcurrencyDep(),
     PoolSlotsAvailableDep(),
     DagrunRunningDep(),
-    DagrunIdDep(),
+    DagRunNotBackfillDep(),
     DagUnpausedDep(),
     ExecDateAfterStartDateDep(),
     TaskNotRunningDep(),
diff --git a/airflow/ti_deps/deps/dagrun_id_dep.py 
b/airflow/ti_deps/deps/dagrun_backfill_dep.py
similarity index 84%
rename from airflow/ti_deps/deps/dagrun_id_dep.py
rename to airflow/ti_deps/deps/dagrun_backfill_dep.py
index 84be503..949af43 100644
--- a/airflow/ti_deps/deps/dagrun_id_dep.py
+++ b/airflow/ti_deps/deps/dagrun_backfill_dep.py
@@ -16,17 +16,17 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""This module defines dep for DagRun ID validation"""
+"""This module defines dep for making sure DagRun not a backfill."""
 
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
 from airflow.utils.session import provide_session
 from airflow.utils.types import DagRunType
 
 
-class DagrunIdDep(BaseTIDep):
-    """Dep for valid DagRun ID to schedule from scheduler"""
+class DagRunNotBackfillDep(BaseTIDep):
+    """Dep for valid DagRun run_id to schedule from scheduler"""
 
-    NAME = "Dagrun run_id is not backfill job ID"
+    NAME = "DagRun is not backfill job"
     IGNORABLE = True
 
     @provide_session
@@ -37,7 +37,7 @@ class DagrunIdDep(BaseTIDep):
         :param ti: the task instance to get the dependency status for
         :param session: database session
         :param dep_context: the context for which this dependency should be 
evaluated for
-        :return: True if DagRun ID is valid for scheduling from scheduler.
+        :return: True if DagRun is valid for scheduling from scheduler.
         """
         dagrun = ti.get_dagrun(session)
 
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f913838..2c3382c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2221,7 +2221,7 @@ class Airflow(AirflowBaseView):
 
         altered = dag.set_task_instance_state(
             task_id=task_id,
-            dag_run_id=dag_run_id,
+            run_id=dag_run_id,
             state=state,
             upstream=upstream,
             downstream=downstream,
@@ -2283,7 +2283,7 @@ class Airflow(AirflowBaseView):
 
         to_be_altered = set_state(
             tasks=[task],
-            dag_run_id=dag_run_id,
+            run_id=dag_run_id,
             upstream=upstream,
             downstream=downstream,
             future=future,
@@ -3596,7 +3596,7 @@ class XComModelView(AirflowModelView):
 
     search_columns = ['key', 'value', 'timestamp', 'dag_id', 'task_id', 
'run_id']
     list_columns = ['key', 'value', 'timestamp', 'dag_id', 'task_id', 'run_id']
-    base_order = ('dagrun_id', 'desc')
+    base_order = ('dag_run_id', 'desc')
 
     base_filters = [['dag_id', DagFilter, lambda: []]]
 
diff --git a/tests/api/common/test_mark_tasks.py 
b/tests/api/common/test_mark_tasks.py
index fd14a77..7662eec 100644
--- a/tests/api/common/test_mark_tasks.py
+++ b/tests/api/common/test_mark_tasks.py
@@ -153,7 +153,7 @@ class TestMarkTasks:
         dr = DagRun.find(dag_id=self.dag1.dag_id, 
execution_date=self.execution_dates[0])[0]
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=False,
@@ -167,7 +167,7 @@ class TestMarkTasks:
         # set one and only one task to success
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=False,
@@ -181,7 +181,7 @@ class TestMarkTasks:
         # set no tasks
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=False,
@@ -195,7 +195,7 @@ class TestMarkTasks:
         # set task to other than success
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=False,
@@ -211,7 +211,7 @@ class TestMarkTasks:
         task = self.dag1.get_task("runme_0")
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=False,
@@ -228,7 +228,7 @@ class TestMarkTasks:
         dr = DagRun.find(dag_id=self.dag3.dag_id, 
execution_date=self.dag3_execution_dates[1])[0]
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=False,
@@ -255,7 +255,7 @@ class TestMarkTasks:
 
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=True,
             future=False,
@@ -277,7 +277,7 @@ class TestMarkTasks:
 
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=True,
             downstream=False,
             future=False,
@@ -295,7 +295,7 @@ class TestMarkTasks:
         dr = DagRun.find(dag_id=self.dag1.dag_id, 
execution_date=self.execution_dates[0])[0]
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=True,
@@ -311,7 +311,7 @@ class TestMarkTasks:
         dr = DagRun.find(dag_id=self.dag3.dag_id, 
execution_date=self.dag3_execution_dates[1])[0]
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=True,
@@ -330,7 +330,7 @@ class TestMarkTasks:
         dr = DagRun.find(dag_id=self.dag1.dag_id, 
execution_date=self.execution_dates[1])[0]
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=False,
@@ -346,7 +346,7 @@ class TestMarkTasks:
         dr = DagRun.find(dag_id=self.dag3.dag_id, 
execution_date=self.dag3_execution_dates[1])[0]
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=False,
@@ -365,7 +365,7 @@ class TestMarkTasks:
         dr = DagRun.find(dag_id=self.dag1.dag_id, 
execution_date=self.execution_dates[0])[0]
         altered = set_state(
             tasks=tasks,
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=False,
             future=False,
@@ -392,7 +392,7 @@ class TestMarkTasks:
 
         altered = set_state(
             tasks=[task],
-            dag_run_id=dr.run_id,
+            run_id=dr.run_id,
             upstream=False,
             downstream=True,
             future=False,
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py 
b/tests/api_connexion/endpoints/test_log_endpoint.py
index f5d2e5d..3b64387 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -266,7 +266,7 @@ class TestGetLog:
 
     def test_raises_404_for_invalid_dag_run_id(self):
         response = self.client.get(
-            f"api/v1/dags/{self.DAG_ID}/dagRuns/NO_DAG_RUN/"  # invalid 
dagrun_id
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/NO_DAG_RUN/"  # invalid run_id
             f"taskInstances/{self.TASK_ID}/logs/1?",
             headers={'Accept': 'application/json'},
             environ_overrides={'REMOTE_USER': "test"},
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py 
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 8f5676e..b4353f8 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -1126,7 +1126,7 @@ class 
TestPostSetTaskInstanceState(TestTaskInstanceEndpoint):
         mock_set_task_instance_state.assert_called_once_with(
             commit=False,
             downstream=True,
-            dag_run_id=None,
+            run_id=None,
             execution_date=DEFAULT_DATETIME_1,
             future=True,
             past=True,
@@ -1175,7 +1175,7 @@ class 
TestPostSetTaskInstanceState(TestTaskInstanceEndpoint):
         mock_set_task_instance_state.assert_called_once_with(
             commit=False,
             downstream=True,
-            dag_run_id=run_id,
+            run_id=run_id,
             execution_date=None,
             future=True,
             past=True,
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py 
b/tests/api_connexion/endpoints/test_xcom_endpoint.py
index 0a04371..931325d 100644
--- a/tests/api_connexion/endpoints/test_xcom_endpoint.py
+++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py
@@ -443,7 +443,7 @@ class TestPaginationGetXComEntries(TestXComEndpoint):
         with create_session() as session:
             for i in range(1, 11):
                 xcom = XCom(
-                    dagrun_id=dagrun.id,
+                    dag_run_id=dagrun.id,
                     key=f"TEST_XCOM_KEY{i}",
                     value=b"null",
                     run_id=self.dag_run_id,
diff --git a/tests/api_connexion/schemas/test_task_instance_schema.py 
b/tests/api_connexion/schemas/test_task_instance_schema.py
index 69111ff..4aeea63 100644
--- a/tests/api_connexion/schemas/test_task_instance_schema.py
+++ b/tests/api_connexion/schemas/test_task_instance_schema.py
@@ -214,7 +214,7 @@ class TestSetTaskInstanceStateFormSchema:
             ({"include_future": "foo"},),
             ({"execution_date": "NOW"},),
             ({"new_state": "INVALID_STATE"},),
-            ({"execution_date": "2020-01-01T00:00:00+00:00", "dag_run_id": 
"dagrun_id"},),
+            ({"execution_date": "2020-01-01T00:00:00+00:00", "dag_run_id": 
"some-run-id"},),
         ]
     )
     def test_validation_error(self, override_data):
diff --git a/tests/api_connexion/schemas/test_xcom_schema.py 
b/tests/api_connexion/schemas/test_xcom_schema.py
index 84b2663..a9fd275 100644
--- a/tests/api_connexion/schemas/test_xcom_schema.py
+++ b/tests/api_connexion/schemas/test_xcom_schema.py
@@ -48,7 +48,7 @@ def create_xcom(create_task_instance, session):
         )
         run: DagRun = ti.dag_run
         xcom = XCom(
-            dagrun_id=run.id,
+            dag_run_id=run.id,
             task_id=ti.task_id,
             key=key,
             value=value,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 2204a06..cbda0ef 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2260,7 +2260,7 @@ def test_set_task_instance_state(run_id, execution_date, 
session, dag_maker):
 
     altered = dag.set_task_instance_state(
         task_id=task_1.task_id,
-        dag_run_id=run_id,
+        run_id=run_id,
         execution_date=execution_date,
         state=State.SUCCESS,
         session=session,
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index 733b2d4..5dd979f 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -25,7 +25,7 @@ import pytest
 from airflow.configuration import conf
 from airflow.models.dagrun import DagRun, DagRunType
 from airflow.models.taskinstance import TaskInstanceKey
-from airflow.models.xcom import IN_MEMORY_DAGRUN_ID, XCOM_RETURN_KEY, 
BaseXCom, XCom, resolve_xcom_backend
+from airflow.models.xcom import IN_MEMORY_RUN_ID, XCOM_RETURN_KEY, BaseXCom, 
XCom, resolve_xcom_backend
 from airflow.settings import json
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -202,7 +202,7 @@ class TestXCom:
             key=XCOM_RETURN_KEY,
             dag_id="test_dag",
             task_id="test_task",
-            run_id=IN_MEMORY_DAGRUN_ID,
+            run_id=IN_MEMORY_RUN_ID,
         )
 
         XCom = resolve_xcom_backend()
@@ -244,9 +244,9 @@ class TestXCom:
             key=XCOM_RETURN_KEY,
             dag_id="test_dag",
             task_id="test_task",
-            run_id=IN_MEMORY_DAGRUN_ID,
+            run_id=IN_MEMORY_RUN_ID,
         )
-        expected = {**kwargs, 'run_id': -1}
+        expected = {**kwargs, 'run_id': '__airflow_in_memory_dagrun__'}
         XCom = resolve_xcom_backend()
         XCom.set(**kwargs)
         serialize_watcher.assert_called_once_with(**expected)
diff --git a/tests/providers/apache/hive/operators/test_hive.py 
b/tests/providers/apache/hive/operators/test_hive.py
index 8a167da..97027a1 100644
--- a/tests/providers/apache/hive/operators/test_hive.py
+++ b/tests/providers/apache/hive/operators/test_hive.py
@@ -79,10 +79,10 @@ class HiveOperatorTest(TestHiveEnvironment):
         mock_get_hook.return_value = mock_hook
         op = HiveOperator(task_id='test_mapred_job_name', hql=self.hql, 
dag=self.dag)
 
-        fake_dagrun_id = "test_mapred_job_name"
+        fake_run_id = "test_mapred_job_name"
         fake_execution_date = timezone.datetime(2018, 6, 19)
         fake_ti = TaskInstance(task=op)
-        fake_ti.dag_run = DagRun(run_id=fake_dagrun_id, 
execution_date=fake_execution_date)
+        fake_ti.dag_run = DagRun(run_id=fake_run_id, 
execution_date=fake_execution_date)
         fake_ti.hostname = 'fake_hostname'
         fake_context = {'ti': fake_ti}
 
diff --git a/tests/ti_deps/deps/test_dagrun_id_dep.py 
b/tests/ti_deps/deps/test_dagrun_id_dep.py
index e416dd5..8bb4ad0 100644
--- a/tests/ti_deps/deps/test_dagrun_id_dep.py
+++ b/tests/ti_deps/deps/test_dagrun_id_dep.py
@@ -21,31 +21,31 @@ import unittest
 from unittest.mock import Mock
 
 from airflow.models.dagrun import DagRun
-from airflow.ti_deps.deps.dagrun_id_dep import DagrunIdDep
+from airflow.ti_deps.deps.dagrun_backfill_dep import DagRunNotBackfillDep
 from airflow.utils.types import DagRunType
 
 
 class TestDagrunRunningDep(unittest.TestCase):
-    def test_dagrun_id_is_backfill(self):
+    def test_run_id_is_backfill(self):
         """
-        Task instances whose dagrun ID is a backfill dagrun ID should fail 
this dep.
+        Task instances whose run_id is a backfill dagrun run_id should fail 
this dep.
         """
         dagrun = DagRun()
         dagrun.run_id = "anything"
         dagrun.run_type = DagRunType.BACKFILL_JOB
         ti = Mock(get_dagrun=Mock(return_value=dagrun))
-        assert not DagrunIdDep().is_met(ti=ti)
+        assert not DagRunNotBackfillDep().is_met(ti=ti)
 
-    def test_dagrun_id_is_not_backfill(self):
+    def test_run_id_is_not_backfill(self):
         """
-        Task instances whose dagrun ID is not a backfill dagrun ID should pass 
this dep.
+        Task instances whose run_id is not a backfill run_id should pass this 
dep.
         """
         dagrun = DagRun()
         dagrun.run_type = 'custom_type'
         ti = Mock(get_dagrun=Mock(return_value=dagrun))
-        assert DagrunIdDep().is_met(ti=ti)
+        assert DagRunNotBackfillDep().is_met(ti=ti)
 
         dagrun = DagRun()
         dagrun.run_id = None
         ti = Mock(get_dagrun=Mock(return_value=dagrun))
-        assert DagrunIdDep().is_met(ti=ti)
+        assert DagRunNotBackfillDep().is_met(ti=ti)

Reply via email to