Lee-W commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1832281554


##########
airflow/models/dag.py:
##########
@@ -1337,42 +1322,35 @@ def set_task_group_state(
         """
         from airflow.api.common.mark_tasks import set_state
 
-        if not exactly_one(execution_date, run_id):
-            raise ValueError("Exactly one of execution_date or run_id must be 
provided")
-
         tasks_to_set_state: list[BaseOperator | tuple[BaseOperator, int]] = []
         task_ids: list[str] = []
 
-        if execution_date is None:
-            dag_run = session.scalars(
-                select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id == 
self.dag_id)
-            ).one()  # Raises an error if not found
-            resolve_execution_date = dag_run.execution_date
-        else:
-            resolve_execution_date = execution_date
-
-        end_date = resolve_execution_date if not future else None
-        start_date = resolve_execution_date if not past else None
-
         task_group_dict = self.task_group.get_task_group_dict()
         task_group = task_group_dict.get(group_id)
         if task_group is None:
             raise ValueError("TaskGroup {group_id} could not be found")
         tasks_to_set_state = [task for task in task_group.iter_tasks() if 
isinstance(task, BaseOperator)]
         task_ids = [task.task_id for task in task_group.iter_tasks()]
         dag_runs_query = select(DagRun.id).where(DagRun.dag_id == self.dag_id)
-        if start_date is None and end_date is None:
-            dag_runs_query = dag_runs_query.where(DagRun.execution_date == 
start_date)
-        else:
-            if start_date is not None:
-                dag_runs_query = dag_runs_query.where(DagRun.execution_date >= 
start_date)
-            if end_date is not None:
-                dag_runs_query = dag_runs_query.where(DagRun.execution_date <= 
end_date)
+
+        @cache
+        def get_logical_date() -> datetime:
+            stmt = select(DagRun.logical_date).where(DagRun.run_id == run_id, 
DagRun.dag_id == self.dag_id)
+            return session.scalars(stmt).one()  # Raises an error if not found
+
+        end_date = get_logical_date() if not future else None
+        start_date = get_logical_date() if not past else None

Review Comment:
   ```suggestion
           end_date = None if future else get_logical_date()
           start_date = None if past else get_logical_date()
   ```



##########
airflow/cli/commands/task_command.py:
##########
@@ -112,49 +147,48 @@ def _get_dag_run(
        the logical date; otherwise use it as a run ID and set the logical date
        to the current time.
     """
-    if not exec_date_or_run_id and not create_if_necessary:
-        raise ValueError("Must provide `exec_date_or_run_id` if not 
`create_if_necessary`.")
-    execution_date: pendulum.DateTime | None = None
-    if exec_date_or_run_id:
-        dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, 
run_id=exec_date_or_run_id, session=session)
-        if dag_run:
-            return dag_run, False
-        with suppress(ParserError, TypeError):
-            execution_date = timezone.parse(exec_date_or_run_id)
-        if execution_date:
-            dag_run = DAG.fetch_dagrun(dag_id=dag.dag_id, 
execution_date=execution_date, session=session)
-        if dag_run:
+    if not logical_date_or_run_id and not create_if_necessary:
+        raise ValueError("Must provide `logical_date_or_run_id` if not 
`create_if_necessary`.")
+
+    logical_date = None
+    if logical_date_or_run_id:
+        dag_run, logical_date = 
_fetch_dag_run_from_run_id_or_logical_date_string(
+            dag_id=dag.dag_id,
+            value=logical_date_or_run_id,
+            session=session,
+        )
+        if dag_run is not None:
             return dag_run, False
         elif not create_if_necessary:
             raise DagRunNotFound(
-                f"DagRun for {dag.dag_id} with run_id or execution_date "
-                f"of {exec_date_or_run_id!r} not found"
+                f"DagRun for {dag.dag_id} with run_id or logical_date "
+                f"of {logical_date_or_run_id!r} not found"
             )
 
-    if execution_date is not None:
-        dag_run_execution_date = execution_date
+    if logical_date is not None:
+        dag_run_logical_date = logical_date
     else:
-        dag_run_execution_date = pendulum.instance(timezone.utcnow())
+        dag_run_logical_date = pendulum.instance(timezone.utcnow())
 
     if create_if_necessary == "memory":
         dag_run = DagRun(
             dag_id=dag.dag_id,
-            run_id=exec_date_or_run_id,
-            execution_date=dag_run_execution_date,
-            
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date),
+            run_id=logical_date_or_run_id,
+            logical_date=dag_run_logical_date,
+            
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
             triggered_by=DagRunTriggeredByType.CLI,
         )
         return dag_run, True
     elif create_if_necessary == "db":
         dag_run = dag.create_dagrun(
             state=DagRunState.QUEUED,
-            execution_date=dag_run_execution_date,
+            logical_date=dag_run_logical_date,
             run_id=_generate_temporary_run_id(),
-            
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date),
+            
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
             session=session,
             triggered_by=DagRunTriggeredByType.CLI,
         )
-        return dag_run, True
+        return dag_run, True  # type: ignore[return-value]

