potiuk commented on code in PR #30376:
URL: https://github.com/apache/airflow/pull/30376#discussion_r1154841883


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -121,22 +121,26 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     job_type = "SchedulerJob"
     heartrate: int = conf.getint("scheduler", "SCHEDULER_HEARTBEAT_SEC")
 
-    job: BaseJob  # scheduler can only run with BaseJob class not the Pydantic serialized version
-
     def __init__(
         self,
+        job: BaseJob,
         subdir: str = settings.DAGS_FOLDER,
         num_runs: int = conf.getint("scheduler", "num_runs"),
         num_times_parse_dags: int = -1,
         scheduler_idle_sleep_time: float = conf.getfloat("scheduler", "scheduler_idle_sleep_time"),
         do_pickle: bool = False,
         log: logging.Logger | None = None,
         processor_poll_interval: float | None = None,
-        *args,
-        **kwargs,
     ):
+        super().__init__()
+        if job.job_type and job.job_type != self.job_type:
+            raise Exception(

Review Comment:
   Not really. This is a misconception. AirflowException is a base for other 
"meaningful" errors (AirflowSkipException, AirflowFailException), but other 
than that it has no practical meaning, and it does not matter whether 
AirflowException or Exception is raised. But you are right: we should be more 
specific here. I would be inclined to raise ValueError 
(https://docs.python.org/3/library/exceptions.html) here.
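
   A minimal sketch of what that could look like, outside of Airflow. 
`FakeJob` and `FakeSchedulerJobRunner` are hypothetical stand-ins for 
`BaseJob` and `SchedulerJobRunner`, reduced to the type check from the diff; 
the error message wording is also an assumption, not what the PR ended up with.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeJob:
    """Hypothetical stand-in for BaseJob; it only carries job_type."""

    job_type: Optional[str] = None


class FakeSchedulerJobRunner:
    """Hypothetical stand-in for SchedulerJobRunner, reduced to the check."""

    job_type = "SchedulerJob"

    def __init__(self, job: FakeJob) -> None:
        # Same condition as in the diff: an unset job_type is accepted,
        # a different one is rejected. Only the exception class changes.
        if job.job_type and job.job_type != self.job_type:
            raise ValueError(
                f"The job is already assigned a different job_type: {job.job_type!r} "
                f"(expected {self.job_type!r})"
            )
        self.job = job


# A matching or unset job_type passes; a mismatch raises ValueError.
FakeSchedulerJobRunner(FakeJob())
FakeSchedulerJobRunner(FakeJob("SchedulerJob"))
try:
    FakeSchedulerJobRunner(FakeJob("TriggererJob"))
except ValueError as err:
    print(f"rejected: {err}")
```

   Callers that want to handle this case can catch `ValueError` directly, 
which says more about the cause (a bad argument) than a bare `Exception`.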



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

