Lee-W commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1832281554
##########
airflow/models/dag.py:
##########
@@ -1337,42 +1322,35 @@ def set_task_group_state(
"""
from airflow.api.common.mark_tasks import set_state
- if not exactly_one(execution_date, run_id):
- raise ValueError("Exactly one of execution_date or run_id must be
provided")
-
tasks_to_set_state: list[BaseOperator | tuple[BaseOperator, int]] = []
task_ids: list[str] = []
- if execution_date is None:
- dag_run = session.scalars(
- select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id ==
self.dag_id)
- ).one() # Raises an error if not found
- resolve_execution_date = dag_run.execution_date
- else:
- resolve_execution_date = execution_date
-
- end_date = resolve_execution_date if not future else None
- start_date = resolve_execution_date if not past else None
-
task_group_dict = self.task_group.get_task_group_dict()
task_group = task_group_dict.get(group_id)
if task_group is None:
raise ValueError("TaskGroup {group_id} could not be found")
tasks_to_set_state = [task for task in task_group.iter_tasks() if
isinstance(task, BaseOperator)]
task_ids = [task.task_id for task in task_group.iter_tasks()]
dag_runs_query = select(DagRun.id).where(DagRun.dag_id == self.dag_id)
- if start_date is None and end_date is None:
- dag_runs_query = dag_runs_query.where(DagRun.execution_date ==
start_date)
- else:
- if start_date is not None:
- dag_runs_query = dag_runs_query.where(DagRun.execution_date >=
start_date)
- if end_date is not None:
- dag_runs_query = dag_runs_query.where(DagRun.execution_date <=
end_date)
+
+ @cache
+ def get_logical_date() -> datetime:
+ stmt = select(DagRun.logical_date).where(DagRun.run_id == run_id,
DagRun.dag_id == self.dag_id)
+ return session.scalars(stmt).one() # Raises an error if not found
+
+ end_date = get_logical_date() if not future else None
+ start_date = get_logical_date() if not past else None
Review Comment:
```suggestion
end_date = None if future else get_logical_date()
start_date = None if past else get_logical_date()
```
##########
airflow/cli/commands/task_command.py:
##########
@@ -112,49 +147,48 @@ def _get_dag_run(
the logical date; otherwise use it as a run ID and set the logical date
to the current time.
"""
- if not exec_date_or_run_id and not create_if_necessary:
- raise ValueError("Must provide `exec_date_or_run_id` if not
`create_if_necessary`.")
- execution_date: pendulum.DateTime | None = None
- if exec_date_or_run_id:
- dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id,
run_id=exec_date_or_run_id, session=session)
- if dag_run:
- return dag_run, False
- with suppress(ParserError, TypeError):
- execution_date = timezone.parse(exec_date_or_run_id)
- if execution_date:
- dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id,
execution_date=execution_date, session=session)
- if dag_run:
+ if not logical_date_or_run_id and not create_if_necessary:
+ raise ValueError("Must provide `logical_date_or_run_id` if not
`create_if_necessary`.")
+
+ logical_date = None
+ if logical_date_or_run_id:
+ dag_run, logical_date =
_fetch_dag_run_from_run_id_or_logical_date_string(
+ dag_id=dag.dag_id,
+ value=logical_date_or_run_id,
+ session=session,
+ )
+ if dag_run is not None:
return dag_run, False
elif not create_if_necessary:
raise DagRunNotFound(
- f"DagRun for {dag.dag_id} with run_id or execution_date "
- f"of {exec_date_or_run_id!r} not found"
+ f"DagRun for {dag.dag_id} with run_id or logical_date "
+ f"of {logical_date_or_run_id!r} not found"
)
- if execution_date is not None:
- dag_run_execution_date = execution_date
+ if logical_date is not None:
+ dag_run_logical_date = logical_date
else:
- dag_run_execution_date = pendulum.instance(timezone.utcnow())
+ dag_run_logical_date = pendulum.instance(timezone.utcnow())
if create_if_necessary == "memory":
dag_run = DagRun(
dag_id=dag.dag_id,
- run_id=exec_date_or_run_id,
- execution_date=dag_run_execution_date,
-
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date),
+ run_id=logical_date_or_run_id,
+ logical_date=dag_run_logical_date,
+
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
triggered_by=DagRunTriggeredByType.CLI,
)
return dag_run, True
elif create_if_necessary == "db":
dag_run = dag.create_dagrun(
state=DagRunState.QUEUED,
- execution_date=dag_run_execution_date,
+ logical_date=dag_run_logical_date,
run_id=_generate_temporary_run_id(),
-
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date),
+
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
session=session,
triggered_by=DagRunTriggeredByType.CLI,
)
- return dag_run, True
+ return dag_run, True # type: ignore[return-value]
Review Comment:
May I know why we need it here?
##########
airflow/models/dag.py:
##########
@@ -1700,8 +1664,8 @@ def create_dagrun(
self,
state: DagRunState,
*,
- triggered_by: DagRunTriggeredByType,
- execution_date: datetime | None = None,
+ triggered_by: DagRunTriggeredByType | None = None,
Review Comment:
Why is None allowed here now? Did we change anything?
##########
airflow/models/dag.py:
##########
@@ -1419,24 +1399,8 @@ def clear(
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
+ exclude_run_ids: frozenset[str] | None = frozenset(),
) -> int | Iterable[TaskInstance]:
- """
Review Comment:
Why are we removing the docstring?
##########
providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -38,9 +38,14 @@
from kubernetes.dynamic import DynamicClient
from sqlalchemy import or_, select, update
+try:
+ from airflow.cli.cli_config import ARG_LOGICAL_DATE
+except ImportError: # 2.x compatibility.
+ from airflow.cli.cli_config import ( # type: ignore[attr-defined,
no-redef]
+ ARG_EXECUTION_DATE as ARG_LOGICAL_DATE,
+ )
Review Comment:
We could probably move it to the common.compat provider?
##########
airflow/ti_deps/deps/exec_date_after_start_date_dep.py:
##########
@@ -24,23 +24,23 @@
class ExecDateAfterStartDateDep(BaseTIDep):
Review Comment:
Do we need to rename this class?
##########
airflow/cli/commands/task_command.py:
##########
@@ -91,19 +91,54 @@ def _generate_temporary_run_id() -> str:
return f"__airflow_temporary_run_{timezone.utcnow().isoformat()}__"
+def _fetch_dag_run_from_run_id_or_logical_date_string(
+ *,
+ dag_id: str,
+ value: str,
Review Comment:
`value` is quite vague. Maybe we can add a line to describe what it could
be?
##########
airflow/models/slamiss.py:
##########
@@ -14,25 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
Review Comment:
What is this change?
##########
airflow/models/dag.py:
##########
@@ -1600,23 +1564,23 @@ def add_logger_if_needed(ti: TaskInstance):
exit_stack.callback(lambda: secrets_backend_list.pop(0))
with exit_stack:
- execution_date = execution_date or timezone.utcnow()
+ logical_date = logical_date or timezone.utcnow()
self.validate()
- self.log.debug("Clearing existing task instances for execution
date %s", execution_date)
+ self.log.debug("Clearing existing task instances for execution
date %s", logical_date)
Review Comment:
Should we use the name logical date in the doc and log? Or should we just
keep it as execution date?
##########
airflow/models/dag.py:
##########
@@ -2439,24 +2403,24 @@ def _get_or_create_dagrun(
:param dag: DAG to be used to find run.
:param conf: Configuration to pass to newly created run.
:param start_date: Start date of new run.
- :param execution_date: Logical date for finding an existing run.
+ :param logical_date: Logical date for finding an existing run.
Review Comment:
Looks like we're changing the docstring here. We probably should change
execution date to logical date elsewhere?
##########
airflow/models/log.py:
##########
@@ -73,7 +73,7 @@ def __init__(
if task_instance:
self.dag_id = task_instance.dag_id
self.task_id = task_instance.task_id
- if execution_date := getattr(task_instance, "execution_date",
None):
+ if execution_date := getattr(task_instance, "logical_date", None):
self.execution_date = execution_date
Review Comment:
Do we need to change it to logical_date?
##########
airflow/www/views.py:
##########
@@ -221,16 +221,16 @@ def get_safe_url(url):
def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
"""Get Execution Data, Base Date & Number of runs from a Request."""
- date_time = www_request.args.get("execution_date")
+ date_time = www_request.args.get("logical_date")
run_id = www_request.args.get("run_id")
- # First check run id, then check execution date, if not fall back on the
latest dagrun
+ # First check run id, then check logical_date date, if not fall back on
the latest dagrun
Review Comment:
```suggestion
# First check run id, then check logical_date, if not fall back on the
latest dagrun
```
##########
airflow/www/views.py:
##########
@@ -1654,10 +1666,10 @@ def get_logs_with_metadata(self, session: Session =
NEW_SESSION):
# Convert string datetime into actual datetime
try:
- execution_date = timezone.parse(execution_date_str, strict=True)
+ logical_date = timezone.parse(logical_date_str, strict=True)
except ValueError:
error_message = (
- f"Given execution date {execution_date_str!r} could not be
identified as a date. "
+ f"Given execution date {logical_date_str!r} could not be
identified as a date. "
Review Comment:
```suggestion
f"Given logical date {logical_date_str!r} could not be
identified as a date. "
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]