Review Comment:
   May I know why we need it here?



##########
airflow/models/dag.py:
##########
@@ -1700,8 +1664,8 @@ def create_dagrun(
         self,
         state: DagRunState,
         *,
-        triggered_by: DagRunTriggeredByType,
-        execution_date: datetime | None = None,
+        triggered_by: DagRunTriggeredByType | None = None,

Review Comment:
   Why is None allowed here now? Did we change anything that requires it?



##########
airflow/models/dag.py:
##########
@@ -1419,24 +1399,8 @@ def clear(
         session: Session = NEW_SESSION,
         dag_bag: DagBag | None = None,
         exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
+        exclude_run_ids: frozenset[str] | None = frozenset(),
     ) -> int | Iterable[TaskInstance]:
-        """

Review Comment:
   Why are we removing the docstring?



##########
providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -38,9 +38,14 @@
 from kubernetes.dynamic import DynamicClient
 from sqlalchemy import or_, select, update
 
+try:
+    from airflow.cli.cli_config import ARG_LOGICAL_DATE
+except ImportError:  # 2.x compatibility.
+    from airflow.cli.cli_config import (  # type: ignore[attr-defined, 
no-redef]
+        ARG_EXECUTION_DATE as ARG_LOGICAL_DATE,
+    )

Review Comment:
   Could we move this to the common.compat provider?



##########
airflow/ti_deps/deps/exec_date_after_start_date_dep.py:
##########
@@ -24,23 +24,23 @@
 class ExecDateAfterStartDateDep(BaseTIDep):

Review Comment:
   Do we need to rename this class?



##########
airflow/cli/commands/task_command.py:
##########
@@ -91,19 +91,54 @@ def _generate_temporary_run_id() -> str:
     return f"__airflow_temporary_run_{timezone.utcnow().isoformat()}__"
 
 
+def _fetch_dag_run_from_run_id_or_logical_date_string(
+    *,
+    dag_id: str,
+    value: str,

Review Comment:
   `value` is quite vague. Maybe we can add a line describing what it could 
be (a run ID or a logical date string)?



##########
airflow/models/slamiss.py:
##########
@@ -14,25 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-

Review Comment:
   What is this change?



##########
airflow/models/dag.py:
##########
@@ -1600,23 +1564,23 @@ def add_logger_if_needed(ti: TaskInstance):
             exit_stack.callback(lambda: secrets_backend_list.pop(0))
 
         with exit_stack:
-            execution_date = execution_date or timezone.utcnow()
+            logical_date = logical_date or timezone.utcnow()
             self.validate()
-            self.log.debug("Clearing existing task instances for execution 
date %s", execution_date)
+            self.log.debug("Clearing existing task instances for execution 
date %s", logical_date)

Review Comment:
   Should we use the name "logical date" in the docs and log messages, or should we just 
keep it as "execution date"?



##########
airflow/models/dag.py:
##########
@@ -2439,24 +2403,24 @@ def _get_or_create_dagrun(
     :param dag: DAG to be used to find run.
     :param conf: Configuration to pass to newly created run.
     :param start_date: Start date of new run.
-    :param execution_date: Logical date for finding an existing run.
+    :param logical_date: Logical date for finding an existing run.

Review Comment:
   It looks like we're changing the docstring here. Should we also change 
"execution date" to "logical date" elsewhere?



##########
airflow/models/log.py:
##########
@@ -73,7 +73,7 @@ def __init__(
         if task_instance:
             self.dag_id = task_instance.dag_id
             self.task_id = task_instance.task_id
-            if execution_date := getattr(task_instance, "execution_date", 
None):
+            if execution_date := getattr(task_instance, "logical_date", None):
                 self.execution_date = execution_date

Review Comment:
   Do we need to change it to logical_date?



##########
airflow/www/views.py:
##########
@@ -221,16 +221,16 @@ def get_safe_url(url):
 
 def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
     """Get Execution Data, Base Date & Number of runs from a Request."""
-    date_time = www_request.args.get("execution_date")
+    date_time = www_request.args.get("logical_date")
     run_id = www_request.args.get("run_id")
-    # First check run id, then check execution date, if not fall back on the 
latest dagrun
+    # First check run id, then check logical_date date, if not fall back on 
the latest dagrun

Review Comment:
   ```suggestion
       # First check run id, then check logical_date, if not fall back on the 
latest dagrun
   ```



##########
airflow/www/views.py:
##########
@@ -1654,10 +1666,10 @@ def get_logs_with_metadata(self, session: Session = 
NEW_SESSION):
 
         # Convert string datetime into actual datetime
         try:
-            execution_date = timezone.parse(execution_date_str, strict=True)
+            logical_date = timezone.parse(logical_date_str, strict=True)
         except ValueError:
             error_message = (
-                f"Given execution date {execution_date_str!r} could not be 
identified as a date. "
+                f"Given execution date {logical_date_str!r} could not be 
identified as a date. "

Review Comment:
   ```suggestion
                   f"Given logical date {logical_date_str!r} could not be 
identified as a date. "
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to