jedcunningham commented on code in PR #31735:
URL: https://github.com/apache/airflow/pull/31735#discussion_r1222095941


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1699,7 +1699,7 @@ def _find_zombies(self) -> None:
                     .where(TI.state == TaskInstanceState.RUNNING)
                     .where(
                         or_(
-                            Job.state != State.RUNNING,
+                            Job.state != TaskInstanceState.RUNNING,

Review Comment:
   And here.



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1592,10 +1592,10 @@ def adopt_or_reset_orphaned_tasks(self, session: 
Session = NEW_SESSION) -> int:
                         update(Job)
                         .where(
                             Job.job_type == "SchedulerJob",
-                            Job.state == State.RUNNING,
+                            Job.state == TaskInstanceState.RUNNING,
                             Job.latest_heartbeat < (timezone.utcnow() - 
timedelta(seconds=timeout)),
                         )
-                        .values(state=State.FAILED)
+                        .values(state=TaskInstanceState.FAILED)

Review Comment:
   And here.



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1592,10 +1592,10 @@ def adopt_or_reset_orphaned_tasks(self, session: 
Session = NEW_SESSION) -> int:
                         update(Job)
                         .where(
                             Job.job_type == "SchedulerJob",
-                            Job.state == State.RUNNING,
+                            Job.state == TaskInstanceState.RUNNING,

Review Comment:
   Another that shouldn't use TIState.



##########
airflow/cli/commands/jobs_command.py:
##########
@@ -32,7 +32,11 @@ def check(args, session: Session = NEW_SESSION) -> None:
     if args.hostname and args.local:
         raise SystemExit("You can't use --hostname and --local at the same 
time")
 
-    query = session.query(Job).filter(Job.state == 
State.RUNNING).order_by(Job.latest_heartbeat.desc())
+    query = (
+        session.query(Job)
+        .filter(Job.state == TaskInstanceState.RUNNING)

Review Comment:
   Yeah, I don't think we should use TIState here. Either keep it State, or a 
new JobState, good with either.



##########
airflow/sentry.py:
##########
@@ -143,7 +143,7 @@ def add_breadcrumbs(self, task_instance, session=None):
                 return
             dr = task_instance.get_dagrun(session)
             task_instances = dr.get_task_instances(
-                state={State.SUCCESS, State.FAILED},
+                state={DagRunState.SUCCESS, DagRunState.FAILED},

Review Comment:
   These should be TIStates instead.



##########
airflow/kubernetes/pod_launcher_deprecated.py:
##########
@@ -306,15 +306,15 @@ def process_status(self, job_id, status):
         """Process status information for the job."""
         status = status.lower()
         if status == PodStatus.PENDING:
-            return State.QUEUED
+            return TaskInstanceState.QUEUED
         elif status == PodStatus.FAILED:
             self.log.error("Event with job id %s Failed", job_id)
-            return State.FAILED
+            return TaskInstanceState.FAILED
         elif status == PodStatus.SUCCEEDED:
             self.log.info("Event with job id %s Succeeded", job_id)
-            return State.SUCCESS
+            return TaskInstanceState.SUCCESS

Review Comment:
   I'm not so sure about this section, I'll have to dig a little deeper. But 
using the pod state being succeeded to assume TIState was success isn't right, 
TIState could have also failed!



##########
airflow/kubernetes/pod_launcher_deprecated.py:
##########
@@ -306,15 +306,15 @@ def process_status(self, job_id, status):
         """Process status information for the job."""
         status = status.lower()
         if status == PodStatus.PENDING:
-            return State.QUEUED
+            return TaskInstanceState.QUEUED
         elif status == PodStatus.FAILED:
             self.log.error("Event with job id %s Failed", job_id)
-            return State.FAILED
+            return TaskInstanceState.FAILED
         elif status == PodStatus.SUCCEEDED:
             self.log.info("Event with job id %s Succeeded", job_id)
-            return State.SUCCESS
+            return TaskInstanceState.SUCCESS

Review Comment:
   Same concerns in a lot of this file, to be honest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to