potiuk commented on PR #30255: URL: https://github.com/apache/airflow/pull/30255#issuecomment-1481539324
Hello Everyone,

I've been working on this on and off for quite a while as I attempted to make things work for AIP-44. I tried a few different approaches and did not like the previous ones. This one, I think, hits the sweet spot: it untangles the (a little) intertwined BaseJob dependencies while decoupling task running from the ORM models.

## A little context on that one

LocalTaskJob as currently implemented inherits from BaseJob (same as the other jobs). It is in fact a polymorphic dependency - all of the jobs are stored in the same 'BaseJob' table. This is (and always has been) a little problematic: the "Job" objects inherit from the ORM object, so there is an assumption that they are DB-related, but on the other hand they also have the "running" logic implemented in the `_execute` method of the same ORM-derived objects. That made it rather difficult to split the logic out from the database - for TriggererJob, LocalTaskJob and DagFileProcessorJob especially.

I attempted to do it in various ways, but I had these goals:

a) the resulting architecture will decouple the ORM object from the logic, so that the serialized Pydantic objects introduced in #29776 could be used instead (basically we should be able to pass around and use either BaseJob or BaseJobPydantic)

b) it should change as little logic as possible (most of the changes I wanted to make were shuffling objects around and calling different objects), so that it is easy to review and reason about

c) the resulting architecture will make sense

## Result

I think I finally achieved all three goals. A summary of what has been done here:

* I renamed the `*Job` classes (LocalTaskJob, SchedulerJob etc.) to JobRunners, which are NOT ORM objects - the whole logic is kept in there. I am just passing the job (BaseJob or BaseJobPydantic) as a property of those JobRunners, and I changed all the calls to the internal job fields to go through that job property.
* where needed (heartbeat mostly) I extracted the logic of updating the job so that it saves the changed heartbeat to the database even if BaseJobPydantic is used
* we no longer have a polymorphic BaseJob - BaseJob is just a regular ORM class that keeps records for all the runners. It uses the same job "types" as before, so it is backwards-compatible, but simpler. All the places where polymorphism was used have been updated.
* I think the resulting architecture is quite a bit easier to reason about, as it effectively allows us to decouple the DB operations that manage state from the actual job done by a running task.

## Current state

It looks like a huge change, but if you look closely **most** of the changes are changes in tests to adapt to the new object hierarchy. So I hope the review will not be that difficult. I still have a few (heartbeat) tests not passing and I am working on those (likely something missing in heartbeat processing) - but other than those, I think everything else is in place.

## Future

Now - this is not YET an AIP-44-compatible change. This refactor is just a basic decoupling. We will need to implement several other follow-up changes after this one is merged:

* separate DB/non-DB operations and turn the BaseJob's run() into a standalone method that can be run with or without a DB
* (possibly) move the JobRunners to a separate package (or rename modules) so they are not confused with `BaseJob` - they are quite a different beast
* decorate the DB methods with `@internal_api` calls to make them AIP-44 ready

## Follow ups

I am not sure about this, but those changes also make it possible to do something else. Namely, they allow us to limit how long connections are kept open from running tasks. Previously we kept them open the whole time the task was running, and that was kinda strange, as in most circumstances we only need a connection to do some initial setup, to heartbeat and to save the job state when complete.
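
To make the shape of this concrete, here is a minimal sketch of a runner that holds the job as a property and touches the "database" only for short periods. All names in it (`JobRecord`, `FakeDb`, `short_session`, `ExampleJobRunner`) are hypothetical stand-ins invented for illustration; this is not the code in this PR and not Airflow's API.

```python
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterator, List, Optional, Tuple


@dataclass
class JobRecord:
    """Hypothetical stand-in for BaseJob / BaseJobPydantic: state only, no run logic."""

    job_type: str
    state: str = "running"
    latest_heartbeat: Optional[datetime] = None


class FakeDb:
    """Hypothetical in-memory 'database' that tracks how many sessions are open."""

    def __init__(self) -> None:
        self.open_sessions = 0
        self.saved: List[Tuple[str, str]] = []

    @contextmanager
    def short_session(self) -> Iterator["FakeDb"]:
        # The session exists only for the duration of the `with` block.
        self.open_sessions += 1
        try:
            yield self
        finally:
            self.open_sessions -= 1

    def save(self, job: JobRecord) -> None:
        self.saved.append((job.job_type, job.state))


class ExampleJobRunner:
    """Hypothetical runner: not a DB object, it only holds the job as a property."""

    def __init__(self, job: JobRecord, db: FakeDb) -> None:
        self.job = job
        self.db = db

    def heartbeat(self) -> None:
        self.job.latest_heartbeat = datetime.now(timezone.utc)
        with self.db.short_session() as session:
            session.save(self.job)

    def run(self) -> None:
        self.heartbeat()
        # The actual task work would happen here, with no session held open.
        self.job.state = "success"
        with self.db.short_session() as session:
            session.save(self.job)
```

In this sketch the runner never inherits from a DB-backed class, so the same `run()` would work with any object exposing the same fields, and no session stays open between the heartbeat and the final state save.
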
I **think** this change will let us limit how long those connections stay open (though that's something to see when the other changes are completed): we could optimize the long-lived connections away and, mimicking what the Internal API will be doing, only establish the session/connections for short times from the running task. I hope we can get there.

Looking forward to comments and feedback.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
