This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fa8834cd67b include parameter to method doc in DAG, use from_db to use
airflow.models.DAG rather than sdk.definitions.DAG to use clear method (#48471)
fa8834cd67b is described below
commit fa8834cd67b459aca6174d53fe942b2a3846a8a1
Author: Bugra Ozturk <[email protected]>
AuthorDate: Fri Mar 28 15:38:09 2025 +0100
include parameter to method doc in DAG, use from_db to use
airflow.models.DAG rather than sdk.definitions.DAG to use clear method (#48471)
closes: #48430
Pass `from_db=True` so that the CLI gets an `airflow.models.DAG` (returned as
SerializedDAG) rather than an `sdk.definitions.DAG`, because the `clear` method
is only available on the former. When the DAG is read from anywhere other than
the DB, these methods return `sdk.definitions.DAG`.
Local Run SS:

---
airflow-core/src/airflow/cli/commands/task_command.py | 4 ++--
airflow-core/src/airflow/models/dag.py | 1 +
airflow-core/src/airflow/utils/cli.py | 4 ++--
3 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/cli/commands/task_command.py
b/airflow-core/src/airflow/cli/commands/task_command.py
index 32a6f4a2870..0cd092626bb 100644
--- a/airflow-core/src/airflow/cli/commands/task_command.py
+++ b/airflow-core/src/airflow/cli/commands/task_command.py
@@ -446,12 +446,12 @@ def task_render(args, dag: DAG | None = None) -> None:
def task_clear(args) -> None:
"""Clear all task instances or only those matched by regex for a DAG(s)."""
logging.basicConfig(level=settings.LOGGING_LEVEL,
format=settings.SIMPLE_LOG_FORMAT)
-
if args.dag_id and not args.subdir and not args.dag_regex and not
args.task_regex:
dags = [get_dag_by_file_location(args.dag_id)]
else:
# todo clear command only accepts a single dag_id. no reason for
get_dags with 's' except regex?
- dags = get_dags(args.subdir, args.dag_id, use_regex=args.dag_regex)
+ # Reading from_db because clear method still not implemented in Task
SDK DAG
+ dags = get_dags(args.subdir, args.dag_id, use_regex=args.dag_regex,
from_db=True)
if args.task_regex:
for idx, dag in enumerate(dags):
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index 981c9469274..7261c48dee0 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -1465,6 +1465,7 @@ class DAG(TaskSDKDag, LoggingMixin):
Clear a set of task instances associated with the current dag for a
specified date range.
:param task_ids: List of task ids or (``task_id``, ``map_index``)
tuples to clear
+ :param run_id: The run_id for which the tasks should be cleared
:param start_date: The minimum logical_date to clear
:param end_date: The maximum logical_date to clear
:param only_failed: Only clear failed tasks
diff --git a/airflow-core/src/airflow/utils/cli.py
b/airflow-core/src/airflow/utils/cli.py
index bc74bb7a6ee..0815ec0ac07 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -289,12 +289,12 @@ def get_dag(subdir: str | None, dag_id: str, from_db:
bool = False) -> DAG:
return dag
-def get_dags(subdir: str | None, dag_id: str, use_regex: bool = False):
+def get_dags(subdir: str | None, dag_id: str, use_regex: bool = False,
from_db: bool = False):
"""Return DAG(s) matching a given regex or dag_id."""
from airflow.models import DagBag
if not use_regex:
- return [get_dag(subdir, dag_id)]
+ return [get_dag(subdir=subdir, dag_id=dag_id, from_db=from_db)]
dagbag = DagBag(process_subdir(subdir))
matched_dags = [dag for dag in dagbag.dags.values() if re.search(dag_id,
dag.dag_id)]
if not matched_dags: