potiuk commented on PR #30255:
URL: https://github.com/apache/airflow/pull/30255#issuecomment-1481539324

   Hello Everyone, I've been working on this on and off for quite a while as I 
attempted to make things work for AIP-44. I tried a few different approaches 
and did not like the previous ones. This one, I think, hits the sweet spot 
between untangling the (somewhat) intertwined BaseJob dependencies and 
decoupling task running from the ORM models.
   
   ## A little context on that one.
   
   LocalTaskJob as we currently have it implemented inherits from BaseJob (same 
as other jobs). It is in fact a polymorphic dependency - all of the jobs are 
stored in the same 'BaseJob' table. This is (and always has been) a little 
problematic - because the "Job" objects inherit from the ORM object and there 
is an assumption that they are DB-related; on the other hand, they also have 
the "running" logic implemented in the `_execute` method of the same 
ORM-derived objects. That made it rather difficult to split the logic out from 
the database - for TriggererJob, LocalTaskJob and DagFileProcessorJob especially.
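
   To make the coupling concrete, here is a minimal sketch in plain Python. It 
is not Airflow's actual code: `FakeOrmBase`, `CoupledBaseJob` and 
`CoupledLocalTaskJob` are hypothetical stand-ins, and the "table" is just a 
list. The point is only that the record fields and the run logic sit in the 
same class hierarchy, so nothing can run without the DB-backed object.

```python
class FakeOrmBase:
    """Hypothetical stand-in for an ORM base class; the 'table' is a list."""

    saved = []  # pretend rows: (class name, state)

    def save(self):
        FakeOrmBase.saved.append((type(self).__name__, self.state))


class CoupledBaseJob(FakeOrmBase):
    """Record fields AND run logic live in the same ORM-derived class."""

    def __init__(self):
        self.state = "running"

    def run(self):
        self.save()  # DB write before the work starts
        try:
            self._execute()
            self.state = "success"
        except Exception:
            self.state = "failed"
            raise
        finally:
            self.save()  # DB write with the final state

    def _execute(self):
        raise NotImplementedError


class CoupledLocalTaskJob(CoupledBaseJob):
    """The 'running' logic is a method of the ORM-derived object itself."""

    def _execute(self):
        self.executed = True
```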
   
   I attempted to do it in various ways, but I always had these goals:
   
   a) the resulting architecture will decouple the ORM object from the 
logic, so that the serialized Pydantic objects introduced in #29776 can be 
used instead (basically we should be able to pass around and use either 
BaseJob or BaseJobPydantic)
   
   b) it should involve as little logic change as possible (basically shuffling 
objects around and calling different objects was most of what I wanted to 
change) - so that it will be easy to review and reason about.
   
   c) the resulting architecture will make sense
   
   ## Result
   
   I think I finally achieved all three goals. A summary of what has been 
done here:
   
   * I renamed the *Jobs (LocalTaskJob, SchedulerJob etc.) to JobRunners, which 
are NOT ORM objects - the whole logic is kept in there. I am just passing the 
job (BaseJob or BaseJobPydantic) as a property of those JobRunners, and I 
changed all the calls to the internal job fields to call that job property.
   
   * where needed (heartbeat mostly) I extracted the logic of updating the job 
so that it will save the changed heartbeat to the database even if 
BaseJobPydantic is used
   
   * we no longer have a polymorphic BaseJob - BaseJob is just a regular 
ORM class that keeps records for all the runners - it uses the same job "types" 
as before so it is backwards-compatible - but simpler. All the places where 
polymorphism was used have been updated.
   
   * I think the resulting architecture is quite a bit easier to reason about 
as it effectively allows us to decouple the DB operations that manage state 
from the actual job done by a running task.
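
   The shape described in the list above can be sketched roughly as follows. 
This is an illustration under assumptions, not the code in the PR: 
`OrmJobSketch`, `PydanticJobSketch`, `perform_heartbeat` and 
`LocalTaskJobRunnerSketch` are hypothetical names, both job flavours are plain 
dataclasses, and the "database" is a dict.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Protocol


class JobLike(Protocol):
    """Fields the runner needs; either job flavour satisfies this."""

    job_type: str
    state: str
    latest_heartbeat: Optional[datetime]


@dataclass
class OrmJobSketch:
    """Hypothetical stand-in for the ORM-backed BaseJob record."""

    job_type: str
    state: str = "running"
    latest_heartbeat: Optional[datetime] = None


@dataclass
class PydanticJobSketch:
    """Hypothetical stand-in for the serialized BaseJobPydantic."""

    job_type: str
    state: str = "running"
    latest_heartbeat: Optional[datetime] = None


def perform_heartbeat(job: JobLike, store: dict) -> None:
    """Extracted heartbeat update: works with either job flavour."""
    job.latest_heartbeat = datetime.now(timezone.utc)
    store[job.job_type] = job.latest_heartbeat  # pretend DB write


class LocalTaskJobRunnerSketch:
    """NOT an ORM object: holds the logic and a reference to the job."""

    job_type = "LocalTaskJob"  # same job "type" string as before

    def __init__(self, job: JobLike):
        self.job = job

    def _execute(self, store: dict) -> None:
        perform_heartbeat(self.job, store)
        self.job.state = "success"
```

   The runner only touches `self.job`, so swapping the ORM record for the 
serialized one does not change the runner code.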
   
   ## Current state
   
   It looks like a huge change but if you look closely **most** of the changes 
are changes in tests to adapt to the new object hierarchy. So I hope the review 
will not be that difficult.
   
   I still have a few (heartbeat) tests not passing and I am working on those 
(likely something missing in heartbeat processing) - but other than those, I 
think everything else is in place. 
   
   ## Future
   
   Now - this is not YET an AIP-44-compatible change. This refactor is just a 
basic decoupling. 
   
   We will need to implement several other follow-up changes after this one is 
merged:
   
   * separate DB/non-DB operations and turn the BaseJob's run() into a 
standalone method that can be run with or without the DB
   * (possibly) move the JobRunners to a separate package (or rename modules) 
to not confuse them with `BaseJob` - they are quite a different beast
   * decorate the DB methods with `@internal_api` calls to make them AIP-44 
ready
   
   ## Follow ups
   
   I am not sure about this, but those changes also make it possible to do 
something else. Namely, they allow us to limit how long connections are kept 
open from running tasks. Previously we kept them open the whole time the 
task was running, and that was kinda strange as in most circumstances we only 
needed them to do some initial setup, heartbeat and save the job state when 
complete. I **think** this change will enable that (but it's something to see 
when the other changes are completed): we could optimize that away and, 
mimicking what the Internal API will be doing, only have the 
session/connections established for short times by the running task. I hope we 
can get there.
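
   As a rough illustration of that idea (purely a sketch, not Airflow's 
session API: `ConnectionCounter`, `short_session` and 
`run_task_with_short_sessions` are hypothetical names), a task could open a 
connection only around setup and final state saving, and hold none while the 
actual work runs:

```python
from contextlib import contextmanager


class ConnectionCounter:
    """Hypothetical stand-in for a connection pool; it only counts."""

    def __init__(self):
        self.open_now = 0
        self.opened_total = 0


@contextmanager
def short_session(counter):
    """Open a 'connection' only for the duration of the with-block."""
    counter.open_now += 1
    counter.opened_total += 1
    try:
        yield counter
    finally:
        counter.open_now -= 1


def run_task_with_short_sessions(counter, work):
    with short_session(counter):
        pass  # initial setup would go here
    result = work()  # no connection is held while the task itself runs
    with short_session(counter):
        pass  # saving the final job state would go here
    return result
```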
   
   Looking forward to comments and feedback.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.