This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 42e2b80 Rename `xcom.dagrun_id` to `xcom.dag_run_id` (#21806)
42e2b80 is described below
commit 42e2b801fe4cdb9e6fcff3c53b7d732bde59282b
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Mar 2 11:03:19 2022 -0800
Rename `xcom.dagrun_id` to `xcom.dag_run_id` (#21806)
We use snake case for tables and columns in the metadata database, so
references (in the database) to `dag_run.id` should be `dag_run_id`.
Historically, though, many places in the codebase used the expression "dag
run id" to refer to the `run_id` attribute of the DagRun model. Here we
disambiguate where necessary by using `run_id` to refer to `DagRun.run_id`
and `dag_run_id` to refer to `DagRun.id`.
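To make the distinction concrete, here is a minimal illustrative sketch (not
part of this patch; the `show_ids` helper is hypothetical):

    from airflow.models.dagrun import DagRun

    def show_ids(dag_run: DagRun) -> str:
        # DagRun.run_id is the user-visible run name, e.g.
        # "scheduled__2022-03-02T00:00:00+00:00" -- this is what we call `run_id`.
        # DagRun.id is the integer surrogate primary key in the metadata
        # database -- this is what we call `dag_run_id`.
        return f"run_id={dag_run.run_id!r}, dag_run_id={dag_run.id}"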
One area where I did not make any changes is the REST API, which exposes the
parameter `dag_run_id` (illustrated below) to refer to the `run_id` field in
DagRun. Since this param is user-facing, it's OK for it to be disconnected
from the internal code references, and certainly from database object naming.
We can consider renaming it to `run_id` in the next major API release. In
`views.py`, likewise, the local `dag_run_id` naming is left unchanged; only
the call sites there are updated to the new keyword.
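For illustration, the user-facing `dag_run_id` parameter appears in request
paths such as (shape taken from the log endpoint test later in this diff):

    GET /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}

where the `{dag_run_id}` segment is filled with a DagRun `run_id` value.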
Squashed commits:
* rename IN_MEMORY_DAGRUN_ID
* rename ARG_EXECUTION_DATE_OR_RUN_ID
* rename DagrunIdDep
* fake_dagrun_id
* rename dagrun_id to dag_run_id
* clarify docstrings / comments
* rename dag_run_id to run_id in dag model
* _set_dag_run_state param dag_run_id -> run_id
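As a before/after sketch of the main caller-facing rename (assuming a loaded
`dag` object and an existing run whose run_id is "manual__2022-03-01"; only
the keyword name changes, not the behavior):

    from airflow.api.common.mark_tasks import set_state
    from airflow.utils.state import State

    # Before this commit (the keyword said dag_run_id but carried a run_id):
    # set_state(tasks=dag.tasks, dag_run_id="manual__2022-03-01", state=State.SUCCESS)

    # After this commit the keyword names what it actually holds:
    altered = set_state(tasks=dag.tasks, run_id="manual__2022-03-01", state=State.SUCCESS)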
---
airflow/api/common/mark_tasks.py | 24 +++++++++----------
.../endpoints/task_instance_endpoint.py | 2 +-
airflow/cli/cli_parser.py | 14 +++++------
airflow/cli/commands/task_command.py | 4 ++--
.../7b2661a43ba3_taskinstance_keyed_to_dagrun.py | 2 +-
...c306b5b5ae4a_switch_xcom_table_to_use_run_id.py | 4 ++--
airflow/models/dag.py | 12 +++++-----
airflow/models/xcom.py | 20 ++++++++--------
airflow/ti_deps/dependencies_deps.py | 4 ++--
.../{dagrun_id_dep.py => dagrun_backfill_dep.py} | 10 ++++----
airflow/www/views.py | 6 ++---
tests/api/common/test_mark_tasks.py | 28 +++++++++++-----------
tests/api_connexion/endpoints/test_log_endpoint.py | 2 +-
.../endpoints/test_task_instance_endpoint.py | 4 ++--
.../api_connexion/endpoints/test_xcom_endpoint.py | 2 +-
.../schemas/test_task_instance_schema.py | 2 +-
tests/api_connexion/schemas/test_xcom_schema.py | 2 +-
tests/models/test_dag.py | 2 +-
tests/models/test_xcom.py | 8 +++----
tests/providers/apache/hive/operators/test_hive.py | 4 ++--
tests/ti_deps/deps/test_dagrun_id_dep.py | 16 ++++++-------
21 files changed, 86 insertions(+), 86 deletions(-)
diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 3eeb15b..a280d0f 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -80,7 +80,7 @@ def _create_dagruns(
def set_state(
*,
tasks: Iterable[Operator],
- dag_run_id: Optional[str] = None,
+ run_id: Optional[str] = None,
execution_date: Optional[datetime] = None,
upstream: bool = False,
downstream: bool = False,
@@ -98,7 +98,7 @@ def set_state(
on the schedule (but it will as for subdag dag runs if needed).
:param tasks: the iterable of tasks from which to work. task.task.dag
needs to be set
- :param dag_run_id: the run_id of the dagrun to start looking from
+ :param run_id: the run_id of the dagrun to start looking from
:param execution_date: the execution date from which to start looking(deprecated)
:param upstream: Mark all parents (upstream tasks)
:param downstream: Mark all siblings (downstream tasks) of task_id, including SubDags
@@ -113,7 +113,7 @@ def set_state(
if not tasks:
return []
- if not exactly_one(execution_date, dag_run_id):
+ if not exactly_one(execution_date, run_id):
raise ValueError("Exactly one of dag_run_id and execution_date must be
set")
if execution_date and not timezone.is_localized(execution_date):
@@ -127,11 +127,11 @@ def set_state(
raise ValueError("Received tasks with no DAG")
if execution_date:
- dag_run_id = dag.get_dagrun(execution_date=execution_date).run_id
- if not dag_run_id:
- raise ValueError("Received tasks with no dag_run_id")
+ run_id = dag.get_dagrun(execution_date=execution_date).run_id
+ if not run_id:
+ raise ValueError("Received tasks with no run_id")
- dag_run_ids = get_run_ids(dag, dag_run_id, future, past)
+ dag_run_ids = get_run_ids(dag, run_id, future, past)
task_ids = list(find_task_relatives(tasks, downstream, upstream))
@@ -344,16 +344,16 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
return run_ids
-def _set_dag_run_state(dag_id: str, dag_run_id: str, state: DagRunState, session: SASession = NEW_SESSION):
+def _set_dag_run_state(dag_id: str, run_id: str, state: DagRunState, session: SASession = NEW_SESSION):
"""
Helper method that set dag run state in the DB.
:param dag_id: dag_id of target dag run
- :param dag_run_id: dag run id of target dag run
+ :param run_id: run id of target dag run
:param state: target state
:param session: database session
"""
- dag_run = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one()
+ dag_run = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id).one()
dag_run.state = state
if state == State.RUNNING:
dag_run.start_date = timezone.utcnow()
@@ -407,7 +407,7 @@ def set_dag_run_state_to_success(
# Mark all task instances of the dag run to success.
for task in dag.tasks:
task.dag = dag
- return set_state(tasks=dag.tasks, dag_run_id=run_id, state=State.SUCCESS, commit=commit, session=session)
+ return set_state(tasks=dag.tasks, run_id=run_id, state=State.SUCCESS, commit=commit, session=session)
@provide_session
@@ -469,7 +469,7 @@ def set_dag_run_state_to_failed(
task.dag = dag
tasks.append(task)
- return set_state(tasks=tasks, dag_run_id=run_id, state=State.FAILED, commit=commit, session=session)
+ return set_state(tasks=tasks, run_id=run_id, state=State.FAILED, commit=commit, session=session)
@provide_session
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index af1b4e3..7611bbb 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -357,7 +357,7 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
tis = dag.set_task_instance_state(
task_id=task_id,
- dag_run_id=run_id,
+ run_id=run_id,
execution_date=execution_date,
state=data["new_state"],
upstream=data["include_upstream"],
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index e74edec..f621052 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -173,7 +173,7 @@ def string_list_type(val):
ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate)
-ARG_EXECUTION_DATE_OR_DAGRUN_ID = Arg(
+ARG_EXECUTION_DATE_OR_RUN_ID = Arg(
('execution_date_or_run_id',), help="The execution_date of the DAG or run_id of the DAGRun"
)
ARG_TASK_REGEX = Arg(
@@ -1174,7 +1174,7 @@ TASKS_COMMANDS = (
args=(
ARG_DAG_ID,
ARG_TASK_ID,
- ARG_EXECUTION_DATE_OR_DAGRUN_ID,
+ ARG_EXECUTION_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_VERBOSE,
ARG_MAP_INDEX,
@@ -1189,7 +1189,7 @@ TASKS_COMMANDS = (
"and then run by an executor."
),
func=lazy_load_command('airflow.cli.commands.task_command.task_failed_deps'),
- args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE_OR_DAGRUN_ID, ARG_SUBDIR, ARG_MAP_INDEX),
+ args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE_OR_RUN_ID, ARG_SUBDIR, ARG_MAP_INDEX),
),
ActionCommand(
name='render',
@@ -1198,7 +1198,7 @@ TASKS_COMMANDS = (
args=(
ARG_DAG_ID,
ARG_TASK_ID,
- ARG_EXECUTION_DATE_OR_DAGRUN_ID,
+ ARG_EXECUTION_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_VERBOSE,
ARG_MAP_INDEX,
@@ -1211,7 +1211,7 @@ TASKS_COMMANDS = (
args=(
ARG_DAG_ID,
ARG_TASK_ID,
- ARG_EXECUTION_DATE_OR_DAGRUN_ID,
+ ARG_EXECUTION_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_MARK_SUCCESS,
ARG_FORCE,
@@ -1242,7 +1242,7 @@ TASKS_COMMANDS = (
args=(
ARG_DAG_ID,
ARG_TASK_ID,
- ARG_EXECUTION_DATE_OR_DAGRUN_ID,
+ ARG_EXECUTION_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_DRY_RUN,
ARG_TASK_PARAMS,
@@ -1255,7 +1255,7 @@ TASKS_COMMANDS = (
name='states-for-dag-run',
help="Get the status of all task instances in a dag run",
func=lazy_load_command('airflow.cli.commands.task_command.task_states_for_dag_run'),
- args=(ARG_DAG_ID, ARG_EXECUTION_DATE_OR_DAGRUN_ID, ARG_OUTPUT, ARG_VERBOSE),
+ args=(ARG_DAG_ID, ARG_EXECUTION_DATE_OR_RUN_ID, ARG_OUTPUT, ARG_VERBOSE),
),
)
POOLS_COMMANDS = (
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 11bbc9b..3c82740 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -39,7 +39,7 @@ from airflow.models import DagPickle, TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
-from airflow.models.xcom import IN_MEMORY_DAGRUN_ID
+from airflow.models.xcom import IN_MEMORY_RUN_ID
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.utils import cli as cli_utils
@@ -98,7 +98,7 @@ def _get_dag_run(
) from None
if execution_date is not None:
- return DagRun(dag.dag_id, run_id=IN_MEMORY_DAGRUN_ID, execution_date=execution_date)
+ return DagRun(dag.dag_id, run_id=IN_MEMORY_RUN_ID, execution_date=execution_date)
return DagRun(dag.dag_id, run_id=exec_date_or_run_id, execution_date=timezone.utcnow())
diff --git a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
index eee0bd2..924e15c 100644
--- a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
+++ b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
@@ -158,7 +158,7 @@ def upgrade():
mssql_where=sa.text("state='queued'"),
)
else:
- # Make sure DagRun id columns are non-nullable
+ # Make sure DagRun PK columns are non-nullable
with op.batch_alter_table('dag_run', schema=None) as batch_op:
batch_op.alter_column('dag_id', existing_type=string_id_col_type, nullable=False)
batch_op.alter_column('execution_date', existing_type=dt_type, nullable=False)
diff --git a/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py b/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py
index 4e38062..1167573 100644
--- a/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py
+++ b/airflow/migrations/versions/c306b5b5ae4a_switch_xcom_table_to_use_run_id.py
@@ -42,7 +42,7 @@ metadata = MetaData()
def _get_new_xcom_columns() -> Sequence[Column]:
return [
- Column("dagrun_id", Integer(), nullable=False),
+ Column("dag_run_id", Integer(), nullable=False),
Column("task_id", StringID(), nullable=False),
Column("key", StringID(length=512), nullable=False),
Column("value", LargeBinary),
@@ -117,7 +117,7 @@ def upgrade():
op.rename_table("__airflow_tmp_xcom", "xcom")
with op.batch_alter_table("xcom") as batch_op:
- batch_op.create_primary_key("xcom_pkey", ["dagrun_id", "task_id", "key"])
+ batch_op.create_primary_key("xcom_pkey", ["dag_run_id", "task_id", "key"])
batch_op.create_index("idx_xcom_key", ["key"])
batch_op.create_index("idx_xcom_ti_id", ["dag_id", "task_id",
"run_id"])
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 95f2be6..23eb0ce 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1613,7 +1613,7 @@ class DAG(LoggingMixin):
*,
task_id: str,
execution_date: Optional[datetime] = None,
- dag_run_id: Optional[str] = None,
+ run_id: Optional[str] = None,
state: TaskInstanceState,
upstream: bool = False,
downstream: bool = False,
@@ -1628,7 +1628,7 @@ class DAG(LoggingMixin):
:param task_id: Task ID of the TaskInstance
:param execution_date: Execution date of the TaskInstance
- :param dag_run_id: The run_id of the TaskInstance
+ :param run_id: The run_id of the TaskInstance
:param state: State to set the TaskInstance to
:param upstream: Include all upstream tasks of the given task_id
:param downstream: Include all downstream tasks of the given task_id
@@ -1638,12 +1638,12 @@ class DAG(LoggingMixin):
"""
from airflow.api.common.mark_tasks import set_state
- if not exactly_one(execution_date, dag_run_id):
- raise ValueError("Exactly one of execution_date or dag_run_id must be provided")
+ if not exactly_one(execution_date, run_id):
+ raise ValueError("Exactly one of execution_date or run_id must be provided")
if execution_date is None:
dag_run = (
- session.query(DagRun).filter(DagRun.run_id == dag_run_id, DagRun.dag_id == self.dag_id).one()
+ session.query(DagRun).filter(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id).one()
) # Raises an error if not found
resolve_execution_date = dag_run.execution_date
else:
@@ -1655,7 +1655,7 @@ class DAG(LoggingMixin):
altered = set_state(
tasks=[task],
execution_date=execution_date,
- dag_run_id=dag_run_id,
+ run_id=run_id,
upstream=upstream,
downstream=downstream,
future=future,
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 3f58c1d..32190d2 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -48,7 +48,7 @@ XCOM_RETURN_KEY = 'return_value'
# Stand-in value for 'airflow task test' generating a temporary in-memory DAG
# run without storing it in the database.
-IN_MEMORY_DAGRUN_ID = "__airflow_in_memory_dagrun__"
+IN_MEMORY_RUN_ID = "__airflow_in_memory_dagrun__"
if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstanceKey
@@ -59,7 +59,7 @@ class BaseXCom(Base, LoggingMixin):
__tablename__ = "xcom"
- dagrun_id = Column(Integer(), nullable=False, primary_key=True)
+ dag_run_id = Column(Integer(), nullable=False, primary_key=True)
task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False, primary_key=True)
key = Column(String(512, **COLLATION_ARGS), nullable=False, primary_key=True)
@@ -163,18 +163,18 @@ class BaseXCom(Base, LoggingMixin):
message = "Passing 'execution_date' to 'XCom.set()' is deprecated.
Use 'run_id' instead."
warnings.warn(message, DeprecationWarning, stacklevel=3)
try:
- dagrun_id, run_id = (
+ dag_run_id, run_id = (
session.query(DagRun.id, DagRun.run_id)
.filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
.one()
)
except NoResultFound:
raise ValueError(f"DAG run not found on DAG {dag_id!r} at
{execution_date}") from None
- elif run_id == IN_MEMORY_DAGRUN_ID:
- dagrun_id = -1
+ elif run_id == IN_MEMORY_RUN_ID:
+ dag_run_id = -1
else:
- dagrun_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar()
- if dagrun_id is None:
+ dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar()
+ if dag_run_id is None:
raise ValueError(f"DAG run not found on DAG {dag_id!r} with ID
{run_id!r}")
value = cls.serialize_value(
@@ -182,7 +182,7 @@ class BaseXCom(Base, LoggingMixin):
key=key,
task_id=task_id,
dag_id=dag_id,
- run_id=dagrun_id,
+ run_id=run_id,
)
# Remove duplicate XComs and insert a new one.
@@ -193,7 +193,7 @@ class BaseXCom(Base, LoggingMixin):
cls.dag_id == dag_id,
).delete()
new = cast(Any, cls)( # Work around Mypy complaining model not defining '__init__'.
- dagrun_id=dagrun_id,
+ dag_run_id=dag_run_id,
key=key,
value=value,
run_id=run_id,
@@ -410,7 +410,7 @@ class BaseXCom(Base, LoggingMixin):
if execution_date is not None:
query = query.filter(DagRun.execution_date <= execution_date)
else:
- # This returns an empty query result for IN_MEMORY_DAGRUN_ID,
+ # This returns an empty query result for IN_MEMORY_RUN_ID,
# but that is impossible to implement. Sorry?
dr = session.query(DagRun.execution_date).filter(DagRun.run_id == run_id).subquery()
query = query.filter(cls.execution_date <= dr.c.execution_date)
diff --git a/airflow/ti_deps/dependencies_deps.py b/airflow/ti_deps/dependencies_deps.py
index 7062995..cfacb11 100644
--- a/airflow/ti_deps/dependencies_deps.py
+++ b/airflow/ti_deps/dependencies_deps.py
@@ -23,8 +23,8 @@ from airflow.ti_deps.dependencies_states import (
)
from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
+from airflow.ti_deps.deps.dagrun_backfill_dep import DagRunNotBackfillDep
from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
-from airflow.ti_deps.deps.dagrun_id_dep import DagrunIdDep
from airflow.ti_deps.deps.exec_date_after_start_date_dep import ExecDateAfterStartDateDep
from airflow.ti_deps.deps.pool_slots_available_dep import PoolSlotsAvailableDep
from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
@@ -85,7 +85,7 @@ SCHEDULER_QUEUED_DEPS = {
TaskConcurrencyDep(),
PoolSlotsAvailableDep(),
DagrunRunningDep(),
- DagrunIdDep(),
+ DagRunNotBackfillDep(),
DagUnpausedDep(),
ExecDateAfterStartDateDep(),
TaskNotRunningDep(),
diff --git a/airflow/ti_deps/deps/dagrun_id_dep.py b/airflow/ti_deps/deps/dagrun_backfill_dep.py
similarity index 84%
rename from airflow/ti_deps/deps/dagrun_id_dep.py
rename to airflow/ti_deps/deps/dagrun_backfill_dep.py
index 84be503..949af43 100644
--- a/airflow/ti_deps/deps/dagrun_id_dep.py
+++ b/airflow/ti_deps/deps/dagrun_backfill_dep.py
@@ -16,17 +16,17 @@
# specific language governing permissions and limitations
# under the License.
-"""This module defines dep for DagRun ID validation"""
+"""This module defines dep for making sure DagRun not a backfill."""
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.session import provide_session
from airflow.utils.types import DagRunType
-class DagrunIdDep(BaseTIDep):
- """Dep for valid DagRun ID to schedule from scheduler"""
+class DagRunNotBackfillDep(BaseTIDep):
+ """Dep for valid DagRun run_id to schedule from scheduler"""
- NAME = "Dagrun run_id is not backfill job ID"
+ NAME = "DagRun is not backfill job"
IGNORABLE = True
@provide_session
@@ -37,7 +37,7 @@ class DagrunIdDep(BaseTIDep):
:param ti: the task instance to get the dependency status for
:param session: database session
:param dep_context: the context for which this dependency should be evaluated for
- :return: True if DagRun ID is valid for scheduling from scheduler.
+ :return: True if DagRun is valid for scheduling from scheduler.
"""
dagrun = ti.get_dagrun(session)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f913838..2c3382c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2221,7 +2221,7 @@ class Airflow(AirflowBaseView):
altered = dag.set_task_instance_state(
task_id=task_id,
- dag_run_id=dag_run_id,
+ run_id=dag_run_id,
state=state,
upstream=upstream,
downstream=downstream,
@@ -2283,7 +2283,7 @@ class Airflow(AirflowBaseView):
to_be_altered = set_state(
tasks=[task],
- dag_run_id=dag_run_id,
+ run_id=dag_run_id,
upstream=upstream,
downstream=downstream,
future=future,
@@ -3596,7 +3596,7 @@ class XComModelView(AirflowModelView):
search_columns = ['key', 'value', 'timestamp', 'dag_id', 'task_id', 'run_id']
list_columns = ['key', 'value', 'timestamp', 'dag_id', 'task_id', 'run_id']
- base_order = ('dagrun_id', 'desc')
+ base_order = ('dag_run_id', 'desc')
base_filters = [['dag_id', DagFilter, lambda: []]]
diff --git a/tests/api/common/test_mark_tasks.py b/tests/api/common/test_mark_tasks.py
index fd14a77..7662eec 100644
--- a/tests/api/common/test_mark_tasks.py
+++ b/tests/api/common/test_mark_tasks.py
@@ -153,7 +153,7 @@ class TestMarkTasks:
dr = DagRun.find(dag_id=self.dag1.dag_id, execution_date=self.execution_dates[0])[0]
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=False,
@@ -167,7 +167,7 @@ class TestMarkTasks:
# set one and only one task to success
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=False,
@@ -181,7 +181,7 @@ class TestMarkTasks:
# set no tasks
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=False,
@@ -195,7 +195,7 @@ class TestMarkTasks:
# set task to other than success
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=False,
@@ -211,7 +211,7 @@ class TestMarkTasks:
task = self.dag1.get_task("runme_0")
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=False,
@@ -228,7 +228,7 @@ class TestMarkTasks:
dr = DagRun.find(dag_id=self.dag3.dag_id, execution_date=self.dag3_execution_dates[1])[0]
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=False,
@@ -255,7 +255,7 @@ class TestMarkTasks:
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=True,
future=False,
@@ -277,7 +277,7 @@ class TestMarkTasks:
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=True,
downstream=False,
future=False,
@@ -295,7 +295,7 @@ class TestMarkTasks:
dr = DagRun.find(dag_id=self.dag1.dag_id, execution_date=self.execution_dates[0])[0]
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=True,
@@ -311,7 +311,7 @@ class TestMarkTasks:
dr = DagRun.find(dag_id=self.dag3.dag_id, execution_date=self.dag3_execution_dates[1])[0]
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=True,
@@ -330,7 +330,7 @@ class TestMarkTasks:
dr = DagRun.find(dag_id=self.dag1.dag_id, execution_date=self.execution_dates[1])[0]
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=False,
@@ -346,7 +346,7 @@ class TestMarkTasks:
dr = DagRun.find(dag_id=self.dag3.dag_id, execution_date=self.dag3_execution_dates[1])[0]
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=False,
@@ -365,7 +365,7 @@ class TestMarkTasks:
dr = DagRun.find(dag_id=self.dag1.dag_id, execution_date=self.execution_dates[0])[0]
altered = set_state(
tasks=tasks,
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=False,
future=False,
@@ -392,7 +392,7 @@ class TestMarkTasks:
altered = set_state(
tasks=[task],
- dag_run_id=dr.run_id,
+ run_id=dr.run_id,
upstream=False,
downstream=True,
future=False,
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index f5d2e5d..3b64387 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -266,7 +266,7 @@ class TestGetLog:
def test_raises_404_for_invalid_dag_run_id(self):
response = self.client.get(
- f"api/v1/dags/{self.DAG_ID}/dagRuns/NO_DAG_RUN/" # invalid
dagrun_id
+ f"api/v1/dags/{self.DAG_ID}/dagRuns/NO_DAG_RUN/" # invalid run_id
f"taskInstances/{self.TASK_ID}/logs/1?",
headers={'Accept': 'application/json'},
environ_overrides={'REMOTE_USER': "test"},
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 8f5676e..b4353f8 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -1126,7 +1126,7 @@ class TestPostSetTaskInstanceState(TestTaskInstanceEndpoint):
mock_set_task_instance_state.assert_called_once_with(
commit=False,
downstream=True,
- dag_run_id=None,
+ run_id=None,
execution_date=DEFAULT_DATETIME_1,
future=True,
past=True,
@@ -1175,7 +1175,7 @@ class TestPostSetTaskInstanceState(TestTaskInstanceEndpoint):
mock_set_task_instance_state.assert_called_once_with(
commit=False,
downstream=True,
- dag_run_id=run_id,
+ run_id=run_id,
execution_date=None,
future=True,
past=True,
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py
index 0a04371..931325d 100644
--- a/tests/api_connexion/endpoints/test_xcom_endpoint.py
+++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py
@@ -443,7 +443,7 @@ class TestPaginationGetXComEntries(TestXComEndpoint):
with create_session() as session:
for i in range(1, 11):
xcom = XCom(
- dagrun_id=dagrun.id,
+ dag_run_id=dagrun.id,
key=f"TEST_XCOM_KEY{i}",
value=b"null",
run_id=self.dag_run_id,
diff --git a/tests/api_connexion/schemas/test_task_instance_schema.py b/tests/api_connexion/schemas/test_task_instance_schema.py
index 69111ff..4aeea63 100644
--- a/tests/api_connexion/schemas/test_task_instance_schema.py
+++ b/tests/api_connexion/schemas/test_task_instance_schema.py
@@ -214,7 +214,7 @@ class TestSetTaskInstanceStateFormSchema:
({"include_future": "foo"},),
({"execution_date": "NOW"},),
({"new_state": "INVALID_STATE"},),
- ({"execution_date": "2020-01-01T00:00:00+00:00", "dag_run_id":
"dagrun_id"},),
+ ({"execution_date": "2020-01-01T00:00:00+00:00", "dag_run_id":
"some-run-id"},),
]
)
def test_validation_error(self, override_data):
diff --git a/tests/api_connexion/schemas/test_xcom_schema.py b/tests/api_connexion/schemas/test_xcom_schema.py
index 84b2663..a9fd275 100644
--- a/tests/api_connexion/schemas/test_xcom_schema.py
+++ b/tests/api_connexion/schemas/test_xcom_schema.py
@@ -48,7 +48,7 @@ def create_xcom(create_task_instance, session):
)
run: DagRun = ti.dag_run
xcom = XCom(
- dagrun_id=run.id,
+ dag_run_id=run.id,
task_id=ti.task_id,
key=key,
value=value,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 2204a06..cbda0ef 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2260,7 +2260,7 @@ def test_set_task_instance_state(run_id, execution_date, session, dag_maker):
altered = dag.set_task_instance_state(
task_id=task_1.task_id,
- dag_run_id=run_id,
+ run_id=run_id,
execution_date=execution_date,
state=State.SUCCESS,
session=session,
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index 733b2d4..5dd979f 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -25,7 +25,7 @@ import pytest
from airflow.configuration import conf
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.taskinstance import TaskInstanceKey
-from airflow.models.xcom import IN_MEMORY_DAGRUN_ID, XCOM_RETURN_KEY, BaseXCom, XCom, resolve_xcom_backend
+from airflow.models.xcom import IN_MEMORY_RUN_ID, XCOM_RETURN_KEY, BaseXCom, XCom, resolve_xcom_backend
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
@@ -202,7 +202,7 @@ class TestXCom:
key=XCOM_RETURN_KEY,
dag_id="test_dag",
task_id="test_task",
- run_id=IN_MEMORY_DAGRUN_ID,
+ run_id=IN_MEMORY_RUN_ID,
)
XCom = resolve_xcom_backend()
@@ -244,9 +244,9 @@ class TestXCom:
key=XCOM_RETURN_KEY,
dag_id="test_dag",
task_id="test_task",
- run_id=IN_MEMORY_DAGRUN_ID,
+ run_id=IN_MEMORY_RUN_ID,
)
- expected = {**kwargs, 'run_id': -1}
+ expected = {**kwargs, 'run_id': '__airflow_in_memory_dagrun__'}
XCom = resolve_xcom_backend()
XCom.set(**kwargs)
serialize_watcher.assert_called_once_with(**expected)
diff --git a/tests/providers/apache/hive/operators/test_hive.py b/tests/providers/apache/hive/operators/test_hive.py
index 8a167da..97027a1 100644
--- a/tests/providers/apache/hive/operators/test_hive.py
+++ b/tests/providers/apache/hive/operators/test_hive.py
@@ -79,10 +79,10 @@ class HiveOperatorTest(TestHiveEnvironment):
mock_get_hook.return_value = mock_hook
op = HiveOperator(task_id='test_mapred_job_name', hql=self.hql, dag=self.dag)
- fake_dagrun_id = "test_mapred_job_name"
+ fake_run_id = "test_mapred_job_name"
fake_execution_date = timezone.datetime(2018, 6, 19)
fake_ti = TaskInstance(task=op)
- fake_ti.dag_run = DagRun(run_id=fake_dagrun_id, execution_date=fake_execution_date)
+ fake_ti.dag_run = DagRun(run_id=fake_run_id, execution_date=fake_execution_date)
fake_ti.hostname = 'fake_hostname'
fake_context = {'ti': fake_ti}
diff --git a/tests/ti_deps/deps/test_dagrun_id_dep.py b/tests/ti_deps/deps/test_dagrun_id_dep.py
index e416dd5..8bb4ad0 100644
--- a/tests/ti_deps/deps/test_dagrun_id_dep.py
+++ b/tests/ti_deps/deps/test_dagrun_id_dep.py
@@ -21,31 +21,31 @@ import unittest
from unittest.mock import Mock
from airflow.models.dagrun import DagRun
-from airflow.ti_deps.deps.dagrun_id_dep import DagrunIdDep
+from airflow.ti_deps.deps.dagrun_backfill_dep import DagRunNotBackfillDep
from airflow.utils.types import DagRunType
class TestDagrunRunningDep(unittest.TestCase):
- def test_dagrun_id_is_backfill(self):
+ def test_run_id_is_backfill(self):
"""
- Task instances whose dagrun ID is a backfill dagrun ID should fail this dep.
+ Task instances whose run_id is a backfill dagrun run_id should fail this dep.
"""
dagrun = DagRun()
dagrun.run_id = "anything"
dagrun.run_type = DagRunType.BACKFILL_JOB
ti = Mock(get_dagrun=Mock(return_value=dagrun))
- assert not DagrunIdDep().is_met(ti=ti)
+ assert not DagRunNotBackfillDep().is_met(ti=ti)
- def test_dagrun_id_is_not_backfill(self):
+ def test_run_id_is_not_backfill(self):
"""
- Task instances whose dagrun ID is not a backfill dagrun ID should pass this dep.
+ Task instances whose run_id is not a backfill run_id should pass this dep.
"""
dagrun = DagRun()
dagrun.run_type = 'custom_type'
ti = Mock(get_dagrun=Mock(return_value=dagrun))
- assert DagrunIdDep().is_met(ti=ti)
+ assert DagRunNotBackfillDep().is_met(ti=ti)
dagrun = DagRun()
dagrun.run_id = None
ti = Mock(get_dagrun=Mock(return_value=dagrun))
- assert DagrunIdDep().is_met(ti=ti)
+ assert DagRunNotBackfillDep().is_met(ti=ti)