amoghrajesh commented on code in PR #55292:
URL: https://github.com/apache/airflow/pull/55292#discussion_r2325007951


##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -1319,8 +1328,18 @@ def _run_task(*, ti, task, run_triggerer=False):
     Bypasses a lot of extra steps used in `task.run` to keep our local running as fast as
     possible.  This function is only meant for the `dag.test` function as a helper function.
     """
+    from airflow.sdk import TaskInstanceState
     from airflow.sdk.module_loading import import_string
-    from airflow.utils.state import State
+
+    FINISHED_STATES = frozenset(
+        [
+            TaskInstanceState.SUCCESS,
+            TaskInstanceState.FAILED,
+            TaskInstanceState.SKIPPED,
+            TaskInstanceState.UPSTREAM_FAILED,
+            TaskInstanceState.REMOVED,
+        ]
+    )

Review Comment:
   Why twice?
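   One way to avoid the duplication the comment points at is to define the set once at module level and let both `test` and `_run_task` reference it. A minimal sketch, using a simplified stand-in for `TaskInstanceState` (the real enum lives in `airflow.sdk` and has more members):

```python
from enum import Enum


# Simplified stand-in for airflow.sdk's TaskInstanceState; only the
# members relevant to the diff are reproduced here.
class TaskInstanceState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    UPSTREAM_FAILED = "upstream_failed"
    REMOVED = "removed"
    SCHEDULED = "scheduled"
    RUNNING = "running"


# Defined once at module scope so every function in the module can share
# it, instead of rebuilding the same frozenset inside each function body.
FINISHED_STATES = frozenset(
    {
        TaskInstanceState.SUCCESS,
        TaskInstanceState.FAILED,
        TaskInstanceState.SKIPPED,
        TaskInstanceState.UPSTREAM_FAILED,
        TaskInstanceState.REMOVED,
    }
)


def is_finished(state: TaskInstanceState) -> bool:
    """Membership check mirroring the old `State.finished` usage."""
    return state in FINISHED_STATES
```

   This keeps the `ti.state not in FINISHED_STATES` check in `_run_task` unchanged while removing the second definition.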



##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -1135,9 +1135,18 @@ def test(
         from airflow.sdk import DagRunState, TaskInstanceState, timezone
         from airflow.secrets.local_filesystem import LocalFilesystemBackend
         from airflow.serialization.serialized_objects import SerializedDAG
-        from airflow.utils.state import State
         from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
+        FINISHED_STATES = frozenset(
+            [
+                TaskInstanceState.SUCCESS,
+                TaskInstanceState.FAILED,
+                TaskInstanceState.SKIPPED,
+                TaskInstanceState.UPSTREAM_FAILED,
+                TaskInstanceState.REMOVED,
+            ]
+        )
+
         if TYPE_CHECKING:
             from airflow.models.taskinstance import TaskInstance

Review Comment:
   This doesn't seem to be correct



##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -1369,14 +1388,14 @@ def _run_task(*, ti, task, run_triggerer=False):
 
                 # Set the state to SCHEDULED so that the task can be resumed.
                 with create_session() as session:
-                    ti.state = State.SCHEDULED
+                    ti.state = TaskInstanceState.SCHEDULED
                     session.add(ti)
 
             return taskrun_result
         except Exception:
             log.exception("[DAG TEST] Error running task %s", ti)
-            if ti.state not in State.finished:
-                ti.set_state(State.FAILED)
+            if ti.state not in FINISHED_STATES:
+                ti.set_state(TaskInstanceState.FAILED)

Review Comment:
   The fact that it is used in `_run_task` might be a good indication to place it in _generated.py, so that the api schema has it and it can be used, but that can come later if / when we decide to split `State` appropriately.



##########
task-sdk/src/airflow/sdk/definitions/dag.py:
##########
@@ -1135,9 +1135,18 @@ def test(
         from airflow.sdk import DagRunState, TaskInstanceState, timezone
         from airflow.secrets.local_filesystem import LocalFilesystemBackend
         from airflow.serialization.serialized_objects import SerializedDAG
-        from airflow.utils.state import State
         from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
+        FINISHED_STATES = frozenset(
+            [
+                TaskInstanceState.SUCCESS,
+                TaskInstanceState.FAILED,
+                TaskInstanceState.SKIPPED,
+                TaskInstanceState.UPSTREAM_FAILED,
+                TaskInstanceState.REMOVED,
+            ]
+        )

Review Comment:
   This doesn't feel like the correct place for this, but I do not have a counter-proposal right now.
   
   cc @ashb or @kaxil do you have a better thought in mind?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
