dstandish commented on code in PR #30255:
URL: https://github.com/apache/airflow/pull/30255#discussion_r1147109642
##########
airflow/cli/commands/standalone_command.py:
##########
@@ -235,13 +236,13 @@ def port_open(self, port):
return False
return True
- def job_running(self, job):
+ def job_running(self, job_runner_class: type[BaseJobRunner]):
"""
Checks if the given job name is running and heartbeating correctly.
Used to tell if scheduler is alive.
"""
- recent = job.most_recent_job()
+ recent = job_runner_class.most_recent_job()
Review Comment:
I don't mean to nitpick or comment on trivial things here, but I'm curious.
This method (`most_recent_job`) does feel like it wants to be a classmethod on
`BaseJob`. Why not leave it there? You could call
`BaseJob.most_recent_job(job_type=job_runner_class.job_type)` or something similar.
Anyway, just a thought, since I'm here :)
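A rough sketch of what that could look like, in case it helps. This is not the actual Airflow code: the in-memory `_store` list stands in for the database query, and the constructor, the `latest_heartbeat` field, and the stub `SchedulerJobRunner` with a `job_type` class attribute are all assumptions made for illustration.

```python
from __future__ import annotations

from datetime import datetime, timedelta


class BaseJob:
    # Hypothetical in-memory store standing in for the jobs table.
    _store: list[BaseJob] = []

    def __init__(self, job_type: str, latest_heartbeat: datetime) -> None:
        self.job_type = job_type
        self.latest_heartbeat = latest_heartbeat
        BaseJob._store.append(self)

    @classmethod
    def most_recent_job(cls, job_type: str) -> BaseJob | None:
        # Filter by job type, then pick the job with the newest heartbeat.
        candidates = [job for job in cls._store if job.job_type == job_type]
        return max(candidates, key=lambda job: job.latest_heartbeat, default=None)


class SchedulerJobRunner:
    # Assumed class attribute; the runner only needs to expose its job type.
    job_type = "SchedulerJob"


now = datetime(2023, 3, 24, 12, 0, 0)
BaseJob("SchedulerJob", now - timedelta(minutes=5))
newest = BaseJob("SchedulerJob", now)
BaseJob("TriggererJob", now + timedelta(minutes=1))

# The lookup stays on BaseJob; the runner class only supplies the type name.
recent = BaseJob.most_recent_job(job_type=SchedulerJobRunner.job_type)
```

With this shape, `job_running` would only need the runner class for its `job_type`, and the query logic would stay in one place.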
##########
airflow/jobs/local_task_job.py:
##########
@@ -204,6 +204,10 @@ def sigusr2_debug_handler(signum, frame):
finally:
self.on_kill()
+ @staticmethod
+ def get_job_type() -> str:
+ return "LocalTaskJob"
Review Comment:
Curious why this is a method instead of a class attribute.
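To make the comparison concrete, here is a minimal sketch of the two options side by side. Both class names are hypothetical stand-ins, not the real Airflow classes.

```python
class RunnerWithMethod:
    """Hypothetical runner exposing the job type through a static method."""

    @staticmethod
    def get_job_type() -> str:
        return "LocalTaskJob"


class RunnerWithAttribute:
    """Hypothetical runner exposing the job type as a plain class attribute."""

    job_type = "LocalTaskJob"


# Both are readable from the class itself, without creating an instance.
from_method = RunnerWithMethod.get_job_type()
from_attribute = RunnerWithAttribute.job_type
```

The attribute form is shorter at the call site and can be overridden in a subclass with a single assignment. A method would mainly earn its keep if the value had to be computed.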
##########
airflow/cli/commands/standalone_command.py:
##########
@@ -30,8 +30,9 @@
from airflow.configuration import AIRFLOW_HOME, conf, make_group_other_inaccessible
from airflow.executors import executor_constants
from airflow.executors.executor_loader import ExecutorLoader
-from airflow.jobs.scheduler_job import SchedulerJob
-from airflow.jobs.triggerer_job import TriggererJob
+from airflow.jobs.job_runner import BaseJobRunner
+from airflow.jobs.scheduler_job import SchedulerJobRunner
Review Comment:
The file naming is a tiny bit inconsistent, since BaseJobRunner is in
job_runner but SchedulerJobRunner is in scheduler_job.
##########
airflow/task/task_runner/base_task_runner.py:
##########
@@ -46,18 +49,32 @@ class BaseTaskRunner(LoggingMixin):
Invoke the `airflow tasks run` command with raw mode enabled in a
subprocess.
- :param local_task_job: The local task job associated with running the associated task instance.
+ :param base_job: The job associated with running the associated task instance. The job_runner for it
+ should be LocalTaskJobRunner
+ :param job_runner: The job runner associated with running the associated task instance. This parameter
+ is only used in case base_job is BaseJobPydantic instance - i.e. in case of db-less mode of
+ Airflow workers. This means that in case you have your own custom task runner, you should
+ adopt it to handle both BaseJob (with runner being part of the job) and
+ BaseJobPydantic (with runner passed as a separate parameter). For non-db-less-mode, the
+ runner is always part of the job, so custom task runners should be backwards-compatible.
"""
- def __init__(self, local_task_job):
+ def __init__(self, base_job: BaseJob | BaseJobPydantic, job_runner: LocalTaskJobRunner | None = None):
+ if hasattr(base_job, "job_runner") and job_runner is None:
Review Comment:
Are we not able to guarantee that all calls provide job_runner if one exists?
If we _can_ guarantee it, it would be nice to chop this logic.
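For illustration, a sketch of the simplified constructor under the assumption that every caller passes the runner explicitly. The classes below are hypothetical stand-ins, and the real `__init__` presumably does more than store two attributes.

```python
from __future__ import annotations


class LocalTaskJobRunner:
    """Hypothetical stand-in for the real runner."""


class BaseJob:
    """Hypothetical stand-in: a job that carries its own runner."""

    def __init__(self, job_runner: LocalTaskJobRunner) -> None:
        self.job_runner = job_runner


class BaseTaskRunner:
    def __init__(self, base_job: BaseJob, job_runner: LocalTaskJobRunner) -> None:
        # No hasattr() probing: the caller is responsible for supplying the runner.
        self.job = base_job
        self.job_runner = job_runner


runner = LocalTaskJobRunner()
job = BaseJob(job_runner=runner)

# Callers that hold a BaseJob pass its runner along explicitly.
task_runner = BaseTaskRunner(base_job=job, job_runner=job.job_runner)
```

The trade-off is that `job_runner` becomes a required argument, so existing custom task runners that call the constructor with one argument would need updating.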